1
0

post.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package pathway
  2. import (
  3. "io"
  4. "net/http"
  5. "sync/atomic"
  6. )
  7. // handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the
  8. // PUB will be 'broadcasted' to all SUBs waiting for content.
  9. // a PUB does not guarantee any sort of delivery. trying to send to a path
  10. // which already has other regular POSTs waiting, or one without any SUBs will
  11. // still return as successful.
  12. func (pw *Pathway) handlePubSub(pathID string, r *http.Request) {
  13. // PUBSUBs do not create a new paths preemptively. without an existing path
  14. // which exclusively has SUBs waiting, a PUB would not make sense.
  15. var queue *transferQueue
  16. if p, loaded := pw.Load(pathID); loaded {
  17. // PUB successfully loads a SUB queue.
  18. queue = p.(*transferQueue)
  19. } else {
  20. // no loaded queue with SUBs, nothing to PUB.
  21. return
  22. }
  23. if atomic.AddInt32(&queue.posts, 1) > 1 {
  24. // not a good idea to publish if there are posts waiting or in progress.
  25. // since the other POST might have finished already, it is still neccesary
  26. // to check if all POSTs and GETs are finished in order to delete the
  27. // path and close the queue's channel.
  28. if atomic.AddInt32(&queue.posts, -1) <= 0 {
  29. if atomic.LoadInt32(&queue.gets) <= 0 {
  30. pw.Delete(pathID)
  31. close(queue.ch)
  32. }
  33. }
  34. return
  35. }
  36. var masterWriter io.WriteCloser
  37. var saturated bool
  38. for !saturated {
  39. // create a transfer and writer for any more waiting SUB requests.
  40. transfer, writer := newTransfer(r.Header.Get("Content-Length"))
  41. select {
  42. // try to put the transfer in a queue channel and add the writer
  43. // to it to the masterWriter.
  44. case queue.ch <- transfer:
  45. if masterWriter == nil {
  46. masterWriter = BroadcastWriter(writer)
  47. } else {
  48. masterWriter = BroadcastWriter(masterWriter, writer)
  49. }
  50. default:
  51. // if no more SUB requests are waiting, then getting a channel from
  52. // the queue won't work. this PUB is saturated.
  53. saturated = true
  54. }
  55. }
  56. // copy all incoming data as efficiently as possible over the broadcastWriter.
  57. io.Copy(masterWriter, r.Body)
  58. masterWriter.Close()
  59. r.Body.Close()
  60. // if this was indeed the last POST and there are no more new GET requests
  61. // waiting, then go ahead and close the queue and remove the path.
  62. if atomic.AddInt32(&queue.posts, -1) <= 0 {
  63. if atomic.LoadInt32(&queue.gets) <= 0 {
  64. pw.Delete(pathID)
  65. close(queue.ch)
  66. }
  67. }
  68. }
  69. // handlePost simply attempts to receive whatever data is sent by the POST
  70. // request and relay it to the first available GET request on the same path.
  71. // if there is no GET waiting, it will block indefinitely until it either finds
  72. // a GET or the connection is cancelled.
  73. func (pw *Pathway) handlePost(pathID string, r *http.Request) {
  74. // POST creates a new path preemptively.
  75. queue := &transferQueue{ch: make(chan httpTransfer)}
  76. if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
  77. // POST successfully loads a path instead.
  78. queue = p.(*transferQueue)
  79. }
  80. atomic.AddInt32(&queue.posts, 1)
  81. // create a transfer and writer for the next or waiting GET request.
  82. transfer, writer := newTransfer(r.Header.Get("Content-Length"))
  83. // start a go routing to start reading from the body. else a cancelled
  84. // POST is never correctly detected and this hangs forever.
  85. go func() {
  86. // copy all incoming data efficiently once a GET picks up the transfer.
  87. io.Copy(writer, r.Body)
  88. writer.Close()
  89. r.Body.Close()
  90. }()
  91. select {
  92. // either a GET presents itself on the other side of the queue's channel.
  93. case queue.ch <- transfer:
  94. // or the request is cancelled, and this POST needs to close the transfer
  95. // lest it be waiting for the done channel to close forever.
  96. case <-r.Context().Done():
  97. close(transfer.done)
  98. }
  99. // wait here for the GET to signal it is done reading everything from the POST
  100. <-transfer.done
  101. // if this was indeed the last POST and there are no more new GET requests
  102. // waiting, then go ahead and close the queue and remove the path.
  103. if atomic.AddInt32(&queue.posts, -1) <= 0 {
  104. if atomic.LoadInt32(&queue.gets) <= 0 {
  105. pw.Delete(pathID)
  106. close(queue.ch)
  107. }
  108. }
  109. }