123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- package pathway
- import (
- "io"
- "net/http"
- "sync/atomic"
- )
- // handleGet simply attempts to relay whatever data is received from the first
- // available POST request into this GET request on the same path.
- // if there is no POST waiting, it will block indefinitely until it either finds
- // a POST or the connection is cancelled.
- func (pw *Pathway) handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
- // GET creates a new path preemptively
- queue := &transferQueue{ch: make(chan httpTransfer)}
- if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
- // GET successfully loads a path instead
- queue = p.(*transferQueue)
- }
- atomic.AddInt32(&queue.gets, 1)
- select {
- // either a POST presents itself on the other side of the queue's channel
- case transfer := <-queue.ch:
- if transfer.contentlength != "" {
- w.Header().Set("Content-Length", transfer.contentlength)
- }
- _, err := io.Copy(w, transfer.reader)
- if err != nil {
- transfer.reader.Close()
- }
- close(transfer.done)
- // or the request is cancelled, and this GET can continue to finish
- case <-r.Context().Done():
- }
- // if this was indeed the last GET and there are no more new POST requests
- // waiting, then go ahead and close the queue and remove the path
- if atomic.AddInt32(&queue.gets, -1) <= 0 {
- if atomic.LoadInt32(&queue.posts) <= 0 {
- pw.Delete(pathID)
- close(queue.ch)
- }
- }
- }
|