Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3e6bf0987 | ||
|
|
53013812a9 | ||
|
|
5c38e74522 | ||
|
|
59111873df | ||
|
|
979ef108f4 |
15
Dockerfile
15
Dockerfile
@@ -1,15 +0,0 @@
|
|||||||
FROM golang:alpine AS builder
|
|
||||||
RUN apk --no-cache --no-progress add make upx
|
|
||||||
WORKDIR /go/src/lolcathost/pathway/
|
|
||||||
|
|
||||||
COPY src/*.go VERSION ./
|
|
||||||
RUN echo Building && \
|
|
||||||
env CGO_ENABLED=0 GO111MODULE=off go build -trimpath -ldflags="all=-s -w -buildid= -X main.version=$(cat VERSION) -X main.buildtime=$(date +%FT%T%z)" && \
|
|
||||||
echo Compressing && \
|
|
||||||
upx pathway > /dev/null
|
|
||||||
|
|
||||||
FROM scratch
|
|
||||||
COPY --from=builder /go/src/lolcathost/pathway/pathway /pathway
|
|
||||||
|
|
||||||
EXPOSE 8080
|
|
||||||
CMD ["/pathway"]
|
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package main
|
package pathway
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
16
cmd/Dockerfile
Normal file
16
cmd/Dockerfile
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
FROM golang:alpine AS builder
|
||||||
|
RUN apk --no-cache --no-progress add upx
|
||||||
|
WORKDIR /go/build
|
||||||
|
|
||||||
|
COPY . .
|
||||||
|
RUN echo Building && \
|
||||||
|
go get -u && \
|
||||||
|
env CGO_ENABLED=0 go build -trimpath -ldflags="all=-s -w -buildid= -X main.version=v$(cat VERSION) -X main.buildtime=$(date +%FT%T%z)" -o pathway && \
|
||||||
|
echo Compressing && \
|
||||||
|
upx pathway > /dev/null
|
||||||
|
|
||||||
|
FROM scratch
|
||||||
|
COPY --from=builder /go/build/pathway /pathway
|
||||||
|
|
||||||
|
EXPOSE 8080
|
||||||
|
CMD ["/pathway"]
|
||||||
1
cmd/VERSION
Normal file
1
cmd/VERSION
Normal file
@@ -0,0 +1 @@
|
|||||||
|
3.0
|
||||||
3
cmd/build.sh
Executable file
3
cmd/build.sh
Executable file
@@ -0,0 +1,3 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
export CGO_ENABLED=0
|
||||||
|
go build -trimpath -ldflags="all=-s -w -buildid= -X main.version=v$(cat VERSION) -X main.buildtime=$(date +%FT%T%z)" -o pathway
|
||||||
7
cmd/go.mod
Normal file
7
cmd/go.mod
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
module main
|
||||||
|
|
||||||
|
go 1.16
|
||||||
|
|
||||||
|
require (
|
||||||
|
git.nxdomain.nl/mattijs/pathway v0.1.2
|
||||||
|
)
|
||||||
2
cmd/go.sum
Normal file
2
cmd/go.sum
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
git.nxdomain.nl/mattijs/pathway v0.1.2 h1:EXt0JaDlS4gPvQgGcyslYaLgRC2/AZPC+/8VDba3+bY=
|
||||||
|
git.nxdomain.nl/mattijs/pathway v0.1.2/go.mod h1:3FllmtSFlVdPPdXmtL+N7V0qdaTtYB/zLyM8uQr+50o=
|
||||||
50
cmd/main.go
Normal file
50
cmd/main.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"git.nxdomain.nl/mattijs/pathway"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// variables to set during build-time
|
||||||
|
debugging = ""
|
||||||
|
version = "0.0-undefined"
|
||||||
|
buildtime = "0000-00-00T00:00:00+0000"
|
||||||
|
)
|
||||||
|
|
||||||
|
func okHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
w.Write([]byte("ok"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func emptyHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
info("pathway version:%s buildtime:%s", version, buildtime)
|
||||||
|
|
||||||
|
paths := pathway.New()
|
||||||
|
|
||||||
|
http.HandleFunc("/health", okHandler)
|
||||||
|
http.HandleFunc("/favicon.ico", emptyHandler)
|
||||||
|
http.HandleFunc("/robots.txt", emptyHandler)
|
||||||
|
http.HandleFunc("/", paths.ServeHTTP)
|
||||||
|
|
||||||
|
err := http.ListenAndServe(":8080", nil)
|
||||||
|
if err != nil {
|
||||||
|
info("%s", err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func info(msg string, args ...interface{}) {
|
||||||
|
log.Printf("INFO | "+msg, args...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func debug(msg string, args ...interface{}) {
|
||||||
|
if len(debugging) > 0 {
|
||||||
|
log.Printf("DEBUG | "+msg, args...)
|
||||||
|
}
|
||||||
|
}
|
||||||
45
get.go
Normal file
45
get.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package pathway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handleGet simply attempts to relay whatever data is received from the first
|
||||||
|
// available POST request into this GET request on the same path.
|
||||||
|
// if there is no POST waiting, it will block indefinitely until it either finds
|
||||||
|
// a POST or the connection is cancelled.
|
||||||
|
func (pw *Pathway) handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
|
||||||
|
// GET creates a new path preemptively
|
||||||
|
queue := &transferQueue{ch: make(chan httpTransfer)}
|
||||||
|
if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
|
||||||
|
// GET successfully loads a path instead
|
||||||
|
queue = p.(*transferQueue)
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&queue.gets, 1)
|
||||||
|
|
||||||
|
select {
|
||||||
|
// either a POST presents itself on the other side of the queue's channel
|
||||||
|
case transfer := <-queue.ch:
|
||||||
|
if transfer.contentlength != "" {
|
||||||
|
w.Header().Set("Content-Length", transfer.contentlength)
|
||||||
|
}
|
||||||
|
_, err := io.Copy(w, transfer.reader)
|
||||||
|
if err != nil {
|
||||||
|
transfer.reader.Close()
|
||||||
|
}
|
||||||
|
close(transfer.done)
|
||||||
|
// or the request is cancelled, and this GET can continue to finish
|
||||||
|
case <-r.Context().Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
// if this was indeed the last GET and there are no more new POST requests
|
||||||
|
// waiting, then go ahead and close the queue and remove the path
|
||||||
|
if atomic.AddInt32(&queue.gets, -1) <= 0 {
|
||||||
|
if atomic.LoadInt32(&queue.posts) <= 0 {
|
||||||
|
pw.Delete(pathID)
|
||||||
|
close(queue.ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
5
go.mod
Normal file
5
go.mod
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
module git.nxdomain.nl/mattijs/pathway
|
||||||
|
|
||||||
|
go 1.16
|
||||||
|
|
||||||
|
retract [v0.1.0, v0.1.1] // Should never have been published
|
||||||
83
pathway.go
Normal file
83
pathway.go
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
package pathway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// httpTransfer holds a single tranferable connection to be read.
|
||||||
|
// created by POSTs or PUBs, these are read from by GET requests.
|
||||||
|
// once the reader is done, it can go and close the done channel.
|
||||||
|
type httpTransfer struct {
|
||||||
|
reader io.ReadCloser
|
||||||
|
done chan struct{}
|
||||||
|
contentlength string
|
||||||
|
}
|
||||||
|
|
||||||
|
// newTransfer returns a ready to use httpTransfer and it's writer.
|
||||||
|
func newTransfer(contentlength string) (transfer httpTransfer, writer io.WriteCloser) {
|
||||||
|
var reader io.ReadCloser
|
||||||
|
reader, writer = io.Pipe()
|
||||||
|
|
||||||
|
transfer = httpTransfer{
|
||||||
|
reader: reader,
|
||||||
|
contentlength: contentlength,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// transferQueue holds all pending connections for a particular path stored in
|
||||||
|
// the Pathway map.
|
||||||
|
type transferQueue struct {
|
||||||
|
ch chan httpTransfer
|
||||||
|
posts int32
|
||||||
|
gets int32
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pathway keeps track of and allows one to handle all the different paths.
|
||||||
|
type Pathway struct {
|
||||||
|
sync.Map
|
||||||
|
}
|
||||||
|
|
||||||
|
// New returns a usable Pathway which HTTP Handler can be used to allow clients to
|
||||||
|
// create and connect to eachother via arbitraty paths.
|
||||||
|
func New() *Pathway {
|
||||||
|
return new(Pathway)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeHTTP is a handler for incoming GET or POST request. It directs them to the
|
||||||
|
// proper path: either by creating a new one or connecting it to an existing one.
|
||||||
|
func (pw *Pathway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
pathID := r.URL.Path
|
||||||
|
|
||||||
|
if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" {
|
||||||
|
if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" {
|
||||||
|
// websockets are special, and not supported
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
w.Write([]byte("websockets are not supported"))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.Method == http.MethodGet {
|
||||||
|
// handle an incoming GET request to connect to either POST or PUB
|
||||||
|
pw.handleGet(pathID, w, r)
|
||||||
|
} else if r.Method == http.MethodPost {
|
||||||
|
if h := r.Header.Get("X-Pathway"); h == "pubsub" {
|
||||||
|
// handle an incoming PUB part of a PUBSUB.
|
||||||
|
pw.handlePubSub(pathID, r)
|
||||||
|
} else {
|
||||||
|
// handle an incoming POST.
|
||||||
|
pw.handlePost(pathID, r)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// if it's not a get or a post, then let the client know other methods
|
||||||
|
// are not supported.
|
||||||
|
w.Header().Set("Allow", http.MethodGet+", "+http.MethodPost)
|
||||||
|
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||||
|
w.Write([]byte(r.Method + " is not supported"))
|
||||||
|
}
|
||||||
|
}
|
||||||
121
post.go
Normal file
121
post.go
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
package pathway
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handlePubSub regards a POST as PUB and all GETs as SUBs. the content from the
|
||||||
|
// PUB will be 'broadcasted' to all SUBs waiting for content.
|
||||||
|
// a PUB does not guarantee any sort of delivery. trying to send to a path
|
||||||
|
// which already has other regular POSTs waiting, or one without any SUBs will
|
||||||
|
// still return as successful.
|
||||||
|
func (pw *Pathway) handlePubSub(pathID string, r *http.Request) {
|
||||||
|
// PUBSUBs do not create a new paths preemptively. without an existing path
|
||||||
|
// which exclusively has SUBs waiting, a PUB would not make sense.
|
||||||
|
var queue *transferQueue
|
||||||
|
if p, loaded := pw.Load(pathID); loaded {
|
||||||
|
// PUB successfully loads a SUB queue.
|
||||||
|
queue = p.(*transferQueue)
|
||||||
|
} else {
|
||||||
|
// no loaded queue with SUBs, nothing to PUB.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if atomic.AddInt32(&queue.posts, 1) > 1 {
|
||||||
|
// not a good idea to publish if there are posts waiting or in progress.
|
||||||
|
// since the other POST might have finished already, it is still neccesary
|
||||||
|
// to check if all POSTs and GETs are finished in order to delete the
|
||||||
|
// path and close the queue's channel.
|
||||||
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
||||||
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
||||||
|
pw.Delete(pathID)
|
||||||
|
close(queue.ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var masterWriter io.WriteCloser
|
||||||
|
var saturated bool
|
||||||
|
for !saturated {
|
||||||
|
// create a transfer and writer for any more waiting SUB requests.
|
||||||
|
transfer, writer := newTransfer(r.Header.Get("Content-Length"))
|
||||||
|
select {
|
||||||
|
// try to put the transfer in a queue channel and add the writer
|
||||||
|
// to it to the masterWriter.
|
||||||
|
case queue.ch <- transfer:
|
||||||
|
if masterWriter == nil {
|
||||||
|
masterWriter = BroadcastWriter(writer)
|
||||||
|
} else {
|
||||||
|
masterWriter = BroadcastWriter(masterWriter, writer)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// if no more SUB requests are waiting, then getting a channel from
|
||||||
|
// the queue won't work. this PUB is saturated.
|
||||||
|
saturated = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy all incoming data as efficiently as possible over the broadcastWriter.
|
||||||
|
io.Copy(masterWriter, r.Body)
|
||||||
|
masterWriter.Close()
|
||||||
|
r.Body.Close()
|
||||||
|
|
||||||
|
// if this was indeed the last POST and there are no more new GET requests
|
||||||
|
// waiting, then go ahead and close the queue and remove the path.
|
||||||
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
||||||
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
||||||
|
pw.Delete(pathID)
|
||||||
|
close(queue.ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePost simply attempts to receive whatever data is sent by the POST
|
||||||
|
// request and relay it to the first available GET request on the same path.
|
||||||
|
// if there is no GET waiting, it will block indefinitely until it either finds
|
||||||
|
// a GET or the connection is cancelled.
|
||||||
|
func (pw *Pathway) handlePost(pathID string, r *http.Request) {
|
||||||
|
// POST creates a new path preemptively.
|
||||||
|
queue := &transferQueue{ch: make(chan httpTransfer)}
|
||||||
|
if p, loaded := pw.LoadOrStore(pathID, queue); loaded {
|
||||||
|
// POST successfully loads a path instead.
|
||||||
|
queue = p.(*transferQueue)
|
||||||
|
}
|
||||||
|
atomic.AddInt32(&queue.posts, 1)
|
||||||
|
|
||||||
|
// create a transfer and writer for the next or waiting GET request.
|
||||||
|
transfer, writer := newTransfer(r.Header.Get("Content-Length"))
|
||||||
|
|
||||||
|
// start a go routing to start reading from the body. else a cancelled
|
||||||
|
// POST is never correctly detected and this hangs forever.
|
||||||
|
go func() {
|
||||||
|
// copy all incoming data efficiently once a GET picks up the transfer.
|
||||||
|
io.Copy(writer, r.Body)
|
||||||
|
writer.Close()
|
||||||
|
r.Body.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// either a GET presents itself on the other side of the queue's channel.
|
||||||
|
case queue.ch <- transfer:
|
||||||
|
// or the request is cancelled, and this POST needs to close the transfer
|
||||||
|
// lest it be waiting for the done channel to close forever.
|
||||||
|
case <-r.Context().Done():
|
||||||
|
close(transfer.done)
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait here for the GET to signal it is done reading everything from the POST
|
||||||
|
<-transfer.done
|
||||||
|
|
||||||
|
// if this was indeed the last POST and there are no more new GET requests
|
||||||
|
// waiting, then go ahead and close the queue and remove the path.
|
||||||
|
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
||||||
|
if atomic.LoadInt32(&queue.gets) <= 0 {
|
||||||
|
pw.Delete(pathID)
|
||||||
|
close(queue.ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
42
src/get.go
42
src/get.go
@@ -1,42 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
func handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
|
|
||||||
queue := &Queue{ch: make(chan Transfer)}
|
|
||||||
if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
|
|
||||||
queue = p.(*Queue)
|
|
||||||
debug("%s [GET] Loads path", pathID)
|
|
||||||
} else {
|
|
||||||
debug("%s [GET] Created path", pathID)
|
|
||||||
}
|
|
||||||
atomic.AddInt32(&queue.gets, 1)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case transfer := <-queue.ch:
|
|
||||||
debug("%s [GET] Reads from path", pathID)
|
|
||||||
if transfer.contentlength != "" {
|
|
||||||
w.Header().Set("Content-Length", transfer.contentlength)
|
|
||||||
}
|
|
||||||
_, err := io.Copy(w, transfer.reader)
|
|
||||||
if err != nil {
|
|
||||||
transfer.reader.Close()
|
|
||||||
}
|
|
||||||
debug("%s [GET] Sends done", pathID)
|
|
||||||
close(transfer.done)
|
|
||||||
case <-r.Context().Done():
|
|
||||||
debug("%s [GET] Cancels path", pathID)
|
|
||||||
}
|
|
||||||
|
|
||||||
if atomic.AddInt32(&queue.gets, -1) <= 0 {
|
|
||||||
if atomic.LoadInt32(&queue.posts) <= 0 {
|
|
||||||
paths.Delete(pathID)
|
|
||||||
debug("%s [GET] Removes path", pathID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
info("%s [GET] Finishes", pathID)
|
|
||||||
}
|
|
||||||
111
src/main.go
111
src/main.go
@@ -1,111 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
// variables to set during build-time
|
|
||||||
debugging = ""
|
|
||||||
version = "0.0-undefined"
|
|
||||||
buildtime = "0000-00-00T00:00:00+0000"
|
|
||||||
|
|
||||||
// actual business end of the device
|
|
||||||
paths = &sync.Map{}
|
|
||||||
)
|
|
||||||
|
|
||||||
// Transfer holds a single tranferable connection to be read
|
|
||||||
type Transfer struct {
|
|
||||||
reader io.ReadCloser
|
|
||||||
done chan struct{}
|
|
||||||
contentlength string
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewTransfer returns a ready to use Transfer and it's writer
|
|
||||||
func NewTransfer(contentlength string) (transfer Transfer, writer io.WriteCloser) {
|
|
||||||
var reader io.ReadCloser
|
|
||||||
reader, writer = io.Pipe()
|
|
||||||
|
|
||||||
transfer = Transfer{
|
|
||||||
reader: reader.(io.ReadCloser),
|
|
||||||
contentlength: contentlength,
|
|
||||||
done: make(chan struct{}),
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Queue is where posts and gets can exchange transfers
|
|
||||||
type Queue struct {
|
|
||||||
ch chan Transfer
|
|
||||||
posts int32
|
|
||||||
gets int32
|
|
||||||
}
|
|
||||||
|
|
||||||
func pathHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
pathID := r.URL.Path
|
|
||||||
|
|
||||||
if len(pathID) < 2 {
|
|
||||||
w.WriteHeader(400)
|
|
||||||
w.Write([]byte("path to short"))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" {
|
|
||||||
if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" {
|
|
||||||
info("Websocket connections not supported yet. Headers: %+v", r.Header)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if r.Method == "GET" {
|
|
||||||
info("%s [GET] Connected", pathID)
|
|
||||||
handleGet(pathID, w, r)
|
|
||||||
} else if r.Method == "POST" {
|
|
||||||
info("%s [POST] Connected", pathID)
|
|
||||||
if h := r.Header.Get("X-Pathway"); h == "pubsub" {
|
|
||||||
debug("%s [PUBSUB] Upgrade POST to PUBSUB", pathID)
|
|
||||||
handlePubSub(pathID, r)
|
|
||||||
} else {
|
|
||||||
handlePost(pathID, r)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
info("Unhandled request type: '%s'", r.Method)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func okHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
w.Write([]byte("ok"))
|
|
||||||
}
|
|
||||||
|
|
||||||
func emptyHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
w.WriteHeader(http.StatusOK)
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
info("pathway version:%s buildtime:%s", version, buildtime)
|
|
||||||
|
|
||||||
http.HandleFunc("/health", okHandler)
|
|
||||||
http.HandleFunc("/favicon.ico", emptyHandler)
|
|
||||||
http.HandleFunc("/robots.txt", emptyHandler)
|
|
||||||
http.HandleFunc("/", pathHandler)
|
|
||||||
|
|
||||||
err := http.ListenAndServe(":8080", nil)
|
|
||||||
if err != nil {
|
|
||||||
info("%s", err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func info(msg string, args ...interface{}) {
|
|
||||||
log.Printf("INFO | "+msg, args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func debug(msg string, args ...interface{}) {
|
|
||||||
if len(debugging) > 0 {
|
|
||||||
log.Printf("DEBUG | "+msg, args...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
102
src/post.go
102
src/post.go
@@ -1,102 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
func handlePubSub(pathID string, r *http.Request) {
|
|
||||||
var queue *Queue
|
|
||||||
if p, loaded := paths.Load(pathID); loaded {
|
|
||||||
queue = p.(*Queue)
|
|
||||||
debug("%s [PUBSUB] Loads path", pathID)
|
|
||||||
} else {
|
|
||||||
debug("%s [PUBSUB] No subs for path", pathID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if atomic.AddInt32(&queue.posts, 1) > 1 {
|
|
||||||
debug("%s [PUBSUB] Path already has posts", pathID)
|
|
||||||
atomic.AddInt32(&queue.posts, -1)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var masterWriter io.WriteCloser
|
|
||||||
var done bool
|
|
||||||
for !done {
|
|
||||||
transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
|
|
||||||
select {
|
|
||||||
case queue.ch <- transfer:
|
|
||||||
debug("%s [PUBSUB] Adds sub to pub", pathID)
|
|
||||||
if masterWriter == nil {
|
|
||||||
masterWriter = BroadcastWriter(writer)
|
|
||||||
} else {
|
|
||||||
masterWriter = BroadcastWriter(masterWriter, writer)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
debug("%s [PUBSUB] Saturated path", pathID)
|
|
||||||
done = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := io.Copy(masterWriter, r.Body)
|
|
||||||
debug("%s [PUBSUB] Sends %d bytes", pathID, n)
|
|
||||||
if err != nil {
|
|
||||||
debug("%s [PUBSUB] Has error: %s", pathID, err.Error())
|
|
||||||
}
|
|
||||||
masterWriter.Close()
|
|
||||||
r.Body.Close()
|
|
||||||
|
|
||||||
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
||||||
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
||||||
paths.Delete(pathID)
|
|
||||||
close(queue.ch)
|
|
||||||
debug("%s [PUBSUB] Removes path", pathID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
info("%s [PUBSUB] Finishes", pathID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func handlePost(pathID string, r *http.Request) {
|
|
||||||
queue := &Queue{ch: make(chan Transfer)}
|
|
||||||
if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
|
|
||||||
queue = p.(*Queue)
|
|
||||||
debug("%s [POST] Loads path", pathID)
|
|
||||||
} else {
|
|
||||||
debug("%s [POST] Creates path", pathID)
|
|
||||||
}
|
|
||||||
atomic.AddInt32(&queue.posts, 1)
|
|
||||||
|
|
||||||
transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
n, err := io.Copy(writer, r.Body)
|
|
||||||
debug("%s [POST] Sends %d bytes", pathID, n)
|
|
||||||
if err != nil {
|
|
||||||
debug("%s [POST] Has error: %s", pathID, err.Error())
|
|
||||||
}
|
|
||||||
writer.Close()
|
|
||||||
r.Body.Close()
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case queue.ch <- transfer:
|
|
||||||
debug("%s [POST] Writes to path", pathID)
|
|
||||||
case <-r.Context().Done():
|
|
||||||
debug("%s [POST] Cancels path", pathID)
|
|
||||||
close(transfer.done)
|
|
||||||
}
|
|
||||||
|
|
||||||
debug("%s [POST] Waits for done", pathID)
|
|
||||||
<-transfer.done
|
|
||||||
|
|
||||||
if atomic.AddInt32(&queue.posts, -1) <= 0 {
|
|
||||||
if atomic.LoadInt32(&queue.gets) <= 0 {
|
|
||||||
paths.Delete(pathID)
|
|
||||||
close(queue.ch)
|
|
||||||
debug("%s [POST] Removes path", pathID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
info("%s [POST] Finishes", pathID)
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user