123456789101112131415161718192021222324252627282930313233343536373839404142 |
- package pathway
- import (
- "io"
- "net/http"
- "sync/atomic"
- )
- func handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
- queue := &Queue{ch: make(chan Transfer)}
- if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
- queue = p.(*Queue)
- debug("%s [GET] Loads path", pathID)
- } else {
- debug("%s [GET] Created path", pathID)
- }
- atomic.AddInt32(&queue.gets, 1)
- select {
- case transfer := <-queue.ch:
- debug("%s [GET] Reads from path", pathID)
- if transfer.contentlength != "" {
- w.Header().Set("Content-Length", transfer.contentlength)
- }
- _, err := io.Copy(w, transfer.reader)
- if err != nil {
- transfer.reader.Close()
- }
- debug("%s [GET] Sends done", pathID)
- close(transfer.done)
- case <-r.Context().Done():
- debug("%s [GET] Cancels path", pathID)
- }
- if atomic.AddInt32(&queue.gets, -1) <= 0 {
- if atomic.LoadInt32(&queue.posts) <= 0 {
- paths.Delete(pathID)
- debug("%s [GET] Removes path", pathID)
- }
- }
- info("%s [GET] Finishes", pathID)
- }
|