|
@@ -6,97 +6,116 @@ import (
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
)
|
|
)
|
|
|
|
|
|
-func handlePubSub(pathID string, r *http.Request) {
|
|
|
|
- var queue *Queue
|
|
|
|
- if p, loaded := paths.Load(pathID); loaded {
|
|
|
|
- queue = p.(*Queue)
|
|
|
|
- debug("%s [PUBSUB] Loads path", pathID)
|
|
|
|
|
|
+// handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the
|
|
|
|
+// PUB will be 'broadcasted' to all SUBs waiting for content.
|
|
|
|
+// a PUB does not guarantee any sort of delivery. trying to send to a path
|
|
|
|
+// which already has other regular POSTs waiting, or one without any SUBs will
|
|
|
|
+// still return as successful.
|
|
|
|
+func (pw *Pathway) handlePubSub(pathID string, r *http.Request) {
|
|
|
|
+ // PUBSUBs do not create a new paths preemptively. without an existing path
|
|
|
|
+ // which exclusively has SUBs waiting, a PUB would not make sense.
|
|
|
|
+ var queue *transferQueue
|
|
|
|
+ if p, loaded := pw.Load(pathID); loaded {
|
|
|
|
+ // PUB successfully loads a SUB queue.
|
|
|
|
+ queue = p.(*transferQueue)
|
|
} else {
|
|
} else {
|
|
- debug("%s [PUBSUB] No subs for path", pathID)
|
|
|
|
|
|
+ // no loaded queue with SUBs, nothing to PUB.
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
if atomic.AddInt32(&queue.posts, 1) > 1 {
|
|
if atomic.AddInt32(&queue.posts, 1) > 1 {
|
|
- debug("%s [PUBSUB] Path already has posts", pathID)
|
|
|
|
- atomic.AddInt32(&queue.posts, -1)
|
|
|
|
|
|
+ // not a good idea to publish if there are posts waiting or in progress.
|
|
|
|
+ // since the other POST might have finished already, it is still neccesary
|
|
|
|
+ // to check if all POSTs and GETs are finished in order to delete the
|
|
|
|
+ // path and close the queue's channel.
|
|
|
|
+ if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
|
|
+ if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
|
|
+ pw.Delete(pathID)
|
|
|
|
+ close(queue.ch)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
var masterWriter io.WriteCloser
|
|
var masterWriter io.WriteCloser
|
|
- var done bool
|
|
|
|
- for !done {
|
|
|
|
- transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
|
|
|
|
|
|
+ var saturated bool
|
|
|
|
+ for !saturated {
|
|
|
|
+ // create a transfer and writer for any more waiting SUB requests.
|
|
|
|
+ transfer, writer := newTransfer(r.Header.Get("Content-Length"))
|
|
select {
|
|
select {
|
|
|
|
+ // try to put the transfer in a queue channel and add the writer
|
|
|
|
+ // to it to the masterWriter.
|
|
case queue.ch <- transfer:
|
|
case queue.ch <- transfer:
|
|
- debug("%s [PUBSUB] Adds sub to pub", pathID)
|
|
|
|
if masterWriter == nil {
|
|
if masterWriter == nil {
|
|
masterWriter = BroadcastWriter(writer)
|
|
masterWriter = BroadcastWriter(writer)
|
|
} else {
|
|
} else {
|
|
masterWriter = BroadcastWriter(masterWriter, writer)
|
|
masterWriter = BroadcastWriter(masterWriter, writer)
|
|
}
|
|
}
|
|
default:
|
|
default:
|
|
- debug("%s [PUBSUB] Saturated path", pathID)
|
|
|
|
- done = true
|
|
|
|
|
|
+ // if no more SUB requests are waiting, then getting a channel from
|
|
|
|
+ // the queue won't work. this PUB is saturated.
|
|
|
|
+ saturated = true
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- n, err := io.Copy(masterWriter, r.Body)
|
|
|
|
- debug("%s [PUBSUB] Sends %d bytes", pathID, n)
|
|
|
|
- if err != nil {
|
|
|
|
- debug("%s [PUBSUB] Has error: %s", pathID, err.Error())
|
|
|
|
- }
|
|
|
|
|
|
+ // copy all incoming data as efficiently as possible over the broadcastWriter.
|
|
|
|
+ io.Copy(masterWriter, r.Body)
|
|
masterWriter.Close()
|
|
masterWriter.Close()
|
|
r.Body.Close()
|
|
r.Body.Close()
|
|
|
|
|
|
|
|
+ // if this was indeed the last POST and there are no more new GET requests
|
|
|
|
+ // waiting, then go ahead and close the queue and remove the path.
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
- paths.Delete(pathID)
|
|
|
|
|
|
+ pw.Delete(pathID)
|
|
close(queue.ch)
|
|
close(queue.ch)
|
|
- debug("%s [PUBSUB] Removes path", pathID)
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- info("%s [PUBSUB] Finishes", pathID)
|
|
|
|
}
|
|
}
|
|
|
|
|
|
-func handlePost(pathID string, r *http.Request) {
|
|
|
|
- queue := &Queue{ch: make(chan Transfer)}
|
|
|
|
- if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
|
|
|
|
- queue = p.(*Queue)
|
|
|
|
- debug("%s [POST] Loads path", pathID)
|
|
|
|
- } else {
|
|
|
|
- debug("%s [POST] Creates path", pathID)
|
|
|
|
|
|
+// handlePost simply attempts to receive whatever data is sent by the POST
|
|
|
|
+// request and relay it to the first available GET request on the same path.
|
|
|
|
+// if there is no GET waiting, it will block indefinitely until it either finds
|
|
|
|
+// a GET or the connection is cancelled.
|
|
|
|
+func (pw *Pathway) handlePost(pathID string, r *http.Request) {
|
|
|
|
+ // POST creates a new path preemptively.
|
|
|
|
+ queue := &transferQueue{ch: make(chan httpTransfer)}
|
|
|
|
+ if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
|
|
|
|
+ // POST successfully loads a path instead.
|
|
|
|
+ queue = p.(*transferQueue)
|
|
}
|
|
}
|
|
atomic.AddInt32(&queue.posts, 1)
|
|
atomic.AddInt32(&queue.posts, 1)
|
|
|
|
|
|
- transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
|
|
|
|
|
|
+ // create a transfer and writer for the next or waiting GET request.
|
|
|
|
+ transfer, writer := newTransfer(r.Header.Get("Content-Length"))
|
|
|
|
|
|
|
|
+ // start a go routing to start reading from the body. else a cancelled
|
|
|
|
+ // POST is never correctly detected and this hangs forever.
|
|
go func() {
|
|
go func() {
|
|
- n, err := io.Copy(writer, r.Body)
|
|
|
|
- debug("%s [POST] Sends %d bytes", pathID, n)
|
|
|
|
- if err != nil {
|
|
|
|
- debug("%s [POST] Has error: %s", pathID, err.Error())
|
|
|
|
- }
|
|
|
|
|
|
+ // copy all incoming data efficiently once a GET picks up the transfer.
|
|
|
|
+ io.Copy(writer, r.Body)
|
|
writer.Close()
|
|
writer.Close()
|
|
r.Body.Close()
|
|
r.Body.Close()
|
|
}()
|
|
}()
|
|
|
|
|
|
select {
|
|
select {
|
|
|
|
+ // either a GET presents itself on the other side of the queue's channel.
|
|
case queue.ch <- transfer:
|
|
case queue.ch <- transfer:
|
|
- debug("%s [POST] Writes to path", pathID)
|
|
|
|
|
|
+ // or the request is cancelled, and this POST needs to close the transfer
|
|
|
|
+ // lest it be waiting for the done channel to close forever.
|
|
case <-r.Context().Done():
|
|
case <-r.Context().Done():
|
|
- debug("%s [POST] Cancels path", pathID)
|
|
|
|
close(transfer.done)
|
|
close(transfer.done)
|
|
}
|
|
}
|
|
|
|
|
|
- debug("%s [POST] Waits for done", pathID)
|
|
|
|
|
|
+ // wait here for the GET to signal it is done reading everything from the POST
|
|
<-transfer.done
|
|
<-transfer.done
|
|
|
|
|
|
|
|
+ // if this was indeed the last POST and there are no more new GET requests
|
|
|
|
+ // waiting, then go ahead and close the queue and remove the path.
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
- paths.Delete(pathID)
|
|
|
|
|
|
+ pw.Delete(pathID)
|
|
close(queue.ch)
|
|
close(queue.ch)
|
|
- debug("%s [POST] Removes path", pathID)
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- info("%s [POST] Finishes", pathID)
|
|
|
|
}
|
|
}
|