// main.go

  1. package main
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "net/http"
  7. "strconv"
  8. "sync"
  9. )
  10. var (
  11. version = "0.0-undefined"
  12. buildtime = "0000-00-00T00:00:00+0000"
  13. queues = make(map[string]*Queue)
  14. mut = &sync.Mutex{}
  15. )
  16. type Transfer struct {
  17. reader io.Reader
  18. size int
  19. }
  20. type Queue struct {
  21. ch chan Transfer
  22. producers int
  23. consumers int
  24. }
  25. func NewQueue() *Queue {
  26. return &Queue{
  27. ch: make(chan Transfer),
  28. }
  29. }
  30. func (q *Queue) addConsumer() {
  31. q.consumers = q.consumers + 1
  32. }
  33. func (q *Queue) remConsumer() {
  34. q.consumers = q.consumers - 1
  35. }
  36. func (q *Queue) addProducer() {
  37. q.producers = q.producers + 1
  38. }
  39. func (q *Queue) remProducer() {
  40. q.producers = q.producers - 1
  41. }
  42. func (q *Queue) isEmpty() bool {
  43. return q.producers == 0 && q.consumers == 0
  44. }
  45. func main() {
  46. log.Printf("pathway version:%s buildtime:%s", version, buildtime)
  47. handle := func(w http.ResponseWriter, r *http.Request) {
  48. channelId := r.URL.Path
  49. if r.Method == "GET" {
  50. log.Printf("GET: %s", channelId)
  51. mut.Lock()
  52. queue, exists := queues[channelId]
  53. if !exists {
  54. queue = NewQueue()
  55. queues[channelId] = queue
  56. }
  57. ch := queue.ch
  58. queue.addConsumer()
  59. mut.Unlock()
  60. select {
  61. case transfer := <-ch:
  62. w.Header().Set("Content-Length", fmt.Sprintf("%d", transfer.size))
  63. _, err := io.Copy(w, transfer.reader)
  64. if err != nil {
  65. if closer, ok := transfer.reader.(io.Closer); ok {
  66. closer.Close()
  67. }
  68. }
  69. case <-r.Context().Done():
  70. }
  71. mut.Lock()
  72. queue.remConsumer()
  73. if queue.isEmpty() {
  74. delete(queues, channelId)
  75. }
  76. mut.Unlock()
  77. } else {
  78. log.Printf("POST: %s", channelId)
  79. mut.Lock()
  80. queue, exists := queues[channelId]
  81. if !exists {
  82. queue = NewQueue()
  83. queues[channelId] = queue
  84. }
  85. ch := queue.ch
  86. queue.addProducer()
  87. mut.Unlock()
  88. reader, writer := io.Pipe()
  89. contentLength, err := strconv.Atoi(r.Header.Get("Content-Length"))
  90. if err != nil {
  91. contentLength = 0
  92. }
  93. transfer := Transfer{
  94. reader: reader,
  95. size: contentLength,
  96. }
  97. select {
  98. case ch <- transfer:
  99. io.Copy(writer, r.Body)
  100. case <-r.Context().Done():
  101. }
  102. writer.Close()
  103. mut.Lock()
  104. queue.remProducer()
  105. if queue.isEmpty() {
  106. delete(queues, channelId)
  107. }
  108. mut.Unlock()
  109. }
  110. }
  111. health := func(w http.ResponseWriter, r *http.Request) {
  112. w.WriteHeader(http.StatusOK)
  113. w.Write([]byte("OK"))
  114. }
  115. http.HandleFunc("/health", health)
  116. http.HandleFunc("/", handle)
  117. err := http.ListenAndServe(":8080", nil)
  118. if err != nil {
  119. log.Println(err)
  120. }
  121. }