Browse Source

修复问题

fancl 1 year ago
parent
commit
26a3e72515
4 changed files with 56 additions and 5 deletions
  1. 41 4
      broker/inprocess.go
  2. 0 1
      cmd/main.go
  3. 4 0
      gateway/cli/server.go
  4. 11 0
      service.go

+ 41 - 4
broker/inprocess.go

@@ -3,8 +3,9 @@ package broker
 import (
 	"context"
 	"fmt"
-	"runtime"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"git.nspix.com/golang/micro/log"
 )
@@ -14,6 +15,8 @@ type InProcBus struct {
 	once        sync.Once
 	eventChan   chan *Event
 	subscribers sync.Map
+	numOfWorker int32
+	cwChan      chan struct{}
 }
 
 func (bus *InProcBus) worker() {
@@ -23,6 +26,41 @@ func (bus *InProcBus) worker() {
 			if ok {
 				_ = bus.Dispatch(e)
 			}
+		case <-bus.cwChan:
+			return
+		case <-bus.ctx.Done():
+			return
+		}
+	}
+}
+
+func (bus *InProcBus) resizePool(num int) {
+	needPoolSize := int32(float64(num) / 1.5)
+	if needPoolSize <= 1 {
+		needPoolSize = 1
+	}
+	if needPoolSize > 20 {
+		needPoolSize = 20
+	}
+	for {
+		if needPoolSize == atomic.LoadInt32(&bus.numOfWorker) {
+			break
+		} else if needPoolSize < atomic.LoadInt32(&bus.numOfWorker) {
+			bus.cwChan <- struct{}{}
+		} else {
+			go bus.worker()
+			atomic.AddInt32(&bus.numOfWorker, 1)
+		}
+	}
+}
+
+func (bus *InProcBus) eventLoop() {
+	ticker := time.NewTicker(time.Second * 5)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			bus.resizePool(len(bus.eventChan))
 		case <-bus.ctx.Done():
 			return
 		}
@@ -98,9 +136,8 @@ func NewInPrcBus(ctx context.Context) *InProcBus {
 		eventChan: make(chan *Event, 1024),
 	}
 	bus.once.Do(func() {
-		for i := 0; i < runtime.NumCPU()*2; i++ {
-			go bus.worker()
-		}
+		atomic.AddInt32(&bus.numOfWorker, 1)
+		go bus.worker()
 	})
 	return bus
 }

+ 0 - 1
cmd/main.go

@@ -66,7 +66,6 @@ func main() {
 			return ctx.Success(ctx.ParamValue("uid"))
 		})
 	})
-
 	if err := svr.Run(); err != nil {
 		fmt.Println(err)
 	}

+ 4 - 0
gateway/cli/server.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"git.nspix.com/golang/micro/helper/pool/bytepool"
 	"git.nspix.com/golang/micro/helper/utils"
+	"git.nspix.com/golang/micro/log"
 	"net"
 	"os"
 	"path"
@@ -85,6 +86,9 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 		delete(svr.contextMap, id)
 		svr.locker.Unlock()
 		err = conn.Close()
+		if v := recover(); v != nil {
+			log.Errorf("handle %d command failed cause by %v", id, v)
+		}
 	}()
 	for {
 		reqPacket := &Frame{}

+ 11 - 0
service.go

@@ -55,6 +55,7 @@ type Service struct {
 	triesRegister int
 	tickTimer     *time.Timer
 	tickTree      *btree.BTree
+	tickLocker    sync.RWMutex
 	deferHandles  []handleEntry
 	environment   string
 	readyFlag     int32
@@ -81,14 +82,18 @@ func (svr *Service) worker() {
 	for {
 		select {
 		case <-svr.tickTimer.C:
+			svr.tickLocker.RLock()
 			node := svr.tickTree.Min()
+			svr.tickLocker.RUnlock()
 			if node == nil {
 				svr.tickTimer.Reset(math.MaxInt64)
 				break
 			}
 			tick := node.(*tickPtr)
 			if tick.next.Before(time.Now()) {
+				svr.tickLocker.Lock()
 				svr.tickTree.Delete(node)
+				svr.tickLocker.Unlock()
 				if atomic.CompareAndSwapInt32(&tick.running, 0, 1) {
 					svr.async(func() {
 						if !tick.options.Canceled {
@@ -96,7 +101,9 @@ func (svr *Service) worker() {
 						}
 					})
 				}
+				svr.tickLocker.RLock()
 				next := svr.tickTree.Min()
+				svr.tickLocker.RUnlock()
 				if next == nil {
 					svr.tickTimer.Reset(math.MaxInt64)
 				} else {
@@ -137,7 +144,9 @@ func (svr *Service) DeferTick(duration time.Duration, callback HandleTickerFunc,
 		tick.options.Context = svr.ctx
 	}
 	svr.tickTimer.Reset(0)
+	svr.tickLocker.Lock()
 	svr.tickTree.ReplaceOrInsert(tick)
+	svr.tickLocker.Unlock()
 	return tick.sequence
 }
 
@@ -468,7 +477,9 @@ func (svr *Service) destroy() (err error) {
 	}
 	svr.tickTimer.Stop()
 	for svr.tickTree.Len() > 0 {
+		svr.tickLocker.Lock()
 		node := svr.tickTree.DeleteMin()
+		svr.tickLocker.Unlock()
 		if node == nil {
 			break
 		}