package pathway import ( "io" "net/http" "sync/atomic" ) func handlePubSub(pathID string, r *http.Request) { var queue *Queue if p, loaded := paths.Load(pathID); loaded { queue = p.(*Queue) debug("%s [PUBSUB] Loads path", pathID) } else { debug("%s [PUBSUB] No subs for path", pathID) return } if atomic.AddInt32(&queue.posts, 1) > 1 { debug("%s [PUBSUB] Path already has posts", pathID) atomic.AddInt32(&queue.posts, -1) return } var masterWriter io.WriteCloser var done bool for !done { transfer, writer := NewTransfer(r.Header.Get("Content-Length")) select { case queue.ch <- transfer: debug("%s [PUBSUB] Adds sub to pub", pathID) if masterWriter == nil { masterWriter = BroadcastWriter(writer) } else { masterWriter = BroadcastWriter(masterWriter, writer) } default: debug("%s [PUBSUB] Saturated path", pathID) done = true } } n, err := io.Copy(masterWriter, r.Body) debug("%s [PUBSUB] Sends %d bytes", pathID, n) if err != nil { debug("%s [PUBSUB] Has error: %s", pathID, err.Error()) } masterWriter.Close() r.Body.Close() if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { paths.Delete(pathID) close(queue.ch) debug("%s [PUBSUB] Removes path", pathID) } } info("%s [PUBSUB] Finishes", pathID) } func handlePost(pathID string, r *http.Request) { queue := &Queue{ch: make(chan Transfer)} if p, loaded := paths.LoadOrStore(pathID, queue); loaded { queue = p.(*Queue) debug("%s [POST] Loads path", pathID) } else { debug("%s [POST] Creates path", pathID) } atomic.AddInt32(&queue.posts, 1) transfer, writer := NewTransfer(r.Header.Get("Content-Length")) go func() { n, err := io.Copy(writer, r.Body) debug("%s [POST] Sends %d bytes", pathID, n) if err != nil { debug("%s [POST] Has error: %s", pathID, err.Error()) } writer.Close() r.Body.Close() }() select { case queue.ch <- transfer: debug("%s [POST] Writes to path", pathID) case <-r.Context().Done(): debug("%s [POST] Cancels path", pathID) close(transfer.done) } debug("%s [POST] Waits for done", pathID) <-transfer.done if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { paths.Delete(pathID) close(queue.ch) debug("%s [POST] Removes path", pathID) } } info("%s [POST] Finishes", pathID) }