1
0

post.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package pathway
  2. import (
  3. "io"
  4. "net/http"
  5. "sync/atomic"
  6. )
  7. func handlePubSub(pathID string, r *http.Request) {
  8. var queue *Queue
  9. if p, loaded := paths.Load(pathID); loaded {
  10. queue = p.(*Queue)
  11. debug("%s [PUBSUB] Loads path", pathID)
  12. } else {
  13. debug("%s [PUBSUB] No subs for path", pathID)
  14. return
  15. }
  16. if atomic.AddInt32(&queue.posts, 1) > 1 {
  17. debug("%s [PUBSUB] Path already has posts", pathID)
  18. atomic.AddInt32(&queue.posts, -1)
  19. return
  20. }
  21. var masterWriter io.WriteCloser
  22. var done bool
  23. for !done {
  24. transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
  25. select {
  26. case queue.ch <- transfer:
  27. debug("%s [PUBSUB] Adds sub to pub", pathID)
  28. if masterWriter == nil {
  29. masterWriter = BroadcastWriter(writer)
  30. } else {
  31. masterWriter = BroadcastWriter(masterWriter, writer)
  32. }
  33. default:
  34. debug("%s [PUBSUB] Saturated path", pathID)
  35. done = true
  36. }
  37. }
  38. n, err := io.Copy(masterWriter, r.Body)
  39. debug("%s [PUBSUB] Sends %d bytes", pathID, n)
  40. if err != nil {
  41. debug("%s [PUBSUB] Has error: %s", pathID, err.Error())
  42. }
  43. masterWriter.Close()
  44. r.Body.Close()
  45. if atomic.AddInt32(&queue.posts, -1) <= 0 {
  46. if atomic.LoadInt32(&queue.gets) <= 0 {
  47. paths.Delete(pathID)
  48. close(queue.ch)
  49. debug("%s [PUBSUB] Removes path", pathID)
  50. }
  51. }
  52. info("%s [PUBSUB] Finishes", pathID)
  53. }
  54. func handlePost(pathID string, r *http.Request) {
  55. queue := &Queue{ch: make(chan Transfer)}
  56. if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
  57. queue = p.(*Queue)
  58. debug("%s [POST] Loads path", pathID)
  59. } else {
  60. debug("%s [POST] Creates path", pathID)
  61. }
  62. atomic.AddInt32(&queue.posts, 1)
  63. transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
  64. go func() {
  65. n, err := io.Copy(writer, r.Body)
  66. debug("%s [POST] Sends %d bytes", pathID, n)
  67. if err != nil {
  68. debug("%s [POST] Has error: %s", pathID, err.Error())
  69. }
  70. writer.Close()
  71. r.Body.Close()
  72. }()
  73. select {
  74. case queue.ch <- transfer:
  75. debug("%s [POST] Writes to path", pathID)
  76. case <-r.Context().Done():
  77. debug("%s [POST] Cancels path", pathID)
  78. close(transfer.done)
  79. }
  80. debug("%s [POST] Waits for done", pathID)
  81. <-transfer.done
  82. if atomic.AddInt32(&queue.posts, -1) <= 0 {
  83. if atomic.LoadInt32(&queue.gets) <= 0 {
  84. paths.Delete(pathID)
  85. close(queue.ch)
  86. debug("%s [POST] Removes path", pathID)
  87. }
  88. }
  89. info("%s [POST] Finishes", pathID)
  90. }