main.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "io"
  4. "log"
  5. "net/http"
  6. "sync"
  7. "sync/atomic"
  8. )
  // Settings meant to be overridden during build time. The values below are
  // the fallbacks used when a build leaves them alone; a non-empty debugging
  // value switches on the debug log lines.
  var (
  	debugging = ""
  	version   = "0.0-undefined"
  	buildtime = "0000-00-00T00:00:00+0000"
  )

  // paths is the actual business end of the device: it maps a request path to
  // the *Queue on which getters and posters of that path meet.
  var paths = new(sync.Map)
  // Transfer holds a single transferable connection to be read.
  type Transfer struct {
  	// reader is the read half of the pipe that carries the posted body.
  	reader *io.PipeReader
  	// contentlength is the poster's Content-Length header value; it is
  	// empty when the poster sent none.
  	contentlength string
  	// done is closed once the transfer has been served or abandoned.
  	done chan struct{}
  }
  23. // Queue is where posts and gets can exchange transfers
  24. type Queue struct {
  25. ch chan Transfer
  26. posts int32
  27. gets int32
  28. }
  29. func pathHandler(w http.ResponseWriter, r *http.Request) {
  30. pathID := r.URL.Path
  31. if len(pathID) < 2 {
  32. w.WriteHeader(400)
  33. w.Write([]byte("path to short"))
  34. return
  35. }
  36. if r.Method == "GET" {
  37. log.Printf("%s [GET] Connected", pathID)
  38. queue := &Queue{ch: make(chan Transfer)}
  39. if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
  40. queue = p.(*Queue)
  41. debug("%s [GET] Loads path", pathID)
  42. } else {
  43. debug("%s [GET] Created path", pathID)
  44. }
  45. atomic.AddInt32(&queue.gets, 1)
  46. select {
  47. case transfer := <-queue.ch:
  48. debug("%s [GET] Reads from path", pathID)
  49. if transfer.contentlength != "" {
  50. w.Header().Set("Content-Length", transfer.contentlength)
  51. }
  52. _, err := io.Copy(w, transfer.reader)
  53. if err != nil {
  54. transfer.reader.Close()
  55. }
  56. debug("%s [GET] Sends done", pathID)
  57. close(transfer.done)
  58. case <-r.Context().Done():
  59. debug("%s [GET] Cancels path", pathID)
  60. }
  61. if atomic.AddInt32(&queue.gets, -1) <= 0 {
  62. if atomic.LoadInt32(&queue.posts) <= 0 {
  63. paths.Delete(pathID)
  64. debug("%s [GET] Removes path", pathID)
  65. }
  66. }
  67. log.Printf("%s [GET] Finishes", pathID)
  68. } else {
  69. log.Printf("%s [POST] Connected", pathID)
  70. queue := &Queue{ch: make(chan Transfer)}
  71. if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
  72. queue = p.(*Queue)
  73. debug("%s [POST] Loads path", pathID)
  74. } else {
  75. debug("%s [POST] Creates path", pathID)
  76. }
  77. atomic.AddInt32(&queue.posts, 1)
  78. reader, writer := io.Pipe()
  79. transfer := Transfer{
  80. reader: reader,
  81. contentlength: r.Header.Get("Content-Length"),
  82. done: make(chan struct{}),
  83. }
  84. go func() {
  85. n, err := io.Copy(writer, r.Body)
  86. debug("%s [POST] Sends %d bytes", pathID, n)
  87. if err != nil {
  88. debug("%s [POST] Has error: %s", pathID, err.Error())
  89. }
  90. writer.Close()
  91. r.Body.Close()
  92. }()
  93. select {
  94. case queue.ch <- transfer:
  95. debug("%s [POST] Writes to path", pathID)
  96. case <-r.Context().Done():
  97. debug("%s [POST] Cancels path", pathID)
  98. close(transfer.done)
  99. }
  100. debug("%s [POST] Waits for done", pathID)
  101. <-transfer.done
  102. if atomic.AddInt32(&queue.posts, -1) <= 0 {
  103. if atomic.LoadInt32(&queue.gets) <= 0 {
  104. paths.Delete(pathID)
  105. debug("%s [POST] Removes path", pathID)
  106. }
  107. }
  108. log.Printf("%s [POST] Finishes", pathID)
  109. }
  110. }
  // okHandler replies with status 200 and the body "ok"; main registers it
  // for /health.
  func okHandler(w http.ResponseWriter, _ *http.Request) {
  	const reply = "ok"
  	w.WriteHeader(http.StatusOK)
  	w.Write([]byte(reply))
  }
  // emptyHandler replies with status 200 and no body; main registers it for
  // /favicon.ico and /robots.txt.
  func emptyHandler(w http.ResponseWriter, _ *http.Request) {
  	const status = http.StatusOK
  	w.WriteHeader(status)
  }
  118. func main() {
  119. log.Printf("pathway version:%s buildtime:%s", version, buildtime)
  120. http.HandleFunc("/health", okHandler)
  121. http.HandleFunc("/favicon.ico", emptyHandler)
  122. http.HandleFunc("/robots.txt", emptyHandler)
  123. http.HandleFunc("/", pathHandler)
  124. err := http.ListenAndServe(":8080", nil)
  125. if err != nil {
  126. log.Println(err)
  127. }
  128. }
  129. func debug(msg string, args ...interface{}) {
  130. if len(debugging) > 0 {
  131. log.Printf(msg, args...)
  132. }
  133. }