Browse Source

rpc请求服务端添加重试处理

lxg 2 years ago
parent
commit
08465bb774
2 changed files with 13 additions and 7 deletions
  1. 12 6
      client.go
  2. 1 1
      service.go

+ 12 - 6
client.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 
 	"git.nspix.com/golang/micro/gateway/rpc"
 	"git.nspix.com/golang/micro/registry"
@@ -17,18 +18,20 @@ type Client struct {
 
 func (c *Client) Do(ctx context.Context, r *Request) (resp Response, err error) {
 	var (
-		ok     bool
-		node   *registry.ServiceNode
-		value  interface{}
-		client *rpc.Client
+		ok      bool
+		node    *registry.ServiceNode
+		value   interface{}
+		client  *rpc.Client
+		tryFlag int32
 	)
 	value, ok = c.clients.Load(r.ServiceName)
 	if ok {
 		client = value.(*rpc.Client)
 		goto _EXEC
 	}
+_RETRY:
 	if node, err = c.selector.Select(ctx, r.ServiceName); err != nil {
-		err = fmt.Errorf("selector %s failed (%s)", r.ServiceName, err.Error())
+		err = fmt.Errorf("get service %s failed cause by %s", r.ServiceName, err.Error())
 		return
 	}
 	client = rpc.NewClient()
@@ -38,8 +41,11 @@ func (c *Client) Do(ctx context.Context, r *Request) (resp Response, err error)
 	c.clients.Store(r.ServiceName, client)
 _EXEC:
 	if resp, err = client.Do(ctx, rpc.NewRequest(r.Method, r.Body)); err != nil {
-		client.Close()
+		err = client.Close()
 		c.clients.Delete(r.ServiceName)
+		if atomic.CompareAndSwapInt32(&tryFlag, 0, 1) {
+			goto _RETRY
+		}
 	}
 	return
 }

+ 1 - 1
service.go

@@ -477,7 +477,7 @@ func (svr *Service) Reload() (err error) {
 	if svr.opts.Server == nil {
 		return
 	}
-	log.Infof("reload server")
+	log.Infof("reloading server %s", svr.opts.Name)
 	if err = svr.opts.Server.Stop(); err != nil {
 		return
 	}