123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package subscriber
- import (
- "context"
- "encoding/json"
- "git.nspix.com/golang/micro/broker"
- "golang.org/x/net/websocket"
- )
- type Websocket struct {
- id string
- sid string
- namespace string
- topics []string
- conn *websocket.Conn
- }
- type websocketMessageWrapper struct {
- Type string `json:"type"`
- From string `json:"from"`
- To string `json:"to"`
- Body *broker.Event `json:"body"`
- }
- func (sub *Websocket) ID() string {
- return sub.id
- }
- func (sub *Websocket) HasTopic(topic string) bool {
- for _, s := range sub.topics {
- if topic == s {
- return true
- }
- }
- return false
- }
- func (sub *Websocket) OnAttach() (err error) {
- return
- }
- func (sub *Websocket) Process(ctx context.Context, e *broker.Event) (err error) {
- if e.Namespace != sub.namespace {
- return
- }
- if e.Trigger != "" && e.Trigger != sub.sid {
- return
- }
- msg := &websocketMessageWrapper{
- Type: "event",
- From: "system",
- To: sub.sid,
- Body: e,
- }
- var buf []byte
- if buf, err = json.Marshal(msg); err != nil {
- return
- }
- //写入消息到数据流
- _, err = sub.conn.Write(buf)
- return
- }
- func (sub *Websocket) OnDetach() (err error) {
- return
- }
- func (sub *Websocket) Subscribe(topic string) {
- if sub.topics == nil {
- sub.topics = make([]string, 0)
- }
- for _, s := range sub.topics {
- if s == topic {
- return
- }
- }
- sub.topics = append(sub.topics, topic)
- }
- func (sub *Websocket) Unsubscribe(topic string) {
- if sub.topics == nil {
- return
- }
- for i, s := range sub.topics {
- if s == topic {
- sub.topics = append(sub.topics[:i], sub.topics[i+1:]...)
- return
- }
- }
- }
- func NewWebsocketSubscriber(id string, sid string, namespace string, conn *websocket.Conn) *Websocket {
- return &Websocket{
- id: "websocket:" + id,
- sid: sid,
- namespace: namespace,
- conn: conn,
- }
- }
|