package pathway import ( "io" "net/http" "sync/atomic" ) // handleGet simply attempts to relay whatever data is received from the first // available POST request into this GET request on the same path. // if there is no POST waiting, it will block indefinitely until it either finds // a POST or the connection is cancelled. func (pw *Pathway) handleGet(pathID string, w http.ResponseWriter, r *http.Request) { // GET creates a new path preemptively queue := &transferQueue{ch: make(chan httpTransfer)} if p, loaded := pw.LoadOrStore(pathID, queue); loaded { // GET successfully loads a path instead queue = p.(*transferQueue) } atomic.AddInt32(&queue.gets, 1) select { // either a POST presents itself on the other side of the queue's channel case transfer := <-queue.ch: if transfer.contentlength != "" { w.Header().Set("Content-Length", transfer.contentlength) } _, err := io.Copy(w, transfer.reader) if err != nil { transfer.reader.Close() } close(transfer.done) // or the request is cancelled, and this GET can continue to finish case <-r.Context().Done(): } // if this was indeed the last GET and there are no more new POST requests // waiting, then go ahead and close the queue and remove the path if atomic.AddInt32(&queue.gets, -1) <= 0 { if atomic.LoadInt32(&queue.posts) <= 0 { pw.Delete(pathID) close(queue.ch) } } }