package pathway import ( "io" "log" "net/http" "strings" "sync" // "nhooyr.io/websocket" ) var ( // actual business end of the device paths *sync.Map debugging string ) func init() { paths = &sync.Map{} } // Transfer holds a single tranferable connection to be read type Transfer struct { reader io.ReadCloser done chan struct{} contentlength string } // NewTransfer returns a ready to use Transfer and it's writer func NewTransfer(contentlength string) (transfer Transfer, writer io.WriteCloser) { var reader io.ReadCloser reader, writer = io.Pipe() transfer = Transfer{ reader: reader.(io.ReadCloser), contentlength: contentlength, done: make(chan struct{}), } return } // Queue is where posts and gets can exchange transfers type Queue struct { ch chan Transfer posts int32 gets int32 } func PathHandler(w http.ResponseWriter, r *http.Request) { pathID := r.URL.Path if len(pathID) < 2 { w.WriteHeader(400) w.Write([]byte("path to short")) return } if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" { if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" { // c, err := websocket.Accept(w, r, nil) // if err != nil { // info("Websocket not accepted: %+s", err.Error()) // return // } // // defer c.Close(websocket.StatusInternalError, "the sky is falling") info("Websocket connections not supported yet. Headers: %+v", r.Header) // return } } if r.Method == "GET" { info("%s [GET] Connected", pathID) handleGet(pathID, w, r) } else if r.Method == "POST" { info("%s [POST] Connected", pathID) if h := r.Header.Get("X-Pathway"); h == "pubsub" { debug("%s [PUBSUB] Upgrade POST to PUBSUB", pathID) handlePubSub(pathID, r) } else { handlePost(pathID, r) } } else { info("Unhandled request type: '%s'", r.Method) } } func info(msg string, args ...interface{}) { log.Printf("INFO | "+msg, args...) } func debug(msg string, args ...interface{}) { if len(debugging) > 0 { log.Printf("DEBUG | "+msg, args...) } }