123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package broker
- import (
- "context"
- "fmt"
- "git.nspix.com/golang/micro/log"
- "runtime"
- "sync"
- )
- type InProcBus struct {
- ctx context.Context
- once sync.Once
- eventChan chan *Event
- subscriberLocker sync.RWMutex
- subscribers map[string]Subscriber
- }
- func (bus *InProcBus) worker() {
- for {
- select {
- case e, ok := <-bus.eventChan:
- if ok {
- _ = bus.Dispatch(e)
- }
- case <-bus.ctx.Done():
- return
- }
- }
- }
- //Subscribers 获取所有订阅者的快照
- func (bus *InProcBus) Subscribers() []Subscriber {
- bus.subscriberLocker.RLock()
- defer bus.subscriberLocker.RUnlock()
- vs := make([]Subscriber, len(bus.subscribers))
- i := 0
- for _, v := range bus.subscribers {
- vs[i] = v
- i++
- }
- return vs
- }
- // Publish 发布一个事件
- func (bus *InProcBus) Publish(e *Event) {
- bus.once.Do(func() {
- for i := 0; i < runtime.NumCPU(); i++ {
- go bus.worker()
- }
- })
- select {
- case bus.eventChan <- e:
- default:
- }
- return
- }
- // Dispatch 分配事件,关心返回值
- func (bus *InProcBus) Dispatch(e *Event) (err error) {
- return bus.DispatchCtx(context.Background(), e)
- }
- //DispatchCtx 分配一个事件
- func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) {
- bus.subscriberLocker.RLock()
- defer bus.subscriberLocker.RUnlock()
- for _, sub := range bus.subscribers {
- if sub.HasTopic(e.Name) {
- if err = sub.Process(ctx, e); err != nil {
- log.Warnf("subscriber %s process event %s error: %s", sub.ID(), e.Name, err.Error())
- }
- }
- }
- releaseEvent(e)
- return
- }
- //Subscribe 订阅一个事件
- func (bus *InProcBus) Subscribe(sub Subscriber) (err error) {
- bus.subscriberLocker.Lock()
- defer bus.subscriberLocker.Unlock()
- if _, ok := bus.subscribers[sub.ID()]; ok {
- err = fmt.Errorf("subscriber %s already exists", sub.ID())
- return
- }
- if err = sub.OnAttach(); err != nil {
- return
- }
- bus.subscribers[sub.ID()] = sub
- return
- }
- //UnSubscribe 取消一个订阅
- func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) {
- bus.subscriberLocker.Lock()
- defer bus.subscriberLocker.Unlock()
- if instance, ok := bus.subscribers[sub.ID()]; !ok {
- err = fmt.Errorf("subscriber %s not exists", sub.ID())
- return
- } else {
- delete(bus.subscribers, sub.ID())
- err = instance.OnDetach()
- }
- return
- }
- func NewInPrcBus(ctx context.Context) *InProcBus {
- return &InProcBus{
- ctx: ctx,
- eventChan: make(chan *Event, 100),
- subscribers: make(map[string]Subscriber),
- }
- }
|