122 lines
4.0 KiB
Go
122 lines
4.0 KiB
Go
package pathway
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the
|
|
// PUB will be 'broadcasted' to all SUBs waiting for content.
|
|
// a PUB does not guarantee any sort of delivery. trying to send to a path
|
|
// which already has other regular POSTs waiting, or one without any SUBs will
|
|
// still return as successful.
|
|
func (pw *Pathway) handlePubSub(pathID string, r *http.Request) {
|
|
// PUBSUBs do not create a new paths preemptively. without an existing path
|
|
// which exclusively has SUBs waiting, a PUB would not make sense.
|
|
var queue *transferQueue
|
|
if p, loaded := pw.Load(pathID); loaded {
|
|
// PUB successfully loads a SUB queue.
|
|
queue = p.(*transferQueue)
|
|
} else {
|
|
// no loaded queue with SUBs, nothing to PUB.
|
|
return
|
|
}
|
|
|
|
if atomic.AddInt32(&queue.posts, 1) > 1 {
|
|
// not a good idea to publish if there are posts waiting or in progress.
|
|
// since the other POST might have finished already, it is still neccesary
|
|
// to check if all POSTs and GETs are finished in order to delete the
|
|
// path and close the queue's channel.
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
pw.Delete(pathID)
|
|
close(queue.ch)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
var masterWriter io.WriteCloser
|
|
var saturated bool
|
|
for !saturated {
|
|
// create a transfer and writer for any more waiting SUB requests.
|
|
transfer, writer := newTransfer(r.Header.Get("Content-Length"))
|
|
select {
|
|
// try to put the transfer in a queue channel and add the writer
|
|
// to it to the masterWriter.
|
|
case queue.ch <- transfer:
|
|
if masterWriter == nil {
|
|
masterWriter = BroadcastWriter(writer)
|
|
} else {
|
|
masterWriter = BroadcastWriter(masterWriter, writer)
|
|
}
|
|
default:
|
|
// if no more SUB requests are waiting, then getting a channel from
|
|
// the queue won't work. this PUB is saturated.
|
|
saturated = true
|
|
}
|
|
}
|
|
|
|
// copy all incoming data as efficiently as possible over the broadcastWriter.
|
|
io.Copy(masterWriter, r.Body)
|
|
masterWriter.Close()
|
|
r.Body.Close()
|
|
|
|
// if this was indeed the last POST and there are no more new GET requests
|
|
// waiting, then go ahead and close the queue and remove the path.
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
pw.Delete(pathID)
|
|
close(queue.ch)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handlePost simply attempts to receive whatever data is sent by the POST
|
|
// request and relay it to the first available GET request on the same path.
|
|
// if there is no GET waiting, it will block indefinitely until it either finds
|
|
// a GET or the connection is cancelled.
|
|
func (pw *Pathway) handlePost(pathID string, r *http.Request) {
|
|
// POST creates a new path preemptively.
|
|
queue := &transferQueue{ch: make(chan httpTransfer)}
|
|
if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
|
|
// POST successfully loads a path instead.
|
|
queue = p.(*transferQueue)
|
|
}
|
|
atomic.AddInt32(&queue.posts, 1)
|
|
|
|
// create a transfer and writer for the next or waiting GET request.
|
|
transfer, writer := newTransfer(r.Header.Get("Content-Length"))
|
|
|
|
// start a go routing to start reading from the body. else a cancelled
|
|
// POST is never correctly detected and this hangs forever.
|
|
go func() {
|
|
// copy all incoming data efficiently once a GET picks up the transfer.
|
|
io.Copy(writer, r.Body)
|
|
writer.Close()
|
|
r.Body.Close()
|
|
}()
|
|
|
|
select {
|
|
// either a GET presents itself on the other side of the queue's channel.
|
|
case queue.ch <- transfer:
|
|
// or the request is cancelled, and this POST needs to close the transfer
|
|
// lest it be waiting for the done channel to close forever.
|
|
case <-r.Context().Done():
|
|
close(transfer.done)
|
|
}
|
|
|
|
// wait here for the GET to signal it is done reading everything from the POST
|
|
<-transfer.done
|
|
|
|
// if this was indeed the last POST and there are no more new GET requests
|
|
// waiting, then go ahead and close the queue and remove the path.
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
pw.Delete(pathID)
|
|
close(queue.ch)
|
|
}
|
|
}
|
|
}
|