websocket.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package subscriber
  2. import (
  3. "context"
  4. "encoding/json"
  5. "git.nspix.com/golang/micro/broker"
  6. "golang.org/x/net/websocket"
  7. )
  8. type Websocket struct {
  9. id string
  10. sid string
  11. namespace string
  12. topics []string
  13. conn *websocket.Conn
  14. }
// websocketMessageWrapper is the JSON envelope that Process marshals and
// writes to the websocket connection for each forwarded event.
type websocketMessageWrapper struct {
	Type string        `json:"type"` // message kind; Process sets it to "event"
	From string        `json:"from"` // sender; Process sets it to "system"
	To   string        `json:"to"`   // recipient; Process sets it to the subscriber's sid
	Body *broker.Event `json:"body"` // the broker event being delivered
}
// ID returns the subscriber's identifier. For subscribers built with
// NewWebsocketSubscriber this is "websocket:" followed by the id passed to
// the constructor.
func (sub *Websocket) ID() string {
	return sub.id
}
  24. func (sub *Websocket) HasTopic(topic string) bool {
  25. for _, s := range sub.topics {
  26. if topic == s {
  27. return true
  28. }
  29. }
  30. return false
  31. }
  32. func (sub *Websocket) OnAttach() (err error) {
  33. return
  34. }
  35. func (sub *Websocket) Process(ctx context.Context, e *broker.Event) (err error) {
  36. if e.Namespace != sub.namespace {
  37. return
  38. }
  39. if e.Trigger != "" && e.Trigger != sub.sid {
  40. return
  41. }
  42. msg := &websocketMessageWrapper{
  43. Type: "event",
  44. From: "system",
  45. To: sub.sid,
  46. Body: e,
  47. }
  48. var buf []byte
  49. if buf, err = json.Marshal(msg); err != nil {
  50. return
  51. }
  52. //写入消息到数据流
  53. _, err = sub.conn.Write(buf)
  54. return
  55. }
  56. func (sub *Websocket) OnDetach() (err error) {
  57. return
  58. }
  59. func (sub *Websocket) Subscribe(topic string) {
  60. if sub.topics == nil {
  61. sub.topics = make([]string, 0)
  62. }
  63. for _, s := range sub.topics {
  64. if s == topic {
  65. return
  66. }
  67. }
  68. sub.topics = append(sub.topics, topic)
  69. }
  70. func (sub *Websocket) Unsubscribe(topic string) {
  71. if sub.topics == nil {
  72. return
  73. }
  74. for i, s := range sub.topics {
  75. if s == topic {
  76. sub.topics = append(sub.topics[:i], sub.topics[i+1:]...)
  77. return
  78. }
  79. }
  80. }
  81. func NewWebsocketSubscriber(id string, sid string, namespace string, conn *websocket.Conn) *Websocket {
  82. return &Websocket{
  83. id: "websocket:" + id,
  84. sid: sid,
  85. namespace: namespace,
  86. conn: conn,
  87. }
  88. }