lxg 3 rokov pred
rodič
commit
efc552d221
8 zmenil súbory, kde vykonal 514 pridanie a 0 odobranie
  1. 26 0
      bus/bus.go
  2. 83 0
      bus/event.go
  3. 39 0
      bus/global.go
  4. 107 0
      bus/inprocess.go
  5. 15 0
      bus/subscriber.go
  6. 97 0
      bus/subscriber/http.go
  7. 46 0
      bus/subscriber/memory.go
  8. 101 0
      bus/subscriber/websocket.go

+ 26 - 0
bus/bus.go

@@ -0,0 +1,26 @@
+package bus
+
+import (
+	"context"
+)
+
+type Bus interface {
+	Publish(e *Event)
+	Dispatch(e *Event) (err error)
+	DispatchCtx(ctx context.Context, e *Event) (err error)
+	Subscribe(sub Subscriber) (err error)
+	UnSubscribe(sub Subscriber) (err error)
+	Subscribers() []Subscriber
+}
+
+func FromContext(ctx context.Context) Bus {
+	if v := ctx.Value("bus"); v != nil {
+		return v.(Bus)
+	} else {
+		return nil
+	}
+}
+
+func WitchContext(ctx context.Context, b Bus) context.Context {
+	return context.WithValue(ctx, "bus", b)
+}

+ 83 - 0
bus/event.go

@@ -0,0 +1,83 @@
+package bus
+
+import (
+	"encoding/json"
+	"sync"
+	"time"
+)
+
+var (
+	eventPool = new(sync.Pool)
+)
+
+func applyEvent() *Event {
+	if v := eventPool.Get(); v == nil {
+		return &Event{
+			Timestamp: time.Now().Unix(),
+		}
+	} else {
+		e := v.(*Event)
+		e.Timestamp = time.Now().Unix()
+		e.Header = make(map[string]string)
+		e.Body = nil
+		return e
+	}
+}
+
+func releaseEvent(e *Event) {
+	eventPool.Put(e)
+}
+
+type Event struct {
+	Name      string            `json:"name"`
+	Domain    string            `json:"-"`
+	Trigger   string            `json:"-"` //触发的用户,用于一些订阅者过滤数据
+	Timestamp int64             `json:"ts"`
+	Header    map[string]string `json:"meta"`
+	Body      []byte            `json:"body,omitempty"`
+}
+
+func (e *Event) WithTrigger(target string) *Event {
+	e.Trigger = target
+	return e
+}
+
+func (e *Event) Set(k, v string) *Event {
+	if e.Header == nil {
+		e.Header = make(map[string]string)
+	}
+	e.Header[k] = v
+	return e
+}
+
+func (e *Event) MultiSet(ss ...string) *Event {
+	if len(ss)%2 != 0 {
+		return e
+	}
+	if e.Header == nil {
+		e.Header = make(map[string]string)
+	}
+	for i := 0; i < len(ss); i += 2 {
+		e.Header[ss[i]] = ss[i+1]
+	}
+	return e
+}
+
+func (e *Event) Get(k string) string {
+	if e.Header == nil {
+		return ""
+	}
+	return e.Header[k]
+}
+
+func (e *Event) Encode() (buf []byte) {
+	buf, _ = json.Marshal(e)
+	return
+}
+
+func NewEvent(name string, domain string) *Event {
+	e := applyEvent()
+	e.Name = name
+	e.Domain = domain
+	return e
+}

+ 39 - 0
bus/global.go

@@ -0,0 +1,39 @@
+package bus
+
+import "context"
+
+var (
+	Default Bus
+)
+
+func init() {
+	Default = NewInPrcBus(context.Background())
+}
+
+func SetGlobal(bus Bus) {
+	Default = bus
+}
+
+func GetGlobal() Bus {
+	return Default
+}
+
+func Publish(e *Event) {
+	Default.Publish(e)
+}
+
+func Dispatch(e *Event) (err error) {
+	return Default.Dispatch(e)
+}
+
+func DispatchCtx(ctx context.Context, e *Event) (err error) {
+	return Default.DispatchCtx(ctx, e)
+}
+
+func Subscribe(sub Subscriber) (err error) {
+	return Default.Subscribe(sub)
+}
+
+func UnSubscribe(sub Subscriber) (err error) {
+	return Default.UnSubscribe(sub)
+}

+ 107 - 0
bus/inprocess.go

@@ -0,0 +1,107 @@
+package bus
+
+import (
+	"context"
+	"fmt"
+	"runtime"
+	"sync"
+)
+
+type InProcBus struct {
+	ctx              context.Context
+	once             sync.Once
+	eventChan        chan *Event
+	subscriberLocker sync.RWMutex
+	subscribers      map[string]Subscriber
+}
+
+func (bus *InProcBus) Subscribers() []Subscriber {
+	bus.subscriberLocker.RLock()
+	defer bus.subscriberLocker.RUnlock()
+	vs := make([]Subscriber, len(bus.subscribers))
+	i := 0
+	for _, v := range bus.subscribers {
+		vs[i] = v
+		i++
+	}
+	return vs
+}
+
+func (bus *InProcBus) worker() {
+	for {
+		select {
+		case e, ok := <-bus.eventChan:
+			if ok {
+				_ = bus.Dispatch(e)
+			}
+		case <-bus.ctx.Done():
+			return
+		}
+	}
+}
+
+// Publish 发布一个事件
+func (bus *InProcBus) Publish(e *Event) {
+	bus.once.Do(func() {
+		for i := 0; i < runtime.NumCPU(); i++ {
+			go bus.worker()
+		}
+	})
+	select {
+	case bus.eventChan <- e:
+	default:
+	}
+	return
+}
+
+// Dispatch 分配事件,关心返回值
+func (bus *InProcBus) Dispatch(e *Event) (err error) {
+	return bus.DispatchCtx(context.Background(), e)
+}
+
+func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) {
+	bus.subscriberLocker.RLock()
+	defer bus.subscriberLocker.RUnlock()
+	for _, sub := range bus.subscribers {
+		if sub.HasTopic(e.Name) {
+			err = sub.Process(ctx, e)
+		}
+	}
+	releaseEvent(e)
+	return
+}
+
+func (bus *InProcBus) Subscribe(sub Subscriber) (err error) {
+	bus.subscriberLocker.Lock()
+	defer bus.subscriberLocker.Unlock()
+	if _, ok := bus.subscribers[sub.ID()]; ok {
+		err = fmt.Errorf("subscriber %s already exists", sub.ID())
+		return
+	}
+	if err = sub.OnAttach(); err != nil {
+		return
+	}
+	bus.subscribers[sub.ID()] = sub
+	return
+}
+
+func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) {
+	bus.subscriberLocker.Lock()
+	defer bus.subscriberLocker.Unlock()
+	if instance, ok := bus.subscribers[sub.ID()]; !ok {
+		err = fmt.Errorf("subscriber %s not exists", sub.ID())
+		return
+	} else {
+		delete(bus.subscribers, sub.ID())
+		err = instance.OnDetach()
+	}
+	return
+}
+
+func NewInPrcBus(ctx context.Context) *InProcBus {
+	return &InProcBus{
+		ctx:         ctx,
+		eventChan:   make(chan *Event, 100),
+		subscribers: make(map[string]Subscriber),
+	}
+}

+ 15 - 0
bus/subscriber.go

@@ -0,0 +1,15 @@
+package bus
+
+import (
+	"context"
+)
+
+type HandleFunc func(ctx context.Context, e *Event) error
+
+type Subscriber interface {
+	ID() string
+	HasTopic(topic string) bool
+	OnAttach() (err error)
+	Process(ctx context.Context, e *Event) (err error)
+	OnDetach() (err error)
+}

+ 97 - 0
bus/subscriber/http.go

@@ -0,0 +1,97 @@
+package subscriber
+
+import (
+	"bytes"
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"encoding/xml"
+	"fmt"
+	"git.nspix.com/golang/micro/bus"
+	"net/http"
+	"strconv"
+	"time"
+)
+
+var (
+	MaxHttpTimeout = time.Second * 10  //最大的HTTP处理超时时间
+)
+
+var (
+	httpClient = &http.Client{
+		Timeout: MaxHttpTimeout,
+	}
+)
+
+type Http struct {
+	Id     string
+	Url    string
+	Topics []string
+	Format string
+}
+
+func (sub *Http) ID() string {
+	return sub.Id
+}
+
+func (sub *Http) HasTopic(topic string) bool {
+	for _, s := range sub.Topics {
+		if topic == s {
+			return true
+		}
+	}
+	return false
+}
+
+func (sub *Http) OnAttach() (err error) {
+	return
+}
+
+func (sub *Http) Process(ctx context.Context, e *bus.Event) (err error) {
+	var (
+		req  *http.Request
+		resp *http.Response
+		buf  []byte
+	)
+	if sub.Format == "xml" {
+		if buf, err = json.Marshal(e); err != nil {
+			return
+		}
+	} else {
+		if buf, err = xml.Marshal(e); err != nil {
+			return
+		}
+	}
+	if req, err = http.NewRequest(http.MethodPost, sub.Url, bytes.NewReader(buf)); err != nil {
+		return
+	}
+	if sub.Format == "xml" {
+		req.Header.Set("Content-Type", "application/xml")
+	} else {
+		req.Header.Set("Content-Type", "application/json")
+	}
+	req.Header.Set("X-Event-Name", e.Name)
+	req.Header.Set("X-Event-Timestamp", strconv.FormatInt(e.Timestamp, 10))
+	req.Header.Set("User-Agent", "micro")
+	if resp, err = httpClient.Do(req.WithContext(ctx)); err != nil {
+		return
+	}
+	if resp.StatusCode != http.StatusOK {
+		err = fmt.Errorf("request \"%s\" got status code %d", sub.Url, resp.StatusCode)
+	}
+	_ = resp.Body.Close()
+	return
+}
+
+func (sub *Http) OnDetach() (err error) {
+	return
+}
+
+func NewHttpSubscriber(topics []string, url string) *Http {
+	return &Http{
+		Id:     base64.StdEncoding.EncodeToString([]byte(url)),
+		Url:    url,
+		Topics: topics,
+		Format: "json",
+	}
+}

+ 46 - 0
bus/subscriber/memory.go

@@ -0,0 +1,46 @@
+package subscriber
+
+import (
+	"context"
+	"fmt"
+	"git.nspix.com/golang/micro/bus"
+	"reflect"
+)
+
+type Memory struct {
+	Id    string
+	topic string
+	cb    bus.HandleFunc
+}
+
+func (sub *Memory) ID() string {
+	return sub.Id
+}
+
+func (sub *Memory) HasTopic(topic string) bool {
+	return sub.topic == topic || sub.topic == "ALL"
+}
+
+func (sub *Memory) OnAttach() (err error) {
+	return
+}
+
+func (sub *Memory) Process(ctx context.Context, e *bus.Event) (err error) {
+	if sub.cb != nil {
+		err = sub.cb(ctx, e)
+	}
+	return
+}
+
+func (sub *Memory) OnDetach() (err error) {
+	return
+}
+
+func NewMemorySubscriber(topic string, cb bus.HandleFunc) *Memory {
+	id := topic + ":" + fmt.Sprint(reflect.ValueOf(cb).Pointer())
+	return &Memory{
+		Id:    id,
+		topic: topic,
+		cb:    cb,
+	}
+}

+ 101 - 0
bus/subscriber/websocket.go

@@ -0,0 +1,101 @@
+package subscriber
+
+import (
+	"context"
+	"encoding/json"
+	"git.nspix.com/golang/micro/bus"
+	"golang.org/x/net/websocket"
+)
+
+type Websocket struct {
+	id     string
+	sid    string
+	domain string
+	topics []string
+	conn   *websocket.Conn
+}
+
+type websocketMessageWrapper struct {
+	Type string     `json:"type"`
+	From string     `json:"from"`
+	To   string     `json:"to"`
+	Body *bus.Event `json:"body"`
+}
+
+func (sub *Websocket) ID() string {
+	return sub.id
+}
+
+func (sub *Websocket) HasTopic(topic string) bool {
+	for _, s := range sub.topics {
+		if topic == s {
+			return true
+		}
+	}
+	return false
+}
+
+func (sub *Websocket) OnAttach() (err error) {
+	return
+}
+
+func (sub *Websocket) Process(ctx context.Context, e *bus.Event) (err error) {
+	//websocket 只处理改域下面的消息
+	if e.Domain != sub.domain {
+		return
+	}
+	//websocket 只处理输出它自己的消息
+	if e.Trigger != "" && e.Trigger != sub.sid {
+		return
+	}
+	msg := &websocketMessageWrapper{
+		Type: "event",
+		From: "system",
+		To:   sub.sid,
+		Body: e,
+	}
+	var buf []byte
+	if buf, err = json.Marshal(msg); err != nil {
+		return
+	}
+	//写入消息到数据流
+	_, err = sub.conn.Write(buf)
+	return
+}
+
+func (sub *Websocket) OnDetach() (err error) {
+	return
+}
+
+func (sub *Websocket) Subscribe(topic string) {
+	if sub.topics == nil {
+		sub.topics = make([]string, 0)
+	}
+	for _, s := range sub.topics {
+		if s == topic {
+			return
+		}
+	}
+	sub.topics = append(sub.topics, topic)
+}
+
+func (sub *Websocket) Unsubscribe(topic string) {
+	if sub.topics == nil {
+		return
+	}
+	for i, s := range sub.topics {
+		if s == topic {
+			sub.topics = append(sub.topics[:i], sub.topics[i+1:]...)
+			return
+		}
+	}
+}
+
+func NewWebsocketSubscriber(id string, sid string, domain string, conn *websocket.Conn) *Websocket {
+	return &Websocket{
+		id:     "websocket:" + id,
+		sid:    sid,
+		domain: domain,
+		conn:   conn,
+	}
+}