pathway.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package pathway
  2. import (
  3. "io"
  4. "net/http"
  5. "strings"
  6. "sync"
  7. )
// httpTransfer holds a single transferable connection to be read.
// Created by POSTs or PUBs, these are read from by GET requests.
// Once the reader is done, it can go and close the done channel.
type httpTransfer struct {
	// reader is the side of the connection that gets read from; newTransfer
	// sets it to the read end of an io.Pipe.
	reader io.ReadCloser
	// done is closed by the reader once it has finished with the transfer.
	done chan struct{}
	// contentlength is stored exactly as handed to newTransfer.
	// NOTE(review): presumably the sender's Content-Length header value —
	// confirm against the callers.
	contentlength string
}
  16. // newTransfer returns a ready to use httpTransfer and it's writer.
  17. func newTransfer(contentlength string) (transfer httpTransfer, writer io.WriteCloser) {
  18. var reader io.ReadCloser
  19. reader, writer = io.Pipe()
  20. transfer = httpTransfer{
  21. reader: reader,
  22. contentlength: contentlength,
  23. done: make(chan struct{}),
  24. }
  25. return
  26. }
// transferQueue holds all pending connections for a particular path stored in
// the Pathway map.
type transferQueue struct {
	// ch carries the pending transfers for this path.
	ch chan httpTransfer
	// posts and gets are per-path counters.
	// NOTE(review): presumably the number of waiting POST and GET requests,
	// updated via sync/atomic — confirm against the handlers.
	posts int32
	gets  int32
}
// Pathway keeps track of and allows one to handle all the different paths.
//
// It embeds sync.Map, so the map's methods are promoted onto Pathway.
// NOTE(review): presumably keyed by request path with transferQueue values —
// confirm against the handlers.
type Pathway struct {
	sync.Map
}
  38. // New returns a usable Pathway which HTTP Handler can be used to allow clients to
  39. // create and connect to eachother via arbitraty paths.
  40. func New() *Pathway {
  41. return new(Pathway)
  42. }
  43. // ServeHTTP is a handler for incoming GET or POST request. It directs them to the
  44. // proper path: either by creating a new one or connecting it to an existing one.
  45. func (pw *Pathway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  46. pathID := r.URL.Path
  47. if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" {
  48. if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" {
  49. // websockets are special, and not supported
  50. w.WriteHeader(http.StatusBadRequest)
  51. w.Write([]byte("websockets are not supported"))
  52. return
  53. }
  54. }
  55. if r.Method == http.MethodGet {
  56. // handle an incoming GET request to connect to either POST or PUB
  57. pw.handleGet(pathID, w, r)
  58. } else if r.Method == http.MethodPost {
  59. if h := r.Header.Get("X-Pathway"); h == "pubsub" {
  60. // handle an incoming PUB part of a PUBSUB.
  61. pw.handlePubSub(pathID, r)
  62. } else {
  63. // handle an incoming POST.
  64. pw.handlePost(pathID, r)
  65. }
  66. } else {
  67. // if it's not a get or a post, then let the client know other methods
  68. // are not supported.
  69. w.Header().Set("Allow", http.MethodGet+", "+http.MethodPost)
  70. w.WriteHeader(http.StatusMethodNotAllowed)
  71. w.Write([]byte(r.Method + " is not supported"))
  72. }
  73. }