|
@@ -3,17 +3,17 @@ package broker
|
|
|
import (
|
|
|
"context"
|
|
|
"fmt"
|
|
|
- "git.nspix.com/golang/micro/log"
|
|
|
"runtime"
|
|
|
"sync"
|
|
|
+
|
|
|
+ "git.nspix.com/golang/micro/log"
|
|
|
)
|
|
|
|
|
|
type InProcBus struct {
|
|
|
- ctx context.Context
|
|
|
- once sync.Once
|
|
|
- eventChan chan *Event
|
|
|
- subscriberLocker sync.RWMutex
|
|
|
- subscribers map[string]Subscriber
|
|
|
+ ctx context.Context
|
|
|
+ once sync.Once
|
|
|
+ eventChan chan *Event
|
|
|
+ subscribers sync.Map
|
|
|
}
|
|
|
|
|
|
func (bus *InProcBus) worker() {
|
|
@@ -31,14 +31,11 @@ func (bus *InProcBus) worker() {
|
|
|
|
|
|
//Subscribers 获取所有订阅者的快照
|
|
|
func (bus *InProcBus) Subscribers() []Subscriber {
|
|
|
- bus.subscriberLocker.RLock()
|
|
|
- defer bus.subscriberLocker.RUnlock()
|
|
|
- vs := make([]Subscriber, len(bus.subscribers))
|
|
|
- i := 0
|
|
|
- for _, v := range bus.subscribers {
|
|
|
- vs[i] = v
|
|
|
- i++
|
|
|
- }
|
|
|
+ vs := make([]Subscriber, 0)
|
|
|
+ bus.subscribers.Range(func(key, value interface{}) bool {
|
|
|
+ vs = append(vs, value.(Subscriber))
|
|
|
+ return true
|
|
|
+ })
|
|
|
return vs
|
|
|
}
|
|
|
|
|
@@ -63,52 +60,41 @@ func (bus *InProcBus) Dispatch(e *Event) (err error) {
|
|
|
|
|
|
//DispatchCtx 分配一个事件
|
|
|
func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) {
|
|
|
- bus.subscriberLocker.RLock()
|
|
|
- defer bus.subscriberLocker.RUnlock()
|
|
|
- for _, sub := range bus.subscribers {
|
|
|
+ bus.subscribers.Range(func(key, value interface{}) bool {
|
|
|
+ sub := value.(Subscriber)
|
|
|
if sub.HasTopic(e.Name) {
|
|
|
if err = sub.Process(ctx, e); err != nil {
|
|
|
log.Warnf("subscriber %s process event %s error: %s", sub.ID(), e.Name, err.Error())
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
+ return true
|
|
|
+ })
|
|
|
releaseEvent(e)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
//Subscribe 订阅一个事件
|
|
|
func (bus *InProcBus) Subscribe(sub Subscriber) (err error) {
|
|
|
- bus.subscriberLocker.Lock()
|
|
|
- defer bus.subscriberLocker.Unlock()
|
|
|
- if _, ok := bus.subscribers[sub.ID()]; ok {
|
|
|
+ if _, ok := bus.subscribers.Load(sub.ID()); ok {
|
|
|
err = fmt.Errorf("subscriber %s already exists", sub.ID())
|
|
|
return
|
|
|
}
|
|
|
if err = sub.OnAttach(); err != nil {
|
|
|
return
|
|
|
}
|
|
|
- bus.subscribers[sub.ID()] = sub
|
|
|
+ bus.subscribers.Store(sub.ID(), sub)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
//UnSubscribe 取消一个订阅
|
|
|
func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) {
|
|
|
- bus.subscriberLocker.Lock()
|
|
|
- defer bus.subscriberLocker.Unlock()
|
|
|
- if instance, ok := bus.subscribers[sub.ID()]; !ok {
|
|
|
- err = fmt.Errorf("subscriber %s not exists", sub.ID())
|
|
|
- return
|
|
|
- } else {
|
|
|
- delete(bus.subscribers, sub.ID())
|
|
|
- err = instance.OnDetach()
|
|
|
- }
|
|
|
+ bus.subscribers.Delete(sub.ID())
|
|
|
return
|
|
|
}
|
|
|
|
|
|
func NewInPrcBus(ctx context.Context) *InProcBus {
|
|
|
return &InProcBus{
|
|
|
- ctx: ctx,
|
|
|
- eventChan: make(chan *Event, 100),
|
|
|
- subscribers: make(map[string]Subscriber),
|
|
|
+ ctx: ctx,
|
|
|
+ eventChan: make(chan *Event, 100),
|
|
|
}
|
|
|
}
|