package broker import ( "context" "fmt" "sync" "sync/atomic" "time" "git.nspix.com/golang/micro/log" ) type InProcBus struct { ctx context.Context once sync.Once eventChan chan *Event subscribers sync.Map numOfWorker int32 cwChan chan struct{} } func (bus *InProcBus) worker() { for { select { case e, ok := <-bus.eventChan: if ok { _ = bus.Dispatch(e) } case <-bus.cwChan: return case <-bus.ctx.Done(): return } } } func (bus *InProcBus) resizePool(num int) { needPoolSize := int32(float64(num) / 1.5) if needPoolSize <= 1 { needPoolSize = 1 } if needPoolSize > 20 { needPoolSize = 20 } for { if needPoolSize == atomic.LoadInt32(&bus.numOfWorker) { break } else if needPoolSize < atomic.LoadInt32(&bus.numOfWorker) { bus.cwChan <- struct{}{} } else { go bus.worker() atomic.AddInt32(&bus.numOfWorker, 1) } } } func (bus *InProcBus) eventLoop() { ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() for { select { case <-ticker.C: bus.resizePool(len(bus.eventChan)) case <-bus.ctx.Done(): return } } } //Subscribers 获取所有订阅者的快照 func (bus *InProcBus) Subscribers() []Subscriber { vs := make([]Subscriber, 0) bus.subscribers.Range(func(key, value interface{}) bool { vs = append(vs, value.(Subscriber)) return true }) return vs } // Publish 发布一个事件 func (bus *InProcBus) Publish(e *Event) { select { case bus.eventChan <- e: default: log.Warnf("event queue is full, event %s@%s has been discarded", e.Name, e.Namespace) } return } // Dispatch 分配事件,关心返回值 func (bus *InProcBus) Dispatch(e *Event) (err error) { return bus.DispatchCtx(bus.ctx, e) } //DispatchCtx 分配一个事件 func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) { bus.subscribers.Range(func(key, value interface{}) bool { sub := value.(Subscriber) if sub.HasTopic(e.Name) { if err = sub.Process(ctx, e); err != nil { log.Warnf("subscriber %s process event %s error: %s", sub.ID(), e.Name, err.Error()) } } return true }) releaseEvent(e) return } //Subscribe 订阅一个事件 func (bus *InProcBus) Subscribe(sub Subscriber) (err error) { if _, ok := bus.subscribers.Load(sub.ID()); ok { err = fmt.Errorf("subscriber %s already exists", sub.ID()) return } if err = sub.OnAttach(); err != nil { return } bus.subscribers.Store(sub.ID(), sub) return } //UnSubscribe 取消一个订阅 func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) { bus.subscribers.Delete(sub.ID()) return } func (bus *InProcBus) WithContext(ctx context.Context) { bus.ctx = ctx } func NewInPrcBus(ctx context.Context) *InProcBus { bus := &InProcBus{ ctx: ctx, eventChan: make(chan *Event, 1024), } bus.once.Do(func() { atomic.AddInt32(&bus.numOfWorker, 1) go bus.worker() }) return bus }