|
@@ -3,6 +3,7 @@ package broker
|
|
|
import (
|
|
|
"context"
|
|
|
"fmt"
|
|
|
+ "git.nspix.com/golang/micro/log"
|
|
|
"runtime"
|
|
|
"sync"
|
|
|
)
|
|
@@ -15,7 +16,6 @@ type InProcBus struct {
|
|
|
subscribers map[string]Subscriber
|
|
|
}
|
|
|
|
|
|
-
|
|
|
func (bus *InProcBus) worker() {
|
|
|
for {
|
|
|
select {
|
|
@@ -67,7 +67,9 @@ func (bus *InProcBus) DispatchCtx(ctx context.Context, e *Event) (err error) {
|
|
|
defer bus.subscriberLocker.RUnlock()
|
|
|
for _, sub := range bus.subscribers {
|
|
|
if sub.HasTopic(e.Name) {
|
|
|
- err = sub.Process(ctx, e)
|
|
|
+ if err = sub.Process(ctx, e); err != nil {
|
|
|
+ log.Warnf("subscriber %s process event %s error: %s", sub.ID(), e.Name, err.Error())
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
releaseEvent(e)
|
|
@@ -109,4 +111,4 @@ func NewInPrcBus(ctx context.Context) *InProcBus {
|
|
|
eventChan: make(chan *Event, 100),
|
|
|
subscribers: make(map[string]Subscriber),
|
|
|
}
|
|
|
-}
|
|
|
+}
|