123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package pathway
- import (
- "io"
- "net/http"
- "sync/atomic"
- )
- // handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the
- // PUB will be 'broadcasted' to all SUBs waiting for content.
- // a PUB does not guarantee any sort of delivery. trying to send to a path
- // which already has other regular POSTs waiting, or one without any SUBs will
- // still return as successful.
- func (pw *Pathway) handlePubSub(pathID string, r *http.Request) {
- // PUBSUBs do not create a new paths preemptively. without an existing path
- // which exclusively has SUBs waiting, a PUB would not make sense.
- var queue *transferQueue
- if p, loaded := pw.Load(pathID); loaded {
- // PUB successfully loads a SUB queue.
- queue = p.(*transferQueue)
- } else {
- // no loaded queue with SUBs, nothing to PUB.
- return
- }
- if atomic.AddInt32(&queue.posts, 1) > 1 {
- // not a good idea to publish if there are posts waiting or in progress.
- // since the other POST might have finished already, it is still neccesary
- // to check if all POSTs and GETs are finished in order to delete the
- // path and close the queue's channel.
- if atomic.AddInt32(&queue.posts, -1) <= 0 {
- if atomic.LoadInt32(&queue.gets) <= 0 {
- pw.Delete(pathID)
- close(queue.ch)
- }
- }
- return
- }
- var masterWriter io.WriteCloser
- var saturated bool
- for !saturated {
- // create a transfer and writer for any more waiting SUB requests.
- transfer, writer := newTransfer(r.Header.Get("Content-Length"))
- select {
- // try to put the transfer in a queue channel and add the writer
- // to it to the masterWriter.
- case queue.ch <- transfer:
- if masterWriter == nil {
- masterWriter = BroadcastWriter(writer)
- } else {
- masterWriter = BroadcastWriter(masterWriter, writer)
- }
- default:
- // if no more SUB requests are waiting, then getting a channel from
- // the queue won't work. this PUB is saturated.
- saturated = true
- }
- }
- // copy all incoming data as efficiently as possible over the broadcastWriter.
- io.Copy(masterWriter, r.Body)
- masterWriter.Close()
- r.Body.Close()
- // if this was indeed the last POST and there are no more new GET requests
- // waiting, then go ahead and close the queue and remove the path.
- if atomic.AddInt32(&queue.posts, -1) <= 0 {
- if atomic.LoadInt32(&queue.gets) <= 0 {
- pw.Delete(pathID)
- close(queue.ch)
- }
- }
- }
- // handlePost simply attempts to receive whatever data is sent by the POST
- // request and relay it to the first available GET request on the same path.
- // if there is no GET waiting, it will block indefinitely until it either finds
- // a GET or the connection is cancelled.
- func (pw *Pathway) handlePost(pathID string, r *http.Request) {
- // POST creates a new path preemptively.
- queue := &transferQueue{ch: make(chan httpTransfer)}
- if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
- // POST successfully loads a path instead.
- queue = p.(*transferQueue)
- }
- atomic.AddInt32(&queue.posts, 1)
- // create a transfer and writer for the next or waiting GET request.
- transfer, writer := newTransfer(r.Header.Get("Content-Length"))
- // start a go routing to start reading from the body. else a cancelled
- // POST is never correctly detected and this hangs forever.
- go func() {
- // copy all incoming data efficiently once a GET picks up the transfer.
- io.Copy(writer, r.Body)
- writer.Close()
- r.Body.Close()
- }()
- select {
- // either a GET presents itself on the other side of the queue's channel.
- case queue.ch <- transfer:
- // or the request is cancelled, and this POST needs to close the transfer
- // lest it be waiting for the done channel to close forever.
- case <-r.Context().Done():
- close(transfer.done)
- }
- // wait here for the GET to signal it is done reading everything from the POST
- <-transfer.done
- // if this was indeed the last POST and there are no more new GET requests
- // waiting, then go ahead and close the queue and remove the path.
- if atomic.AddInt32(&queue.posts, -1) <= 0 {
- if atomic.LoadInt32(&queue.gets) <= 0 {
- pw.Delete(pathID)
- close(queue.ch)
- }
- }
- }
|