get.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. package pathway
  2. import (
  3. "io"
  4. "net/http"
  5. "sync/atomic"
  6. )
  7. // handleGet simply attempts to relay whatever data is received from the first
  8. // available POST request into this GET request on the same path.
  9. // if there is no POST waiting, it will block indefinitely until it either finds
  10. // a POST or the connection is cancelled.
  11. func (pw *Pathway) handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
  12. // GET creates a new path preemptively
  13. queue := &transferQueue{ch: make(chan httpTransfer)}
  14. if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
  15. // GET successfully loads a path instead
  16. queue = p.(*transferQueue)
  17. }
  18. atomic.AddInt32(&queue.gets, 1)
  19. select {
  20. // either a POST presents itself on the other side of the queue's channel
  21. case transfer := <-queue.ch:
  22. if transfer.contentlength != "" {
  23. w.Header().Set("Content-Length", transfer.contentlength)
  24. }
  25. _, err := io.Copy(w, transfer.reader)
  26. if err != nil {
  27. transfer.reader.Close()
  28. }
  29. close(transfer.done)
  30. // or the request is cancelled, and this GET can continue to finish
  31. case <-r.Context().Done():
  32. }
  33. // if this was indeed the last GET and there are no more new POST requests
  34. // waiting, then go ahead and close the queue and remove the path
  35. if atomic.AddInt32(&queue.gets, -1) <= 0 {
  36. if atomic.LoadInt32(&queue.posts) <= 0 {
  37. pw.Delete(pathID)
  38. close(queue.ch)
  39. }
  40. }
  41. }