package main import ( "fmt" "io" "log" "net/http" "strconv" "sync" ) var ( version = "0.0-undefined" buildtime = "0000-00-00T00:00:00+0000" queues = make(map[string]*Queue) mut = &sync.Mutex{} ) type Transfer struct { reader io.Reader size int } type Queue struct { ch chan Transfer producers int consumers int } func NewQueue() *Queue { return &Queue{ ch: make(chan Transfer), } } func (q *Queue) addConsumer() { q.consumers = q.consumers + 1 } func (q *Queue) remConsumer() { q.consumers = q.consumers - 1 } func (q *Queue) addProducer() { q.producers = q.producers + 1 } func (q *Queue) remProducer() { q.producers = q.producers - 1 } func (q *Queue) isEmpty() bool { return q.producers == 0 && q.consumers == 0 } func main() { log.Printf("pathway version:%s buildtime:%s", version, buildtime) handle := func(w http.ResponseWriter, r *http.Request) { channelId := r.URL.Path if r.Method == "GET" { log.Printf("GET: %s", channelId) mut.Lock() queue, exists := queues[channelId] if !exists { queue = NewQueue() queues[channelId] = queue } ch := queue.ch queue.addConsumer() mut.Unlock() select { case transfer := <-ch: w.Header().Set("Content-Length", fmt.Sprintf("%d", transfer.size)) _, err := io.Copy(w, transfer.reader) if err != nil { if closer, ok := transfer.reader.(io.Closer); ok { closer.Close() } } case <-r.Context().Done(): } mut.Lock() queue.remConsumer() if queue.isEmpty() { delete(queues, channelId) } mut.Unlock() } else { log.Printf("POST: %s", channelId) mut.Lock() queue, exists := queues[channelId] if !exists { queue = NewQueue() queues[channelId] = queue } ch := queue.ch queue.addProducer() mut.Unlock() reader, writer := io.Pipe() contentLength, err := strconv.Atoi(r.Header.Get("Content-Length")) if err != nil { contentLength = 0 } transfer := Transfer{ reader: reader, size: contentLength, } select { case ch <- transfer: io.Copy(writer, r.Body) case <-r.Context().Done(): } writer.Close() mut.Lock() queue.remProducer() if queue.isEmpty() { delete(queues, channelId) } mut.Unlock() } } health := func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("OK")) } http.HandleFunc("/health", health) http.HandleFunc("/", handle) err := http.ListenAndServe(":8080", nil) if err != nil { log.Println(err) } }