pathway.go 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package pathway
  2. import (
  3. "io"
  4. "log"
  5. "net/http"
  6. "strings"
  7. "sync"
  8. // "nhooyr.io/websocket"
  9. )
  10. var (
  11. // actual business end of the device
  12. paths *sync.Map
  13. debugging string
  14. )
  15. func init() {
  16. paths = &sync.Map{}
  17. }
  18. // Transfer holds a single tranferable connection to be read
  19. type Transfer struct {
  20. reader io.ReadCloser
  21. done chan struct{}
  22. contentlength string
  23. }
  24. // NewTransfer returns a ready to use Transfer and it's writer
  25. func NewTransfer(contentlength string) (transfer Transfer, writer io.WriteCloser) {
  26. var reader io.ReadCloser
  27. reader, writer = io.Pipe()
  28. transfer = Transfer{
  29. reader: reader.(io.ReadCloser),
  30. contentlength: contentlength,
  31. done: make(chan struct{}),
  32. }
  33. return
  34. }
  35. // Queue is where posts and gets can exchange transfers
  36. type Queue struct {
  37. ch chan Transfer
  38. posts int32
  39. gets int32
  40. }
  41. func pathHandler(w http.ResponseWriter, r *http.Request) {
  42. pathID := r.URL.Path
  43. if len(pathID) < 2 {
  44. w.WriteHeader(400)
  45. w.Write([]byte("path to short"))
  46. return
  47. }
  48. if h := r.Header.Get("Connection"); len(h) > 0 && strings.ToLower(h) == "upgrade" {
  49. if h = r.Header.Get("Upgrade"); len(h) > 0 && strings.ToLower(h) == "websocket" {
  50. // c, err := websocket.Accept(w, r, nil)
  51. // if err != nil {
  52. // info("Websocket not accepted: %+s", err.Error())
  53. // return
  54. // }
  55. //
  56. // defer c.Close(websocket.StatusInternalError, "the sky is falling")
  57. info("Websocket connections not supported yet. Headers: %+v", r.Header)
  58. // return
  59. }
  60. }
  61. if r.Method == "GET" {
  62. info("%s [GET] Connected", pathID)
  63. handleGet(pathID, w, r)
  64. } else if r.Method == "POST" {
  65. info("%s [POST] Connected", pathID)
  66. if h := r.Header.Get("X-Pathway"); h == "pubsub" {
  67. debug("%s [PUBSUB] Upgrade POST to PUBSUB", pathID)
  68. handlePubSub(pathID, r)
  69. } else {
  70. handlePost(pathID, r)
  71. }
  72. } else {
  73. info("Unhandled request type: '%s'", r.Method)
  74. }
  75. }
  76. func info(msg string, args ...interface{}) {
  77. log.Printf("INFO | "+msg, args...)
  78. }
  79. func debug(msg string, args ...interface{}) {
  80. if len(debugging) > 0 {
  81. log.Printf("DEBUG | "+msg, args...)
  82. }
  83. }