12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package pathway
- import (
- "io"
- "log"
- "net/http"
- "strings"
- "sync"
- // "nhooyr.io/websocket"
- )
- var (
- // actual business end of the device
- paths *sync.Map
- debugging string
- )
- func init() {
- paths = &sync.Map{}
- }
- // Transfer holds a single tranferable connection to be read
- type Transfer struct {
- reader io.ReadCloser
- done chan struct{}
- contentlength string
- }
- // NewTransfer returns a ready to use Transfer and it's writer
- func NewTransfer(contentlength string) (transfer Transfer, writer io.WriteCloser) {
- var reader io.ReadCloser
- reader, writer = io.Pipe()
- transfer = Transfer{
- reader: reader.(io.ReadCloser),
- contentlength: contentlength,
- done: make(chan struct{}),
- }
- return
- }
- // Queue is where posts and gets can exchange transfers
- type Queue struct {
- ch chan Transfer
- posts int32
- gets int32
- }
- func PathHandler(w http.ResponseWriter, r *http.Request) {
- pathID := r.URL.Path
- if len(pathID) < 2 {
- w.WriteHeader(400)
- w.Write([]byte("path to short"))
- return
- }
- if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" {
- if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" {
- // c, err := websocket.Accept(w, r, nil)
- // if err != nil {
- // info("Websocket not accepted: %+s", err.Error())
- // return
- // }
- //
- // defer c.Close(websocket.StatusInternalError, "the sky is falling")
- info("Websocket connections not supported yet. Headers: %+v", r.Header)
- // return
- }
- }
- if r.Method == "GET" {
- info("%s [GET] Connected", pathID)
- handleGet(pathID, w, r)
- } else if r.Method == "POST" {
- info("%s [POST] Connected", pathID)
- if h := r.Header.Get("X-Pathway"); h == "pubsub" {
- debug("%s [PUBSUB] Upgrade POST to PUBSUB", pathID)
- handlePubSub(pathID, r)
- } else {
- handlePost(pathID, r)
- }
- } else {
- info("Unhandled request type: '%s'", r.Method)
- }
- }
- func info(msg string, args ...interface{}) {
- log.Printf("INFO | "+msg, args...)
- }
- func debug(msg string, args ...interface{}) {
- if len(debugging) > 0 {
- log.Printf("DEBUG | "+msg, args...)
- }
- }
|