소스 검색

优化rpc客户端断开连接可能出现崩溃的问题

lxg 3 년 전
부모
커밋
e998e88009
1개의 변경된 파일10개의 추가작업 그리고 10개의 파일을 삭제
  1. 10 10
      gateway/rpc/client.go

+ 10 - 10
gateway/rpc/client.go

@@ -34,21 +34,22 @@ type (
 	}
 	}
 
 
 	transaction struct {
 	transaction struct {
-		sequence   uint16
-		response   *Response
-		isCanceled bool
-		ch         chan *transaction
+		sequence     uint16
+		response     *Response
+		canceledFlag int32
+		ch           chan *transaction
 	}
 	}
 )
 )
 
 
 func (t *transaction) Cancel() {
 func (t *transaction) Cancel() {
-	t.isCanceled = true
-	close(t.ch)
+	if atomic.CompareAndSwapInt32(&t.canceledFlag, 0, 1) {
+		close(t.ch)
+	}
 }
 }
 
 
 func (t *transaction) Done(r *Response) {
 func (t *transaction) Done(r *Response) {
 	t.response = r
 	t.response = r
-	if t.ch != nil && !t.isCanceled {
+	if t.ch != nil && atomic.LoadInt32(&t.canceledFlag) == 0 {
 		select {
 		select {
 		case t.ch <- t:
 		case t.ch <- t:
 		default:
 		default:
@@ -59,9 +60,8 @@ func (t *transaction) Done(r *Response) {
 func (c *Client) commit(seq uint16) *transaction {
 func (c *Client) commit(seq uint16) *transaction {
 	c.transactionLocker.Lock()
 	c.transactionLocker.Lock()
 	trans := &transaction{
 	trans := &transaction{
-		sequence:   seq,
-		isCanceled: false,
-		ch:         make(chan *transaction),
+		sequence: seq,
+		ch:       make(chan *transaction),
 	}
 	}
 	c.transaction[seq] = trans
 	c.transaction[seq] = trans
 	c.transactionLocker.Unlock()
 	c.transactionLocker.Unlock()