From 84945beb020b7b52049681d21014881ef76acad7 Mon Sep 17 00:00:00 2001 From: teizz Date: Thu, 15 Apr 2021 22:53:03 +0200 Subject: [PATCH] complete rewrite --- main.go | 237 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 123 insertions(+), 114 deletions(-) diff --git a/main.go b/main.go index 707adcc..1519c1e 100644 --- a/main.go +++ b/main.go @@ -1,150 +1,159 @@ package main import ( - "fmt" "io" "log" "net/http" - "strconv" "sync" + "sync/atomic" ) var ( + // variables to set during build-time + debugging = "" version = "0.0-undefined" buildtime = "0000-00-00T00:00:00+0000" - queues = make(map[string]*Queue) - mut = &sync.Mutex{} + + // actual business end of the device + paths = &sync.Map{} ) +// Transfer holds a single tranferable connection to be read type Transfer struct { - reader io.Reader - size int + reader *io.PipeReader + done chan struct{} + contentlength string } +// Queue is where posts and gets can exchange transfers type Queue struct { - ch chan Transfer - producers int - consumers int + ch chan Transfer + posts int32 + gets int32 } -func NewQueue() *Queue { - return &Queue{ - ch: make(chan Transfer), +func pathHandler(w http.ResponseWriter, r *http.Request) { + pathID := r.URL.Path + + if len(pathID) < 2 { + w.WriteHeader(400) + w.Write([]byte("path to short")) + return + } + + if r.Method == "GET" { + log.Printf("%s [GET] Connected", pathID) + + queue := &Queue{ch: make(chan Transfer)} + if p, loaded := paths.LoadOrStore(pathID, queue); loaded { + queue = p.(*Queue) + debug("%s [GET] Loads path", pathID) + } else { + debug("%s [GET] Created path", pathID) + } + atomic.AddInt32(&queue.gets, 1) + + select { + case transfer := <-queue.ch: + debug("%s [GET] Reads from path", pathID) + if transfer.contentlength != "" { + w.Header().Set("Content-Length", transfer.contentlength) + } + _, err := io.Copy(w, transfer.reader) + if err != nil { + transfer.reader.Close() + } + debug("%s [GET] Sends done", pathID) + close(transfer.done) + case <-r.Context().Done(): + debug("%s [GET] Cancels path", pathID) + } + + if atomic.AddInt32(&queue.gets, -1) <= 0 { + if atomic.LoadInt32(&queue.posts) <= 0 { + paths.Delete(pathID) + debug("%s [GET] Removes path", pathID) + } + } + log.Printf("%s [GET] Finishes", pathID) + + } else { + log.Printf("%s [POST] Connected", pathID) + + queue := &Queue{ch: make(chan Transfer)} + if p, loaded := paths.LoadOrStore(pathID, queue); loaded { + queue = p.(*Queue) + debug("%s [POST] Loads path", pathID) + } else { + debug("%s [POST] Creates path", pathID) + } + atomic.AddInt32(&queue.posts, 1) + + reader, writer := io.Pipe() + + transfer := Transfer{ + reader: reader, + contentlength: r.Header.Get("Content-Length"), + done: make(chan struct{}), + } + + go func() { + n, err := io.Copy(writer, r.Body) + debug("%s [POST] Sends %d bytes", pathID, n) + if err != nil { + debug("%s [POST] Has error: %s", pathID, err.Error()) + } + writer.Close() + r.Body.Close() + }() + + select { + case queue.ch <- transfer: + debug("%s [POST] Writes to path", pathID) + case <-r.Context().Done(): + debug("%s [POST] Cancels path", pathID) + close(transfer.done) + } + + debug("%s [POST] Waits for done", pathID) + <-transfer.done + + if atomic.AddInt32(&queue.posts, -1) <= 0 { + if atomic.LoadInt32(&queue.gets) <= 0 { + paths.Delete(pathID) + debug("%s [POST] Removes path", pathID) + } + } + + log.Printf("%s [POST] Finishes", pathID) } } -func (q *Queue) addConsumer() { - q.consumers = q.consumers + 1 +func okHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) } -func (q *Queue) remConsumer() { - q.consumers = q.consumers - 1 -} - -func (q *Queue) addProducer() { - q.producers = q.producers + 1 -} - -func (q *Queue) remProducer() { - q.producers = q.producers - 1 -} - -func (q *Queue) isEmpty() bool { - return q.producers == 0 && q.consumers == 0 +func emptyHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) } func main() { log.Printf("pathway version:%s buildtime:%s", version, buildtime) - handle := func(w http.ResponseWriter, r *http.Request) { - channelId := r.URL.Path - - if r.Method == "GET" { - log.Printf("GET: %s", channelId) - mut.Lock() - queue, exists := queues[channelId] - if !exists { - queue = NewQueue() - queues[channelId] = queue - } - - ch := queue.ch - - queue.addConsumer() - mut.Unlock() - - select { - case transfer := <-ch: - w.Header().Set("Content-Length", fmt.Sprintf("%d", transfer.size)) - _, err := io.Copy(w, transfer.reader) - if err != nil { - if closer, ok := transfer.reader.(io.Closer); ok { - closer.Close() - } - } - case <-r.Context().Done(): - } - - mut.Lock() - queue.remConsumer() - if queue.isEmpty() { - delete(queues, channelId) - } - mut.Unlock() - } else { - log.Printf("POST: %s", channelId) - mut.Lock() - queue, exists := queues[channelId] - if !exists { - queue = NewQueue() - queues[channelId] = queue - } - - ch := queue.ch - - queue.addProducer() - mut.Unlock() - - reader, writer := io.Pipe() - - contentLength, err := strconv.Atoi(r.Header.Get("Content-Length")) - if err != nil { - contentLength = 0 - } - - transfer := Transfer{ - reader: reader, - size: contentLength, - } - - select { - case ch <- transfer: - io.Copy(writer, r.Body) - case <-r.Context().Done(): - } - - writer.Close() - - mut.Lock() - queue.remProducer() - if queue.isEmpty() { - delete(queues, channelId) - } - mut.Unlock() - } - } - - health := func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte("OK")) - } - - http.HandleFunc("/health", health) - http.HandleFunc("/", handle) + http.HandleFunc("/health", okHandler) + http.HandleFunc("/favicon.ico", emptyHandler) + http.HandleFunc("/robots.txt", emptyHandler) + http.HandleFunc("/", pathHandler) err := http.ListenAndServe(":8080", nil) if err != nil { log.Println(err) } - +} + +func debug(msg string, args ...interface{}) { + if len(debugging) > 0 { + log.Printf(msg, args...) + } }