package pathway import ( "io" "net/http" "strings" "sync" ) // httpTransfer holds a single tranferable connection to be read. // created by POSTs or PUBs, these are read from by GET requests. // once the reader is done, it can go and close the done channel. type httpTransfer struct { reader io.ReadCloser done chan struct{} contentlength string } // newTransfer returns a ready to use httpTransfer and it's writer. func newTransfer(contentlength string) (transfer httpTransfer, writer io.WriteCloser) { var reader io.ReadCloser reader, writer = io.Pipe() transfer = httpTransfer{ reader: reader, contentlength: contentlength, done: make(chan struct{}), } return } // transferQueue holds all pending connections for a particular path stored in // the Pathway map. type transferQueue struct { ch chan httpTransfer posts int32 gets int32 } // Pathway keeps track of and allows one to handle all the different paths. type Pathway struct { sync.Map } // New returns a usable Pathway which HTTP Handler can be used to allow clients to // create and connect to eachother via arbitraty paths. func New() *Pathway { return new(Pathway) } // ServeHTTP is a handler for incoming GET or POST request. It directs them to the // proper path: either by creating a new one or connecting it to an existing one. func (pw *Pathway) ServeHTTP(w http.ResponseWriter, r *http.Request) { pathID := r.URL.Path if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" { if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" { // websockets are special, and not supported w.WriteHeader(http.StatusBadRequest) w.Write([]byte("websockets are not supported")) return } } if r.Method == http.MethodGet { // handle an incoming GET request to connect to either POST or PUB pw.handleGet(pathID, w, r) } else if r.Method == http.MethodPost { if h := r.Header.Get("X-Pathway"); h == "pubsub" { // handle an incoming PUB part of a PUBSUB. pw.handlePubSub(pathID, r) } else { // handle an incoming POST. pw.handlePost(pathID, r) } } else { // if it's not a get or a post, then let the client know other methods // are not supported. w.Header().Set("Allow", http.MethodGet+", "+http.MethodPost) w.WriteHeader(http.StatusMethodNotAllowed) w.Write([]byte(r.Method + " is not supported")) } }