1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- package pathway
- import (
- "io"
- "net/http"
- "strings"
- "sync"
- )
- // httpTransfer holds a single tranferable connection to be read.
- // created by POSTs or PUBs, these are read from by GET requests.
- // once the reader is done, it can go and close the done channel.
- type httpTransfer struct {
- reader io.ReadCloser
- done chan struct{}
- contentlength string
- }
- // newTransfer returns a ready to use httpTransfer and it's writer.
- func newTransfer(contentlength string) (transfer httpTransfer, writer io.WriteCloser) {
- var reader io.ReadCloser
- reader, writer = io.Pipe()
- transfer = httpTransfer{
- reader: reader,
- contentlength: contentlength,
- done: make(chan struct{}),
- }
- return
- }
- // transferQueue holds all pending connections for a particular path stored in
- // the Pathway map.
- type transferQueue struct {
- ch chan httpTransfer
- posts int32
- gets int32
- }
- // Pathway keeps track of and allows one to handle all the different paths.
- type Pathway struct {
- sync.Map
- }
- // New returns a usable Pathway which HTTP Handler can be used to allow clients to
- // create and connect to eachother via arbitraty paths.
- func New() *Pathway {
- return new(Pathway)
- }
- // ServeHTTP is a handler for incoming GET or POST request. It directs them to the
- // proper path: either by creating a new one or connecting it to an existing one.
- func (pw *Pathway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- pathID := r.URL.Path
- if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" {
- if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" {
- // websockets are special, and not supported
- w.WriteHeader(http.StatusBadRequest)
- w.Write([]byte("websockets are not supported"))
- return
- }
- }
- if r.Method == http.MethodGet {
- // handle an incoming GET request to connect to either POST or PUB
- pw.handleGet(pathID, w, r)
- } else if r.Method == http.MethodPost {
- if h := r.Header.Get("X-Pathway"); h == "pubsub" {
- // handle an incoming PUB part of a PUBSUB.
- pw.handlePubSub(pathID, r)
- } else {
- // handle an incoming POST.
- pw.handlePost(pathID, r)
- }
- } else {
- // if it's not a get or a post, then let the client know other methods
- // are not supported.
- w.Header().Set("Allow", http.MethodGet+", "+http.MethodPost)
- w.WriteHeader(http.StatusMethodNotAllowed)
- w.Write([]byte(r.Method + " is not supported"))
- }
- }
|