|
@@ -0,0 +1,150 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "fmt"
|
|
|
+ "io"
|
|
|
+ "log"
|
|
|
+ "net/http"
|
|
|
+ "strconv"
|
|
|
+ "sync"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ version = "0.0-undefined"
|
|
|
+ buildtime = "0000-00-00T00:00:00+0000"
|
|
|
+ queues = make(map[string]*Queue)
|
|
|
+ mut = &sync.Mutex{}
|
|
|
+)
|
|
|
+
|
|
|
+type Transfer struct {
|
|
|
+ reader io.Reader
|
|
|
+ size int
|
|
|
+}
|
|
|
+
|
|
|
+type Queue struct {
|
|
|
+ ch chan Transfer
|
|
|
+ producers int
|
|
|
+ consumers int
|
|
|
+}
|
|
|
+
|
|
|
+func NewQueue() *Queue {
|
|
|
+ return &Queue{
|
|
|
+ ch: make(chan Transfer),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (q *Queue) addConsumer() {
|
|
|
+ q.consumers = q.consumers + 1
|
|
|
+}
|
|
|
+
|
|
|
+func (q *Queue) remConsumer() {
|
|
|
+ q.consumers = q.consumers - 1
|
|
|
+}
|
|
|
+
|
|
|
+func (q *Queue) addProducer() {
|
|
|
+ q.producers = q.producers + 1
|
|
|
+}
|
|
|
+
|
|
|
+func (q *Queue) remProducer() {
|
|
|
+ q.producers = q.producers - 1
|
|
|
+}
|
|
|
+
|
|
|
+func (q *Queue) isEmpty() bool {
|
|
|
+ return q.producers == 0 && q.consumers == 0
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ log.Printf("pathway version:%s buildtime:%s", version, buildtime)
|
|
|
+
|
|
|
+ handle := func(w http.ResponseWriter, r *http.Request) {
|
|
|
+ channelId := r.URL.Path
|
|
|
+
|
|
|
+ if r.Method == "GET" {
|
|
|
+ log.Printf("GET: %s", channelId)
|
|
|
+ mut.Lock()
|
|
|
+ queue, exists := queues[channelId]
|
|
|
+ if !exists {
|
|
|
+ queue = NewQueue()
|
|
|
+ queues[channelId] = queue
|
|
|
+ }
|
|
|
+
|
|
|
+ ch := queue.ch
|
|
|
+
|
|
|
+ queue.addConsumer()
|
|
|
+ mut.Unlock()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case transfer := <-ch:
|
|
|
+ w.Header().Set("Content-Length", fmt.Sprintf("%d", transfer.size))
|
|
|
+ _, err := io.Copy(w, transfer.reader)
|
|
|
+ if err != nil {
|
|
|
+ if closer, ok := transfer.reader.(io.Closer); ok {
|
|
|
+ closer.Close()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ case <-r.Context().Done():
|
|
|
+ }
|
|
|
+
|
|
|
+ mut.Lock()
|
|
|
+ queue.remConsumer()
|
|
|
+ if queue.isEmpty() {
|
|
|
+ delete(queues, channelId)
|
|
|
+ }
|
|
|
+ mut.Unlock()
|
|
|
+ } else {
|
|
|
+ log.Printf("POST: %s", channelId)
|
|
|
+ mut.Lock()
|
|
|
+ queue, exists := queues[channelId]
|
|
|
+ if !exists {
|
|
|
+ queue = NewQueue()
|
|
|
+ queues[channelId] = queue
|
|
|
+ }
|
|
|
+
|
|
|
+ ch := queue.ch
|
|
|
+
|
|
|
+ queue.addProducer()
|
|
|
+ mut.Unlock()
|
|
|
+
|
|
|
+ reader, writer := io.Pipe()
|
|
|
+
|
|
|
+ contentLength, err := strconv.Atoi(r.Header.Get("Content-Length"))
|
|
|
+ if err != nil {
|
|
|
+ contentLength = 0
|
|
|
+ }
|
|
|
+
|
|
|
+ transfer := Transfer{
|
|
|
+ reader: reader,
|
|
|
+ size: contentLength,
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case ch <- transfer:
|
|
|
+ io.Copy(writer, r.Body)
|
|
|
+ case <-r.Context().Done():
|
|
|
+ }
|
|
|
+
|
|
|
+ writer.Close()
|
|
|
+
|
|
|
+ mut.Lock()
|
|
|
+ queue.remProducer()
|
|
|
+ if queue.isEmpty() {
|
|
|
+ delete(queues, channelId)
|
|
|
+ }
|
|
|
+ mut.Unlock()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ health := func(w http.ResponseWriter, r *http.Request) {
|
|
|
+ w.WriteHeader(http.StatusOK)
|
|
|
+ w.Write([]byte("OK"))
|
|
|
+ }
|
|
|
+
|
|
|
+ http.HandleFunc("/health", health)
|
|
|
+ http.HandleFunc("/", handle)
|
|
|
+
|
|
|
+ err := http.ListenAndServe(":8080", nil)
|
|
|
+ if err != nil {
|
|
|
+ log.Println(err)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|