Private
Public Access
0
0
Files
pathway/post.go
2021-10-24 12:32:32 +02:00

122 lines
4.0 KiB
Go

package pathway
import (
"io"
"net/http"
"sync/atomic"
)
// handlePubSub treats a POST as a PUB and the GETs waiting on the same path as
// SUBs: the request body is broadcast to every SUB that is ready to receive at
// the moment the PUB arrives.
//
// a PUB does not guarantee any sort of delivery. it returns quietly, without
// publishing anything, when the path does not exist, when another POST is
// already registered on the queue, or when no SUB is ready to receive.
func (pw *Pathway) handlePubSub(pathID string, r *http.Request) {
	// PUBSUBs do not create new paths preemptively. without an existing path
	// there is no queue to publish on, so there is nothing to do.
	p, loaded := pw.Load(pathID)
	if !loaded {
		return
	}
	queue := p.(*transferQueue)

	// register this PUB as a POST on the queue. on every way out of this
	// function the registration is released again, and if that leaves the
	// queue without any POSTs and without any GETs, the path is deleted and
	// the queue's channel closed.
	posts := atomic.AddInt32(&queue.posts, 1)
	defer func() {
		if atomic.AddInt32(&queue.posts, -1) <= 0 && atomic.LoadInt32(&queue.gets) <= 0 {
			pw.Delete(pathID)
			close(queue.ch)
		}
	}()
	if posts > 1 {
		// not a good idea to publish if there are posts waiting or in
		// progress. since the other POST might have finished already, it is
		// still necessary to run the cleanup check in the deferred func.
		return
	}

	contentLength := r.Header.Get("Content-Length")
	var masterWriter io.WriteCloser
	var saturated bool
	for !saturated {
		// create a transfer and writer for any more waiting SUB requests.
		// NOTE(review): the pair created on the final, unsuccessful pass is
		// dropped unused; confirm newTransfer holds nothing that needs an
		// explicit release.
		transfer, writer := newTransfer(contentLength)
		select {
		// try to hand the transfer to a SUB without blocking, and on success
		// add its writer to the masterWriter.
		case queue.ch <- transfer:
			if masterWriter == nil {
				masterWriter = BroadcastWriter(writer)
			} else {
				masterWriter = BroadcastWriter(masterWriter, writer)
			}
		default:
			// no (more) SUB is ready to receive; this PUB is saturated.
			saturated = true
		}
	}

	// only copy when at least one SUB accepted a transfer. with zero SUBs the
	// masterWriter is still a nil interface, and calling Write or Close on it
	// would panic.
	if masterWriter != nil {
		// copy all incoming data as efficiently as possible over the
		// broadcast writer. delivery is best effort, so a copy error is
		// deliberately not reported.
		io.Copy(masterWriter, r.Body)
		masterWriter.Close()
	}
	r.Body.Close()
}
// handlePost relays the body of a POST request to the first GET request that
// becomes available on the same path. when no GET is waiting it blocks until
// one shows up or until the request's context is cancelled.
func (pw *Pathway) handlePost(pathID string, r *http.Request) {
	// a POST registers a path up front; if the path already exists, the
	// stored queue is used and the fresh one is dropped.
	fresh := &transferQueue{ch: make(chan httpTransfer)}
	q := fresh
	if existing, found := pw.LoadOrStore(pathID, fresh); found {
		q = existing.(*transferQueue)
	}
	atomic.AddInt32(&q.posts, 1)

	// the transfer is what gets offered to a GET; w is the writing end that
	// this POST feeds from its body.
	tr, w := newTransfer(r.Header.Get("Content-Length"))

	// reading from the body has to start in its own goroutine, otherwise a
	// cancelled POST is never correctly detected and this hangs forever.
	go func() {
		io.Copy(w, r.Body)
		w.Close()
		r.Body.Close()
	}()

	select {
	case q.ch <- tr:
		// a GET took the transfer from the queue's channel.
	case <-r.Context().Done():
		// the request was cancelled before any GET took the transfer. close
		// the done channel here so the wait below does not block forever.
		close(tr.done)
	}

	// block until the transfer is marked done: either by the cancellation
	// branch above or, presumably, by the GET once it has finished reading.
	<-tr.done

	// release this POST. only when it was the last POST and no GETs are
	// registered either is the path removed and the queue's channel closed.
	if atomic.AddInt32(&q.posts, -1) > 0 {
		return
	}
	if atomic.LoadInt32(&q.gets) > 0 {
		return
	}
	pw.Delete(pathID)
	close(q.ch)
}