package bus import ( "context" "fmt" "runtime" "sync" ) type InProcBus struct { ctx context.Context once sync.Once eventChan chan *Event subscriberLocker sync.RWMutex subscribers map[string]Subscriber } func (bus *InProcBus) Subscribers() []Subscriber { bus.subscriberLocker.RLock() defer bus.subscriberLocker.RUnlock() vs := make([]Subscriber, len(bus.subscribers)) i := 0 for _, v := range bus.subscribers { vs[i] = v i++ } return vs } func (bus *InProcBus) worker() { for { select { case e, ok := <-bus.eventChan: if ok { _ = bus.Dispatch(e) } case <-bus.ctx.Done(): return } } } // Publish 发布一个事件 func (bus *InProcBus) Publish(e *Event) { bus.once.Do(func() { for i := 0; i < runtime.NumCPU(); i++ { go bus.worker() } }) select { case bus.eventChan <- e: default: } return } // Dispatch 分配事件,关心返回值 func (bus *InProcBus) Dispatch(e *Event) (err error) { return bus.DispatchCtx(context.Background(), e) } func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) { bus.subscriberLocker.RLock() defer bus.subscriberLocker.RUnlock() for _, sub := range bus.subscribers { if sub.HasTopic(e.Name) { err = sub.Process(ctx, e) } } releaseEvent(e) return } func (bus *InProcBus) Subscribe(sub Subscriber) (err error) { bus.subscriberLocker.Lock() defer bus.subscriberLocker.Unlock() if _, ok := bus.subscribers[sub.ID()]; ok { err = fmt.Errorf("subscriber %s already exists", sub.ID()) return } if err = sub.OnAttach(); err != nil { return } bus.subscribers[sub.ID()] = sub return } func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) { bus.subscriberLocker.Lock() defer bus.subscriberLocker.Unlock() if instance, ok := bus.subscribers[sub.ID()]; !ok { err = fmt.Errorf("subscriber %s not exists", sub.ID()) return } else { delete(bus.subscribers, sub.ID()) err = instance.OnDetach() } return } func NewInPrcBus(ctx context.Context) *InProcBus { return &InProcBus{ ctx: ctx, eventChan: make(chan *Event, 100), subscribers: make(map[string]Subscriber), } }