websocket.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package subscriber
  2. import (
  3. "context"
  4. "encoding/json"
  5. "git.nspix.com/golang/micro/bus"
  6. "golang.org/x/net/websocket"
  7. )
  8. type Websocket struct {
  9. id string
  10. sid string
  11. domain string
  12. topics []string
  13. conn *websocket.Conn
  14. }
  15. type websocketMessageWrapper struct {
  16. Type string `json:"type"`
  17. From string `json:"from"`
  18. To string `json:"to"`
  19. Body *bus.Event `json:"body"`
  20. }
  21. func (sub *Websocket) ID() string {
  22. return sub.id
  23. }
  24. func (sub *Websocket) HasTopic(topic string) bool {
  25. for _, s := range sub.topics {
  26. if topic == s {
  27. return true
  28. }
  29. }
  30. return false
  31. }
  32. func (sub *Websocket) OnAttach() (err error) {
  33. return
  34. }
  35. func (sub *Websocket) Process(ctx context.Context, e *bus.Event) (err error) {
  36. //websocket 只处理改域下面的消息
  37. if e.Domain != sub.domain {
  38. return
  39. }
  40. //websocket 只处理输出它自己的消息
  41. if e.Trigger != "" && e.Trigger != sub.sid {
  42. return
  43. }
  44. msg := &websocketMessageWrapper{
  45. Type: "event",
  46. From: "system",
  47. To: sub.sid,
  48. Body: e,
  49. }
  50. var buf []byte
  51. if buf, err = json.Marshal(msg); err != nil {
  52. return
  53. }
  54. //写入消息到数据流
  55. _, err = sub.conn.Write(buf)
  56. return
  57. }
  58. func (sub *Websocket) OnDetach() (err error) {
  59. return
  60. }
  61. func (sub *Websocket) Subscribe(topic string) {
  62. if sub.topics == nil {
  63. sub.topics = make([]string, 0)
  64. }
  65. for _, s := range sub.topics {
  66. if s == topic {
  67. return
  68. }
  69. }
  70. sub.topics = append(sub.topics, topic)
  71. }
  72. func (sub *Websocket) Unsubscribe(topic string) {
  73. if sub.topics == nil {
  74. return
  75. }
  76. for i, s := range sub.topics {
  77. if s == topic {
  78. sub.topics = append(sub.topics[:i], sub.topics[i+1:]...)
  79. return
  80. }
  81. }
  82. }
  83. func NewWebsocketSubscriber(id string, sid string, domain string, conn *websocket.Conn) *Websocket {
  84. return &Websocket{
  85. id: "websocket:" + id,
  86. sid: sid,
  87. domain: domain,
  88. conn: conn,
  89. }
  90. }