package pathway import ( "io" "net/http" "sync/atomic" ) // handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the // PUB will be 'broadcasted' to all SUBs waiting for content. // a PUB does not guarantee any sort of delivery. trying to send to a path // which already has other regular POSTs waiting, or one without any SUBs will // still return as successful. func (pw *Pathway) handlePubSub(pathID string, r *http.Request) { // PUBSUBs do not create a new paths preemptively. without an existing path // which exclusively has SUBs waiting, a PUB would not make sense. var queue *transferQueue if p, loaded := pw.Load(pathID); loaded { // PUB successfully loads a SUB queue. queue = p.(*transferQueue) } else { // no loaded queue with SUBs, nothing to PUB. return } if atomic.AddInt32(&queue.posts, 1) > 1 { // not a good idea to publish if there are posts waiting or in progress. // since the other POST might have finished already, it is still neccesary // to check if all POSTs and GETs are finished in order to delete the // path and close the queue's channel. if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { pw.Delete(pathID) close(queue.ch) } } return } var masterWriter io.WriteCloser var saturated bool for !saturated { // create a transfer and writer for any more waiting SUB requests. transfer, writer := newTransfer(r.Header.Get("Content-Length")) select { // try to put the transfer in a queue channel and add the writer // to it to the masterWriter. case queue.ch <- transfer: if masterWriter == nil { masterWriter = BroadcastWriter(writer) } else { masterWriter = BroadcastWriter(masterWriter, writer) } default: // if no more SUB requests are waiting, then getting a channel from // the queue won't work. this PUB is saturated. saturated = true } } // copy all incoming data as efficiently as possible over the broadcastWriter. io.Copy(masterWriter, r.Body) masterWriter.Close() r.Body.Close() // if this was indeed the last POST and there are no more new GET requests // waiting, then go ahead and close the queue and remove the path. if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { pw.Delete(pathID) close(queue.ch) } } } // handlePost simply attempts to receive whatever data is sent by the POST // request and relay it to the first available GET request on the same path. // if there is no GET waiting, it will block indefinitely until it either finds // a GET or the connection is cancelled. func (pw *Pathway) handlePost(pathID string, r *http.Request) { // POST creates a new path preemptively. queue := &transferQueue{ch: make(chan httpTransfer)} if p, loaded := pw.LoadOrStore(pathID, queue); loaded { // POST successfully loads a path instead. queue = p.(*transferQueue) } atomic.AddInt32(&queue.posts, 1) // create a transfer and writer for the next or waiting GET request. transfer, writer := newTransfer(r.Header.Get("Content-Length")) // start a go routing to start reading from the body. else a cancelled // POST is never correctly detected and this hangs forever. go func() { // copy all incoming data efficiently once a GET picks up the transfer. io.Copy(writer, r.Body) writer.Close() r.Body.Close() }() select { // either a GET presents itself on the other side of the queue's channel. case queue.ch <- transfer: // or the request is cancelled, and this POST needs to close the transfer // lest it be waiting for the done channel to close forever. case <-r.Context().Done(): close(transfer.done) } // wait here for the GET to signal it is done reading everything from the POST <-transfer.done // if this was indeed the last POST and there are no more new GET requests // waiting, then go ahead and close the queue and remove the path. if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { pw.Delete(pathID) close(queue.ch) } } }