46 lines
1.4 KiB
Go
46 lines
1.4 KiB
Go
package pathway
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// handleGet simply attempts to relay whatever data is received from the first
|
|
// available POST request into this GET request on the same path.
|
|
// if there is no POST waiting, it will block indefinitely until it either finds
|
|
// a POST or the connection is cancelled.
|
|
func (pw *Pathway) handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
|
|
// GET creates a new path preemptively
|
|
queue := &transferQueue{ch: make(chan httpTransfer)}
|
|
if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
|
|
// GET successfully loads a path instead
|
|
queue = p.(*transferQueue)
|
|
}
|
|
atomic.AddInt32(&queue.gets, 1)
|
|
|
|
select {
|
|
// either a POST presents itself on the other side of the queue's channel
|
|
case transfer := <-queue.ch:
|
|
if transfer.contentlength != "" {
|
|
w.Header().Set("Content-Length", transfer.contentlength)
|
|
}
|
|
_, err := io.Copy(w, transfer.reader)
|
|
if err != nil {
|
|
transfer.reader.Close()
|
|
}
|
|
close(transfer.done)
|
|
// or the request is cancelled, and this GET can continue to finish
|
|
case <-r.Context().Done():
|
|
}
|
|
|
|
// if this was indeed the last GET and there are no more new POST requests
|
|
// waiting, then go ahead and close the queue and remove the path
|
|
if atomic.AddInt32(&queue.gets, -1) <= 0 {
|
|
if atomic.LoadInt32(&queue.posts) <= 0 {
|
|
pw.Delete(pathID)
|
|
close(queue.ch)
|
|
}
|
|
}
|
|
}
|