فهرست منبع

add support for pubsub

teizz 3 سال پیش
والد
کامیت
f7713084a4
8فایلهای تغییر یافته به همراه316 افزوده شده و 162 حذف شده
  1. 1 1
      Dockerfile
  2. 1 1
      LICENSE
  3. 1 1
      VERSION
  4. 0 159
      main.go
  5. 69 0
      src/broadcast.go
  6. 43 0
      src/get.go
  7. 98 0
      src/main.go
  8. 103 0
      src/post.go

+ 1 - 1
Dockerfile

@@ -2,7 +2,7 @@ FROM golang:alpine AS builder
 RUN apk --no-cache --no-progress add make upx
 WORKDIR /go/src/lolcathost/pathway/
 
-COPY *.go VERSION ./
+COPY src/*.go VERSION ./
 RUN echo Building && \
     env CGO_ENABLED=0 GO111MODULE=off go build -trimpath -ldflags="all=-s -w -buildid= -X main.version=$(cat VERSION) -X main.buildtime=$(date +%FT%T%z)" && \
     echo Compressing && \

+ 1 - 1
LICENSE

@@ -1,5 +1,5 @@
 MIT License
-Copyright (c) <year> <copyright holders>
+Copyright (c) 2021 Mattijs
 
 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
 

+ 1 - 1
VERSION

@@ -1 +1 @@
-2.0
+2.1

+ 0 - 159
main.go

@@ -1,159 +0,0 @@
-package main
-
-import (
-	"io"
-	"log"
-	"net/http"
-	"sync"
-	"sync/atomic"
-)
-
-var (
-	// variables to set during build-time
-	debugging = ""
-	version   = "0.0-undefined"
-	buildtime = "0000-00-00T00:00:00+0000"
-
-	// actual business end of the device
-	paths = &sync.Map{}
-)
-
-// Transfer holds a single tranferable connection to be read
-type Transfer struct {
-	reader        *io.PipeReader
-	done          chan struct{}
-	contentlength string
-}
-
-// Queue is where posts and gets can exchange transfers
-type Queue struct {
-	ch    chan Transfer
-	posts int32
-	gets  int32
-}
-
-func pathHandler(w http.ResponseWriter, r *http.Request) {
-	pathID := r.URL.Path
-
-	if len(pathID) < 2 {
-		w.WriteHeader(400)
-		w.Write([]byte("path to short"))
-		return
-	}
-
-	if r.Method == "GET" {
-		log.Printf("%s [GET] Connected", pathID)
-
-		queue := &Queue{ch: make(chan Transfer)}
-		if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
-			queue = p.(*Queue)
-			debug("%s [GET] Loads path", pathID)
-		} else {
-			debug("%s [GET] Created path", pathID)
-		}
-		atomic.AddInt32(&queue.gets, 1)
-
-		select {
-		case transfer := <-queue.ch:
-			debug("%s [GET] Reads from path", pathID)
-			if transfer.contentlength != "" {
-				w.Header().Set("Content-Length", transfer.contentlength)
-			}
-			_, err := io.Copy(w, transfer.reader)
-			if err != nil {
-				transfer.reader.Close()
-			}
-			debug("%s [GET] Sends done", pathID)
-			close(transfer.done)
-		case <-r.Context().Done():
-			debug("%s [GET] Cancels path", pathID)
-		}
-
-		if atomic.AddInt32(&queue.gets, -1) <= 0 {
-			if atomic.LoadInt32(&queue.posts) <= 0 {
-				paths.Delete(pathID)
-				debug("%s [GET] Removes path", pathID)
-			}
-		}
-		log.Printf("%s [GET] Finishes", pathID)
-
-	} else {
-		log.Printf("%s [POST] Connected", pathID)
-
-		queue := &Queue{ch: make(chan Transfer)}
-		if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
-			queue = p.(*Queue)
-			debug("%s [POST] Loads path", pathID)
-		} else {
-			debug("%s [POST] Creates path", pathID)
-		}
-		atomic.AddInt32(&queue.posts, 1)
-
-		reader, writer := io.Pipe()
-
-		transfer := Transfer{
-			reader:        reader,
-			contentlength: r.Header.Get("Content-Length"),
-			done:          make(chan struct{}),
-		}
-
-		go func() {
-			n, err := io.Copy(writer, r.Body)
-			debug("%s [POST] Sends %d bytes", pathID, n)
-			if err != nil {
-				debug("%s [POST] Has error: %s", pathID, err.Error())
-			}
-			writer.Close()
-			r.Body.Close()
-		}()
-
-		select {
-		case queue.ch <- transfer:
-			debug("%s [POST] Writes to path", pathID)
-		case <-r.Context().Done():
-			debug("%s [POST] Cancels path", pathID)
-			close(transfer.done)
-		}
-
-		debug("%s [POST] Waits for done", pathID)
-		<-transfer.done
-
-		if atomic.AddInt32(&queue.posts, -1) <= 0 {
-			if atomic.LoadInt32(&queue.gets) <= 0 {
-				paths.Delete(pathID)
-				debug("%s [POST] Removes path", pathID)
-			}
-		}
-
-		log.Printf("%s [POST] Finishes", pathID)
-	}
-}
-
-func okHandler(w http.ResponseWriter, r *http.Request) {
-	w.WriteHeader(http.StatusOK)
-	w.Write([]byte("ok"))
-}
-
-func emptyHandler(w http.ResponseWriter, r *http.Request) {
-	w.WriteHeader(http.StatusOK)
-}
-
-func main() {
-	log.Printf("pathway version:%s buildtime:%s", version, buildtime)
-
-	http.HandleFunc("/health", okHandler)
-	http.HandleFunc("/favicon.ico", emptyHandler)
-	http.HandleFunc("/robots.txt", emptyHandler)
-	http.HandleFunc("/", pathHandler)
-
-	err := http.ListenAndServe(":8080", nil)
-	if err != nil {
-		log.Println(err)
-	}
-}
-
-func debug(msg string, args ...interface{}) {
-	if len(debugging) > 0 {
-		log.Printf(msg, args...)
-	}
-}

+ 69 - 0
src/broadcast.go

@@ -0,0 +1,69 @@
+package main
+
+import (
+	"io"
+)
+
+type broadcastWriter struct {
+	writers []io.Writer
+}
+
+func (t *broadcastWriter) Close() (err error) {
+	for i := 0; i < len(t.writers); i++ {
+		if w, ok := t.writers[i].(io.WriteCloser); ok {
+			if e := w.Close(); e != nil {
+				err = e
+			}
+		}
+	}
+	return
+}
+
+func (t *broadcastWriter) Write(p []byte) (n int, err error) {
+	for i := 0; i < len(t.writers); i++ {
+		n, err = t.writers[i].Write(p)
+
+		if err == nil && n != len(p) {
+			err = io.ErrShortWrite
+		}
+
+		if err != nil {
+			// if not at the end, move last to here and redo this
+			if i < len(t.writers)-1 {
+				// close the writer if it implements WriteCloser
+				if w, ok := t.writers[i].(io.WriteCloser); ok {
+					defer w.Close()
+				}
+				t.writers[i] = t.writers[len(t.writers)-1]
+				i = i - 1
+			}
+			// always shorten list by one to drop the last one
+			t.writers = t.writers[:len(t.writers)-1]
+		}
+	}
+	if len(t.writers) <= 0 {
+		return 0, err
+	}
+
+	return len(p), nil
+}
+
+// BroadcastWriter creates a writer that duplicates its writes to all the
+// provided writers, similar to the Unix tee(1) command.
+//
+// Each write is written to each listed writer, one at a time.
+// If a listed writer returns an error, that overall write operation
+// continues and the offending writer is dropped from the list.
+//
+// Only if all writers are dropped, the last error is returned at the end.
+func BroadcastWriter(writers ...io.Writer) io.WriteCloser {
+	allWriters := make([]io.Writer, 0, len(writers))
+	for _, w := range writers {
+		if bw, ok := w.(*broadcastWriter); ok {
+			allWriters = append(allWriters, bw.writers...)
+		} else {
+			allWriters = append(allWriters, w)
+		}
+	}
+	return &broadcastWriter{allWriters}
+}

+ 43 - 0
src/get.go

@@ -0,0 +1,43 @@
+package main
+
+import (
+	"io"
+	"log"
+	"net/http"
+	"sync/atomic"
+)
+
+func handleGet(pathID string, w http.ResponseWriter, r *http.Request) {
+	queue := &Queue{ch: make(chan Transfer)}
+	if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
+		queue = p.(*Queue)
+		debug("%s [GET] Loads path", pathID)
+	} else {
+		debug("%s [GET] Created path", pathID)
+	}
+	atomic.AddInt32(&queue.gets, 1)
+
+	select {
+	case transfer := <-queue.ch:
+		debug("%s [GET] Reads from path", pathID)
+		if transfer.contentlength != "" {
+			w.Header().Set("Content-Length", transfer.contentlength)
+		}
+		_, err := io.Copy(w, transfer.reader)
+		if err != nil {
+			transfer.reader.Close()
+		}
+		debug("%s [GET] Sends done", pathID)
+		close(transfer.done)
+	case <-r.Context().Done():
+		debug("%s [GET] Cancels path", pathID)
+	}
+
+	if atomic.AddInt32(&queue.gets, -1) <= 0 {
+		if atomic.LoadInt32(&queue.posts) <= 0 {
+			paths.Delete(pathID)
+			debug("%s [GET] Removes path", pathID)
+		}
+	}
+	log.Printf("%s [GET] Finishes", pathID)
+}

+ 98 - 0
src/main.go

@@ -0,0 +1,98 @@
+package main
+
+import (
+	"io"
+	"log"
+	"net/http"
+	"sync"
+)
+
+var (
+	// variables to set during build-time
+	debugging = ""
+	version   = "0.0-undefined"
+	buildtime = "0000-00-00T00:00:00+0000"
+
+	// actual business end of the device
+	paths = &sync.Map{}
+)
+
+// Transfer holds a single tranferable connection to be read
+type Transfer struct {
+	reader        io.ReadCloser
+	done          chan struct{}
+	contentlength string
+}
+
+// NewTransfer returns a ready to use Transfer and it's writer
+func NewTransfer(contentlength string) (transfer Transfer, writer io.WriteCloser) {
+	var reader io.ReadCloser
+	reader, writer = io.Pipe()
+
+	transfer = Transfer{
+		reader:        reader.(io.ReadCloser),
+		contentlength: contentlength,
+		done:          make(chan struct{}),
+	}
+
+	return
+}
+
+// Queue is where posts and gets can exchange transfers
+type Queue struct {
+	ch    chan Transfer
+	posts int32
+	gets  int32
+}
+
+func pathHandler(w http.ResponseWriter, r *http.Request) {
+	pathID := r.URL.Path
+
+	if len(pathID) < 2 {
+		w.WriteHeader(400)
+		w.Write([]byte("path to short"))
+		return
+	}
+
+	if r.Method == "GET" {
+		log.Printf("%s [GET] Connected", pathID)
+		handleGet(pathID, w, r)
+	} else {
+		log.Printf("%s [POST] Connected", pathID)
+		if h := r.Header.Get("X-Pathway"); h == "pubsub" {
+			debug("%s [PUBSUB] Upgrade POST to PUBSUB", pathID)
+			handlePubSub(pathID, r)
+		} else {
+			handlePost(pathID, r)
+		}
+	}
+}
+
+func okHandler(w http.ResponseWriter, r *http.Request) {
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte("ok"))
+}
+
+func emptyHandler(w http.ResponseWriter, r *http.Request) {
+	w.WriteHeader(http.StatusOK)
+}
+
+func main() {
+	log.Printf("pathway version:%s buildtime:%s", version, buildtime)
+
+	http.HandleFunc("/health", okHandler)
+	http.HandleFunc("/favicon.ico", emptyHandler)
+	http.HandleFunc("/robots.txt", emptyHandler)
+	http.HandleFunc("/", pathHandler)
+
+	err := http.ListenAndServe(":8080", nil)
+	if err != nil {
+		log.Println(err)
+	}
+}
+
+func debug(msg string, args ...interface{}) {
+	if len(debugging) > 0 {
+		log.Printf(msg, args...)
+	}
+}

+ 103 - 0
src/post.go

@@ -0,0 +1,103 @@
+package main
+
+import (
+	"io"
+	"log"
+	"net/http"
+	"sync/atomic"
+)
+
+func handlePubSub(pathID string, r *http.Request) {
+	var queue *Queue
+	if p, loaded := paths.Load(pathID); loaded {
+		queue = p.(*Queue)
+		debug("%s [PUBSUB] Loads path", pathID)
+	} else {
+		debug("%s [PUBSUB] No subs for path", pathID)
+		return
+	}
+
+	if atomic.AddInt32(&queue.posts, 1) > 1 {
+		debug("%s [PUBSUB] Path already has posts", pathID)
+		atomic.AddInt32(&queue.posts, -1)
+		return
+	}
+
+	var masterWriter io.WriteCloser
+	var done bool
+	for !done {
+		transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
+		select {
+		case queue.ch <- transfer:
+			debug("%s [PUBSUB] Adds sub to pub", pathID)
+			if masterWriter == nil {
+				masterWriter = BroadcastWriter(writer)
+			} else {
+				masterWriter = BroadcastWriter(masterWriter, writer)
+			}
+		default:
+			debug("%s [PUBSUB] Saturated path", pathID)
+			done = true
+		}
+	}
+
+	n, err := io.Copy(masterWriter, r.Body)
+	debug("%s [PUBSUB] Sends %d bytes", pathID, n)
+	if err != nil {
+		debug("%s [PUBSUB] Has error: %s", pathID, err.Error())
+	}
+	masterWriter.Close()
+	r.Body.Close()
+
+	if atomic.AddInt32(&queue.posts, -1) <= 0 {
+		if atomic.LoadInt32(&queue.gets) <= 0 {
+			paths.Delete(pathID)
+			close(queue.ch)
+			debug("%s [PUBSUB] Removes path", pathID)
+		}
+	}
+	log.Printf("%s [PUBSUB] Finishes", pathID)
+}
+
+func handlePost(pathID string, r *http.Request) {
+	queue := &Queue{ch: make(chan Transfer)}
+	if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
+		queue = p.(*Queue)
+		debug("%s [POST] Loads path", pathID)
+	} else {
+		debug("%s [POST] Creates path", pathID)
+	}
+	atomic.AddInt32(&queue.posts, 1)
+
+	transfer, writer := NewTransfer(r.Header.Get("Content-Length"))
+
+	go func() {
+		n, err := io.Copy(writer, r.Body)
+		debug("%s [POST] Sends %d bytes", pathID, n)
+		if err != nil {
+			debug("%s [POST] Has error: %s", pathID, err.Error())
+		}
+		writer.Close()
+		r.Body.Close()
+	}()
+
+	select {
+	case queue.ch <- transfer:
+		debug("%s [POST] Writes to path", pathID)
+	case <-r.Context().Done():
+		debug("%s [POST] Cancels path", pathID)
+		close(transfer.done)
+	}
+
+	debug("%s [POST] Waits for done", pathID)
+	<-transfer.done
+
+	if atomic.AddInt32(&queue.posts, -1) <= 0 {
+		if atomic.LoadInt32(&queue.gets) <= 0 {
+			paths.Delete(pathID)
+			close(queue.ch)
+			debug("%s [POST] Removes path", pathID)
+		}
+	}
+	log.Printf("%s [POST] Finishes", pathID)
+}