get.go 989 B

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package pathway
  2. import (
  3. "io"
  4. "net/http"
  5. "sync/atomic"
  6. )
  7. func handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
  8. queue := &Queue{ch: make(chan Transfer)}
  9. if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
  10. queue = p.(*Queue)
  11. debug("%s [GET] Loads path", pathID)
  12. } else {
  13. debug("%s [GET] Created path", pathID)
  14. }
  15. atomic.AddInt32(&queue.gets, 1)
  16. select {
  17. case transfer := <-queue.ch:
  18. debug("%s [GET] Reads from path", pathID)
  19. if transfer.contentlength != "" {
  20. w.Header().Set("Content-Length", transfer.contentlength)
  21. }
  22. _, err := io.Copy(w, transfer.reader)
  23. if err != nil {
  24. transfer.reader.Close()
  25. }
  26. debug("%s [GET] Sends done", pathID)
  27. close(transfer.done)
  28. case <-r.Context().Done():
  29. debug("%s [GET] Cancels path", pathID)
  30. }
  31. if atomic.AddInt32(&queue.gets, -1) <= 0 {
  32. if atomic.LoadInt32(&queue.posts) <= 0 {
  33. paths.Delete(pathID)
  34. debug("%s [GET] Removes path", pathID)
  35. }
  36. }
  37. info("%s [GET] Finishes", pathID)
  38. }