package main import ( "io" "log" "net/http" "sync" "sync/atomic" ) var ( // variables to set during build-time debugging = "" version = "0.0-undefined" buildtime = "0000-00-00T00:00:00+0000" // actual business end of the device paths = &sync.Map{} ) // Transfer holds a single tranferable connection to be read type Transfer struct { reader *io.PipeReader done chan struct{} contentlength string } // Queue is where posts and gets can exchange transfers type Queue struct { ch chan Transfer posts int32 gets int32 } func pathHandler(w http.ResponseWriter, r *http.Request) { pathID := r.URL.Path if len(pathID) < 2 { w.WriteHeader(400) w.Write([]byte("path to short")) return } if r.Method == "GET" { log.Printf("%s [GET] Connected", pathID) queue := &Queue{ch: make(chan Transfer)} if p, loaded := paths.LoadOrStore(pathID, queue); loaded { queue = p.(*Queue) debug("%s [GET] Loads path", pathID) } else { debug("%s [GET] Created path", pathID) } atomic.AddInt32(&queue.gets, 1) select { case transfer := <-queue.ch: debug("%s [GET] Reads from path", pathID) if transfer.contentlength != "" { w.Header().Set("Content-Length", transfer.contentlength) } _, err := io.Copy(w, transfer.reader) if err != nil { transfer.reader.Close() } debug("%s [GET] Sends done", pathID) close(transfer.done) case <-r.Context().Done(): debug("%s [GET] Cancels path", pathID) } if atomic.AddInt32(&queue.gets, -1) <= 0 { if atomic.LoadInt32(&queue.posts) <= 0 { paths.Delete(pathID) debug("%s [GET] Removes path", pathID) } } log.Printf("%s [GET] Finishes", pathID) } else { log.Printf("%s [POST] Connected", pathID) queue := &Queue{ch: make(chan Transfer)} if p, loaded := paths.LoadOrStore(pathID, queue); loaded { queue = p.(*Queue) debug("%s [POST] Loads path", pathID) } else { debug("%s [POST] Creates path", pathID) } atomic.AddInt32(&queue.posts, 1) reader, writer := io.Pipe() transfer := Transfer{ reader: reader, contentlength: r.Header.Get("Content-Length"), done: make(chan struct{}), } go func() { n, err := io.Copy(writer, r.Body) debug("%s [POST] Sends %d bytes", pathID, n) if err != nil { debug("%s [POST] Has error: %s", pathID, err.Error()) } writer.Close() r.Body.Close() }() select { case queue.ch <- transfer: debug("%s [POST] Writes to path", pathID) case <-r.Context().Done(): debug("%s [POST] Cancels path", pathID) close(transfer.done) } debug("%s [POST] Waits for done", pathID) <-transfer.done if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { paths.Delete(pathID) debug("%s [POST] Removes path", pathID) } } log.Printf("%s [POST] Finishes", pathID) } } func okHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) } func emptyHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } func main() { log.Printf("pathway version:%s buildtime:%s", version, buildtime) http.HandleFunc("/health", okHandler) http.HandleFunc("/favicon.ico", emptyHandler) http.HandleFunc("/robots.txt", emptyHandler) http.HandleFunc("/", pathHandler) err := http.ListenAndServe(":8080", nil) if err != nil { log.Println(err) } } func debug(msg string, args ...interface{}) { if len(debugging) > 0 { log.Printf(msg, args...) } }