From 172abc65f72aec22b4ed381f972c66ce86c7678b Mon Sep 17 00:00:00 2001 From: teizz Date: Wed, 14 Apr 2021 09:53:58 +0200 Subject: [PATCH] initial POC commit --- Dockerfile | 15 ++++++ VERSION | 1 + main.go | 150 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 166 insertions(+) create mode 100644 Dockerfile create mode 100644 VERSION create mode 100644 main.go diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7911ce9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM golang:alpine AS builder +RUN apk --no-cache --no-progress add make upx +WORKDIR /go/src/lolcathost/pathway/ + +COPY *.go VERSION ./ +RUN echo Building && \ + env CGO_ENABLED=0 go build -trimpath -ldflags="all=-s -w -buildid= -X main.version=$(cat VERSION) -X main.buildtime=$(date +%FT%T%z)" && \ + echo Compressing && \ + upx pathway > /dev/null + +FROM scratch +COPY --from=builder /go/src/lolcathost/pathway/pathway /pathway + +EXPOSE 8080 +CMD ["/pathway"] diff --git a/VERSION b/VERSION new file mode 100644 index 0000000..fa3b0e1 --- /dev/null +++ b/VERSION @@ -0,0 +1 @@ +1.0-beta diff --git a/main.go b/main.go new file mode 100644 index 0000000..707adcc --- /dev/null +++ b/main.go @@ -0,0 +1,150 @@ +package main + +import ( + "fmt" + "io" + "log" + "net/http" + "strconv" + "sync" +) + +var ( + version = "0.0-undefined" + buildtime = "0000-00-00T00:00:00+0000" + queues = make(map[string]*Queue) + mut = &sync.Mutex{} +) + +type Transfer struct { + reader io.Reader + size int +} + +type Queue struct { + ch chan Transfer + producers int + consumers int +} + +func NewQueue() *Queue { + return &Queue{ + ch: make(chan Transfer), + } +} + +func (q *Queue) addConsumer() { + q.consumers = q.consumers + 1 +} + +func (q *Queue) remConsumer() { + q.consumers = q.consumers - 1 +} + +func (q *Queue) addProducer() { + q.producers = q.producers + 1 +} + +func (q *Queue) remProducer() { + q.producers = q.producers - 1 +} + +func (q *Queue) isEmpty() bool { + return q.producers == 0 && q.consumers == 0 +} + +func main() { + log.Printf("pathway version:%s buildtime:%s", version, buildtime) + + handle := func(w http.ResponseWriter, r *http.Request) { + channelId := r.URL.Path + + if r.Method == "GET" { + log.Printf("GET: %s", channelId) + mut.Lock() + queue, exists := queues[channelId] + if !exists { + queue = NewQueue() + queues[channelId] = queue + } + + ch := queue.ch + + queue.addConsumer() + mut.Unlock() + + select { + case transfer := <-ch: + w.Header().Set("Content-Length", fmt.Sprintf("%d", transfer.size)) + _, err := io.Copy(w, transfer.reader) + if err != nil { + if closer, ok := transfer.reader.(io.Closer); ok { + closer.Close() + } + } + case <-r.Context().Done(): + } + + mut.Lock() + queue.remConsumer() + if queue.isEmpty() { + delete(queues, channelId) + } + mut.Unlock() + } else { + log.Printf("POST: %s", channelId) + mut.Lock() + queue, exists := queues[channelId] + if !exists { + queue = NewQueue() + queues[channelId] = queue + } + + ch := queue.ch + + queue.addProducer() + mut.Unlock() + + reader, writer := io.Pipe() + + contentLength, err := strconv.Atoi(r.Header.Get("Content-Length")) + if err != nil { + contentLength = 0 + } + + transfer := Transfer{ + reader: reader, + size: contentLength, + } + + select { + case ch <- transfer: + io.Copy(writer, r.Body) + case <-r.Context().Done(): + } + + writer.Close() + + mut.Lock() + queue.remProducer() + if queue.isEmpty() { + delete(queues, channelId) + } + mut.Unlock() + } + } + + health := func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) + } + + http.HandleFunc("/health", health) + http.HandleFunc("/", handle) + + err := http.ListenAndServe(":8080", nil) + if err != nil { + log.Println(err) + } + +}