package subscriber import ( "context" "encoding/json" "git.nspix.com/golang/micro/broker" "golang.org/x/net/websocket" ) type Websocket struct { id string sid string namespace string topics []string conn *websocket.Conn } type websocketMessageWrapper struct { Type string `json:"type"` From string `json:"from"` To string `json:"to"` Body *broker.Event `json:"body"` } func (sub *Websocket) ID() string { return sub.id } func (sub *Websocket) HasTopic(topic string) bool { for _, s := range sub.topics { if topic == s { return true } } return false } func (sub *Websocket) OnAttach() (err error) { return } func (sub *Websocket) Process(ctx context.Context, e *broker.Event) (err error) { if e.Namespace != sub.namespace { return } if e.Trigger != "" && e.Trigger != sub.sid { return } msg := &websocketMessageWrapper{ Type: "event", From: "system", To: sub.sid, Body: e, } var buf []byte if buf, err = json.Marshal(msg); err != nil { return } //写入消息到数据流 _, err = sub.conn.Write(buf) return } func (sub *Websocket) OnDetach() (err error) { return } func (sub *Websocket) Subscribe(topic string) { if sub.topics == nil { sub.topics = make([]string, 0) } for _, s := range sub.topics { if s == topic { return } } sub.topics = append(sub.topics, topic) } func (sub *Websocket) Unsubscribe(topic string) { if sub.topics == nil { return } for i, s := range sub.topics { if s == topic { sub.topics = append(sub.topics[:i], sub.topics[i+1:]...) return } } } func NewWebsocketSubscriber(id string, sid string, namespace string, conn *websocket.Conn) *Websocket { return &Websocket{ id: "websocket:" + id, sid: sid, namespace: namespace, conn: conn, } }