123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package main
- import (
- "io"
- "log"
- "net/http"
- "sync"
- "sync/atomic"
- )
// The three string variables below are meant to be set during build-time;
// the literals are the fallbacks used when the build injects nothing.

// debugging switches the debug helper on when it is non-empty.
var debugging = ""

// version is the release identifier printed at startup.
var version = "0.0-undefined"

// buildtime is the build timestamp printed at startup.
var buildtime = "0000-00-00T00:00:00+0000"

// paths is the actual business end of the device: it maps a request path to
// the *Queue on which requests for that path meet.
var paths = &sync.Map{}
// Transfer describes one transferable body travelling from a sending request
// to a receiving request.
type Transfer struct {
	// contentlength is the sender's Content-Length header value, or "" when
	// the sender did not supply one.
	contentlength string
	// reader is the read end of the pipe the sender's body is copied into.
	reader *io.PipeReader
	// done is closed when the transfer is over; the sender waits on it
	// before its request finishes.
	done chan struct{}
}
- // Queue is where posts and gets can exchange transfers
- type Queue struct {
- ch chan Transfer
- posts int32
- gets int32
- }
- func pathHandler(w http.ResponseWriter, r *http.Request) {
- pathID := r.URL.Path
- if len(pathID) < 2 {
- w.WriteHeader(400)
- w.Write([]byte("path to short"))
- return
- }
- if r.Method == "GET" {
- log.Printf("%s [GET] Connected", pathID)
- queue := &Queue{ch: make(chan Transfer)}
- if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
- queue = p.(*Queue)
- debug("%s [GET] Loads path", pathID)
- } else {
- debug("%s [GET] Created path", pathID)
- }
- atomic.AddInt32(&queue.gets, 1)
- select {
- case transfer := <-queue.ch:
- debug("%s [GET] Reads from path", pathID)
- if transfer.contentlength != "" {
- w.Header().Set("Content-Length", transfer.contentlength)
- }
- _, err := io.Copy(w, transfer.reader)
- if err != nil {
- transfer.reader.Close()
- }
- debug("%s [GET] Sends done", pathID)
- close(transfer.done)
- case <-r.Context().Done():
- debug("%s [GET] Cancels path", pathID)
- }
- if atomic.AddInt32(&queue.gets, -1) <= 0 {
- if atomic.LoadInt32(&queue.posts) <= 0 {
- paths.Delete(pathID)
- debug("%s [GET] Removes path", pathID)
- }
- }
- log.Printf("%s [GET] Finishes", pathID)
- } else {
- log.Printf("%s [POST] Connected", pathID)
- queue := &Queue{ch: make(chan Transfer)}
- if p, loaded := paths.LoadOrStore(pathID, queue); loaded {
- queue = p.(*Queue)
- debug("%s [POST] Loads path", pathID)
- } else {
- debug("%s [POST] Creates path", pathID)
- }
- atomic.AddInt32(&queue.posts, 1)
- reader, writer := io.Pipe()
- transfer := Transfer{
- reader: reader,
- contentlength: r.Header.Get("Content-Length"),
- done: make(chan struct{}),
- }
- go func() {
- n, err := io.Copy(writer, r.Body)
- debug("%s [POST] Sends %d bytes", pathID, n)
- if err != nil {
- debug("%s [POST] Has error: %s", pathID, err.Error())
- }
- writer.Close()
- r.Body.Close()
- }()
- select {
- case queue.ch <- transfer:
- debug("%s [POST] Writes to path", pathID)
- case <-r.Context().Done():
- debug("%s [POST] Cancels path", pathID)
- close(transfer.done)
- }
- debug("%s [POST] Waits for done", pathID)
- <-transfer.done
- if atomic.AddInt32(&queue.posts, -1) <= 0 {
- if atomic.LoadInt32(&queue.gets) <= 0 {
- paths.Delete(pathID)
- debug("%s [POST] Removes path", pathID)
- }
- }
- log.Printf("%s [POST] Finishes", pathID)
- }
- }
// okHandler is the health probe: it replies 200 with the two-byte body "ok"
// regardless of the request.
func okHandler(w http.ResponseWriter, r *http.Request) {
	payload := []byte("ok")

	w.WriteHeader(http.StatusOK)
	w.Write(payload)
}
// emptyHandler replies 200 with no body, whatever the request.
func emptyHandler(w http.ResponseWriter, r *http.Request) {
	const status = http.StatusOK
	w.WriteHeader(status)
}
- func main() {
- log.Printf("pathway version:%s buildtime:%s", version, buildtime)
- http.HandleFunc("/health", okHandler)
- http.HandleFunc("/favicon.ico", emptyHandler)
- http.HandleFunc("/robots.txt", emptyHandler)
- http.HandleFunc("/", pathHandler)
- err := http.ListenAndServe(":8080", nil)
- if err != nil {
- log.Println(err)
- }
- }
- func debug(msg string, args ...interface{}) {
- if len(debugging) > 0 {
- log.Printf(msg, args...)
- }
- }
|