diff --git a/VERSION b/VERSION deleted file mode 100644 index 879b416..0000000 --- a/VERSION +++ /dev/null @@ -1 +0,0 @@ -2.1 diff --git a/cmd/go.mod b/cmd/go.mod deleted file mode 100644 index 8ed1ef0..0000000 --- a/cmd/go.mod +++ /dev/null @@ -1,7 +0,0 @@ -module main - -go 1.16 - -require ( - git.nxdomain.nl/mattijs/pathway v0.1.1 -) diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index 548fd19..0000000 --- a/cmd/main.go +++ /dev/null @@ -1,48 +0,0 @@ -package main - -import ( - "log" - "net/http" - - "git.nxdomain.nl/mattijs/pathway" -) - -var ( - // variables to set during build-time - debugging = "" - version = "0.0-undefined" - buildtime = "0000-00-00T00:00:00+0000" -) - -func okHandler(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Write([]byte("ok")) -} - -func emptyHandler(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) -} - -func main() { - info("pathway version:%s buildtime:%s", version, buildtime) - - http.HandleFunc("/health", okHandler) - http.HandleFunc("/favicon.ico", emptyHandler) - http.HandleFunc("/robots.txt", emptyHandler) - http.HandleFunc("/", pathway.pathHandler) - - err := http.ListenAndServe(":8080", nil) - if err != nil { - info("%s", err.Error()) - } -} - -func info(msg string, args ...interface{}) { - log.Printf("INFO | "+msg, args...) -} - -func debug(msg string, args ...interface{}) { - if len(debugging) > 0 { - log.Printf("DEBUG | "+msg, args...) - } -} diff --git a/docker/Dockerfile b/docker/Dockerfile deleted file mode 100644 index e5d09d7..0000000 --- a/docker/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM golang:alpine AS builder -RUN apk --no-cache --no-progress add make upx -WORKDIR /go/src/lolcathost/pathway/ - -COPY src/*.go VERSION ./ -RUN echo Building && \ - env CGO_ENABLED=0 GO111MODULE=off go build -trimpath -ldflags="all=-s -w -buildid= -X main.version=$(cat VERSION) -X main.buildtime=$(date +%FT%T%z)" && \ - echo Compressing && \ - upx pathway > /dev/null - -FROM scratch -COPY --from=builder /go/src/lolcathost/pathway/pathway /pathway - -EXPOSE 8080 -CMD ["/pathway"] diff --git a/get.go b/get.go index 2f2dea4..0e4d937 100644 --- a/get.go +++ b/get.go @@ -6,19 +6,22 @@ import ( "sync/atomic" ) -func handleGet(pathID string, w http.ResponseWriter, r *http.Request) { - queue := &Queue{ch: make(chan Transfer)} - if p, loaded := paths.LoadOrStore(pathID, queue); loaded { - queue = p.(*Queue) - debug("%s [GET] Loads path", pathID) - } else { - debug("%s [GET] Created path", pathID) +// handleGet simply attempts to relay whatever data is received from the first +// available POST request into this GET request on the same path. +// if there is no POST waiting, it will block indefinitely until it either finds +// a POST or the connection is cancelled. +func (pw *Pathway) handleGet(pathID string, w http.ResponseWriter, r *http.Request) { + // GET creates a new path preemptively + queue := &transferQueue{ch: make(chan httpTransfer)} + if p, loaded := pw.LoadOrStore(pathID, queue); loaded { + // GET successfully loads a path instead + queue = p.(*transferQueue) } atomic.AddInt32(&queue.gets, 1) select { + // either a POST presents itself on the other side of the queue's channel case transfer := <-queue.ch: - debug("%s [GET] Reads from path", pathID) if transfer.contentlength != "" { w.Header().Set("Content-Length", transfer.contentlength) } @@ -26,17 +29,17 @@ func handleGet(pathID string, w http.ResponseWriter, r *http.Request) { if err != nil { transfer.reader.Close() } - debug("%s [GET] Sends done", pathID) close(transfer.done) + // or the request is cancelled, and this GET can continue to finish case <-r.Context().Done(): - debug("%s [GET] Cancels path", pathID) } + // if this was indeed the last GET and there are no more new POST requests + // waiting, then go ahead and close the queue and remove the path if atomic.AddInt32(&queue.gets, -1) <= 0 { if atomic.LoadInt32(&queue.posts) <= 0 { - paths.Delete(pathID) - debug("%s [GET] Removes path", pathID) + pw.Delete(pathID) + close(queue.ch) } } - info("%s [GET] Finishes", pathID) } diff --git a/go.mod b/go.mod index 8656eea..cad7eb8 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,5 @@ module git.nxdomain.nl/mattijs/pathway go 1.16 + +retract [v0.1.0, v0.1.1] // Should never have been published diff --git a/pathway.go b/pathway.go index 8cbd533..9675cd1 100644 --- a/pathway.go +++ b/pathway.go @@ -2,96 +2,82 @@ package pathway import ( "io" - "log" "net/http" "strings" "sync" - // "nhooyr.io/websocket" ) -var ( - // actual business end of the device - paths *sync.Map - debugging string -) - -func init() { - paths = &sync.Map{} -} - -// Transfer holds a single tranferable connection to be read -type Transfer struct { +// httpTransfer holds a single tranferable connection to be read. +// created by POSTs or PUBs, these are read from by GET requests. +// once the reader is done, it can go and close the done channel. +type httpTransfer struct { reader io.ReadCloser done chan struct{} contentlength string } -// NewTransfer returns a ready to use Transfer and it's writer -func NewTransfer(contentlength string) (transfer Transfer, writer io.WriteCloser) { +// newTransfer returns a ready to use httpTransfer and it's writer. +func newTransfer(contentlength string) (transfer httpTransfer, writer io.WriteCloser) { var reader io.ReadCloser reader, writer = io.Pipe() - transfer = Transfer{ - reader: reader.(io.ReadCloser), + transfer = httpTransfer{ + reader: reader, contentlength: contentlength, done: make(chan struct{}), } - return } -// Queue is where posts and gets can exchange transfers -type Queue struct { - ch chan Transfer +// transferQueue holds all pending connections for a particular path stored in +// the Pathway map. +type transferQueue struct { + ch chan httpTransfer posts int32 gets int32 } -func PathHandler(w http.ResponseWriter, r *http.Request) { - pathID := r.URL.Path +// Pathway keeps track of and allows one to handle all the different paths. +type Pathway struct { + sync.Map +} - if len(pathID) < 2 { - w.WriteHeader(400) - w.Write([]byte("path to short")) - return - } +// New returns a usable Pathway which HTTP Handler can be used to allow clients to +// create and connect to eachother via arbitraty paths. +func New() *Pathway { + return new(Pathway) +} + +// ServeHTTP is a handler for incoming GET or POST request. It directs them to the +// proper path: either by creating a new one or connecting it to an existing one. +func (pw *Pathway) ServeHTTP(w http.ResponseWriter, r *http.Request) { + pathID := r.URL.Path if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" { if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" { - // c, err := websocket.Accept(w, r, nil) - // if err != nil { - // info("Websocket not accepted: %+s", err.Error()) - // return - // } - // - // defer c.Close(websocket.StatusInternalError, "the sky is falling") - info("Websocket connections not supported yet. Headers: %+v", r.Header) - // return + // websockets are special, and not supported + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("websockets are not supported")) + return } } - if r.Method == "GET" { - info("%s [GET] Connected", pathID) - handleGet(pathID, w, r) - } else if r.Method == "POST" { - info("%s [POST] Connected", pathID) + if r.Method == http.MethodGet { + // handle an incoming GET request to connect to either POST or PUB + pw.handleGet(pathID, w, r) + } else if r.Method == http.MethodPost { if h := r.Header.Get("X-Pathway"); h == "pubsub" { - debug("%s [PUBSUB] Upgrade POST to PUBSUB", pathID) - handlePubSub(pathID, r) + // handle an incoming PUB part of a PUBSUB. + pw.handlePubSub(pathID, r) } else { - handlePost(pathID, r) + // handle an incoming POST. + pw.handlePost(pathID, r) } } else { - info("Unhandled request type: '%s'", r.Method) - } -} - -func info(msg string, args ...interface{}) { - log.Printf("INFO | "+msg, args...) -} - -func debug(msg string, args ...interface{}) { - if len(debugging) > 0 { - log.Printf("DEBUG | "+msg, args...) + // if it's not a get or a post, then let the client know other methods + // are not supported. + w.Header().Set("Allow", http.MethodGet+", "+http.MethodPost) + w.WriteHeader(http.StatusMethodNotAllowed) + w.Write([]byte(r.Method + " is not supported")) } } diff --git a/post.go b/post.go index 9985fd5..314ea50 100644 --- a/post.go +++ b/post.go @@ -6,97 +6,116 @@ import ( "sync/atomic" ) -func handlePubSub(pathID string, r *http.Request) { - var queue *Queue - if p, loaded := paths.Load(pathID); loaded { - queue = p.(*Queue) - debug("%s [PUBSUB] Loads path", pathID) +// handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the +// PUB will be 'broadcasted' to all SUBs waiting for content. +// a PUB does not guarantee any sort of delivery. trying to send to a path +// which already has other regular POSTs waiting, or one without any SUBs will +// still return as successful. +func (pw *Pathway) handlePubSub(pathID string, r *http.Request) { + // PUBSUBs do not create a new paths preemptively. without an existing path + // which exclusively has SUBs waiting, a PUB would not make sense. + var queue *transferQueue + if p, loaded := pw.Load(pathID); loaded { + // PUB successfully loads a SUB queue. + queue = p.(*transferQueue) } else { - debug("%s [PUBSUB] No subs for path", pathID) + // no loaded queue with SUBs, nothing to PUB. return } if atomic.AddInt32(&queue.posts, 1) > 1 { - debug("%s [PUBSUB] Path already has posts", pathID) - atomic.AddInt32(&queue.posts, -1) + // not a good idea to publish if there are posts waiting or in progress. + // since the other POST might have finished already, it is still neccesary + // to check if all POSTs and GETs are finished in order to delete the + // path and close the queue's channel. + if atomic.AddInt32(&queue.posts, -1) <= 0 { + if atomic.LoadInt32(&queue.gets) <= 0 { + pw.Delete(pathID) + close(queue.ch) + } + } return } var masterWriter io.WriteCloser - var done bool - for !done { - transfer, writer := NewTransfer(r.Header.Get("Content-Length")) + var saturated bool + for !saturated { + // create a transfer and writer for any more waiting SUB requests. + transfer, writer := newTransfer(r.Header.Get("Content-Length")) select { + // try to put the transfer in a queue channel and add the writer + // to it to the masterWriter. case queue.ch <- transfer: - debug("%s [PUBSUB] Adds sub to pub", pathID) if masterWriter == nil { masterWriter = BroadcastWriter(writer) } else { masterWriter = BroadcastWriter(masterWriter, writer) } default: - debug("%s [PUBSUB] Saturated path", pathID) - done = true + // if no more SUB requests are waiting, then getting a channel from + // the queue won't work. this PUB is saturated. + saturated = true } } - n, err := io.Copy(masterWriter, r.Body) - debug("%s [PUBSUB] Sends %d bytes", pathID, n) - if err != nil { - debug("%s [PUBSUB] Has error: %s", pathID, err.Error()) - } + // copy all incoming data as efficiently as possible over the broadcastWriter. + io.Copy(masterWriter, r.Body) masterWriter.Close() r.Body.Close() + // if this was indeed the last POST and there are no more new GET requests + // waiting, then go ahead and close the queue and remove the path. if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { - paths.Delete(pathID) + pw.Delete(pathID) close(queue.ch) - debug("%s [PUBSUB] Removes path", pathID) } } - info("%s [PUBSUB] Finishes", pathID) } -func handlePost(pathID string, r *http.Request) { - queue := &Queue{ch: make(chan Transfer)} - if p, loaded := paths.LoadOrStore(pathID, queue); loaded { - queue = p.(*Queue) - debug("%s [POST] Loads path", pathID) - } else { - debug("%s [POST] Creates path", pathID) +// handlePost simply attempts to receive whatever data is sent by the POST +// request and relay it to the first available GET request on the same path. +// if there is no GET waiting, it will block indefinitely until it either finds +// a GET or the connection is cancelled. +func (pw *Pathway) handlePost(pathID string, r *http.Request) { + // POST creates a new path preemptively. + queue := &transferQueue{ch: make(chan httpTransfer)} + if p, loaded := pw.LoadOrStore(pathID, queue); loaded { + // POST successfully loads a path instead. + queue = p.(*transferQueue) } atomic.AddInt32(&queue.posts, 1) - transfer, writer := NewTransfer(r.Header.Get("Content-Length")) + // create a transfer and writer for the next or waiting GET request. + transfer, writer := newTransfer(r.Header.Get("Content-Length")) + // start a go routing to start reading from the body. else a cancelled + // POST is never correctly detected and this hangs forever. go func() { - n, err := io.Copy(writer, r.Body) - debug("%s [POST] Sends %d bytes", pathID, n) - if err != nil { - debug("%s [POST] Has error: %s", pathID, err.Error()) - } + // copy all incoming data efficiently once a GET picks up the transfer. + io.Copy(writer, r.Body) writer.Close() r.Body.Close() }() select { + // either a GET presents itself on the other side of the queue's channel. case queue.ch <- transfer: - debug("%s [POST] Writes to path", pathID) + // or the request is cancelled, and this POST needs to close the transfer + // lest it be waiting for the done channel to close forever. case <-r.Context().Done(): - debug("%s [POST] Cancels path", pathID) close(transfer.done) } - debug("%s [POST] Waits for done", pathID) + // wait here for the GET to signal it is done reading everything from the POST <-transfer.done + // if this was indeed the last POST and there are no more new GET requests + // waiting, then go ahead and close the queue and remove the path. if atomic.AddInt32(&queue.posts, -1) <= 0 { if atomic.LoadInt32(&queue.gets) <= 0 { - paths.Delete(pathID) + pw.Delete(pathID) close(queue.ch) - debug("%s [POST] Removes path", pathID) } } - info("%s [POST] Finishes", pathID) }