Browse Source

修复问题

lxg 4 years ago
parent
commit
8c49c442c0
8 changed files with 62 additions and 45 deletions
  1. 5 7
      .idea/workspace.xml
  2. 17 14
      client.go
  3. 1 2
      cmd/main.go
  4. 0 10
      gateway/context.go
  5. 23 4
      gateway/rpc/client.go
  6. 1 1
      gateway/rpc/server.go
  7. 12 4
      micro.go
  8. 3 3
      service.go

+ 5 - 7
.idea/workspace.xml

@@ -2,15 +2,13 @@
 <project version="4">
   <component name="ChangeListManager">
     <list default="true" id="cd58867b-089e-4508-9033-393b8939261c" name="Default Changelist" comment="">
-      <change afterPath="$PROJECT_DIR$/client.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/cmd/mock/client/main.go" beforeDir="false" afterPath="$PROJECT_DIR$/cmd/mock/client/main.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/context.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/context.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/client.go" beforeDir="false" afterPath="$PROJECT_DIR$/client.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/cmd/main.go" beforeDir="false" afterPath="$PROJECT_DIR$/cmd/main.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/context.go" beforeDir="false" />
       <change beforePath="$PROJECT_DIR$/gateway/rpc/client.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/client.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/rpc/codec/gob_codec.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/codec/gob_codec.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/server.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/server.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/micro.go" beforeDir="false" afterPath="$PROJECT_DIR$/micro.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/options.go" beforeDir="false" afterPath="$PROJECT_DIR$/options.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/registry/selector.go" beforeDir="false" afterPath="$PROJECT_DIR$/registry/selector.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/service.go" beforeDir="false" afterPath="$PROJECT_DIR$/service.go" afterDir="false" />
     </list>
     <option name="SHOW_DIALOG" value="false" />
@@ -116,9 +114,9 @@
     </configuration>
     <recent_temporary>
       <list>
+        <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd/mock/client" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd/mock/rpc" />
-        <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/client" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/server" />
       </list>

+ 17 - 14
client.go

@@ -9,47 +9,50 @@ import (
 )
 
 type Client struct {
-	selector     *registry.Selector
-	clientLocker sync.RWMutex
-	clients      map[string]*rpc.Client
+	selector *registry.Selector
+	clients  sync.Map
+	once     sync.Once
 }
 
 func (c *Client) Do(ctx context.Context, r *Request) (resp Response, err error) {
 	var (
 		ok     bool
 		node   *registry.ServiceNode
+		value  interface{}
 		client *rpc.Client
 	)
-	c.clientLocker.RLock()
-	client, ok = c.clients[r.ServiceName]
-	c.clientLocker.RUnlock()
+	value, ok = c.clients.Load(r.ServiceName)
 	if ok {
+		client = value.(*rpc.Client)
 		goto _EXEC
 	}
 	if node, err = c.selector.Select(ctx, r.ServiceName); err != nil {
 		return
 	}
 	client = rpc.NewClient()
-	if err = client.Dialer("tcp", fmt.Sprintf("%s:%d", node.Address, node.Port)); err != nil {
+	if err = client.DialerContext(ctx, "tcp", fmt.Sprintf("%s:%d", node.Address, node.Port)); err != nil {
 		return
 	}
+	c.clients.Store(r.ServiceName, client)
 _EXEC:
-	return client.Do(ctx, rpc.NewRequest(r.Method, r.Body))
+	if resp, err = client.Do(ctx, rpc.NewRequest(r.Method, r.Body)); err != nil {
+		c.clients.Delete(r.ServiceName)
+	}
+	return
 }
 
 func (c *Client) Close() (err error) {
-	c.clientLocker.Lock()
-	defer c.clientLocker.Unlock()
-	for name, client := range c.clients {
+	c.clients.Range(func(key, value interface{}) bool {
+		client := value.(*rpc.Client)
 		_ = client.Close()
-		delete(c.clients, name)
-	}
+		c.clients.Delete(key)
+		return true
+	})
 	return
 }
 
 func NewClient(reg registry.Registry) *Client {
 	return &Client{
-		clients:  make(map[string]*rpc.Client),
 		selector: registry.NewSelector(reg),
 	}
 }

+ 1 - 2
cmd/main.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"git.nspix.com/golang/micro"
-	"git.nspix.com/golang/micro/gateway"
 	"git.nspix.com/golang/micro/log"
 )
 
@@ -19,7 +18,7 @@ type (
 
 func main() {
 	svr := micro.New(micro.WithName("test", "0.1.01"))
-	svr.Handle("math.add", func(ctx gateway.Context) (err error) {
+	svr.Handle("math.add", func(ctx micro.Context) (err error) {
 		var req mathRequest
 		if err = ctx.Bind(&req); err == nil {
 			return ctx.Success(mathResponse{Value: req.NumA + req.NumB})

+ 0 - 10
gateway/context.go

@@ -1,10 +0,0 @@
-package gateway
-
-type (
-	Context interface {
-		Bind(i interface{}) (err error)
-		Error(code int, msg string) (err error)
-		Success(i interface{}) (err error)
-	}
-
-)

+ 23 - 4
gateway/rpc/client.go

@@ -17,6 +17,7 @@ type (
 	Client struct {
 		conn              net.Conn
 		seq               uint16
+		once              sync.Once
 		isConnected       int32
 		transactionLocker sync.RWMutex
 		transaction       map[uint16]*transaction
@@ -25,6 +26,7 @@ type (
 		network           string
 		address           string
 		connLock          sync.Mutex
+		pintAt            time.Time
 		Timeout           time.Duration
 	}
 
@@ -96,7 +98,7 @@ func (c *Client) rdyLoop() {
 					}
 				}
 			} else if frame.Func == FuncPing {
-
+				c.pintAt = time.Now()
 			}
 		} else {
 			break
@@ -104,12 +106,29 @@ func (c *Client) rdyLoop() {
 	}
 }
 
+func (c *Client) DialerContext(ctx context.Context, network string, addr string) (err error) {
+	var (
+		ok       bool
+		deadline time.Time
+	)
+	if deadline, ok = ctx.Deadline(); !ok {
+		deadline = time.Now().Add(c.Timeout)
+	}
+	c.network = network
+	c.address = addr
+	c.once.Do(func() {
+		go c.eventLoop()
+	})
+	return c.dialer(deadline.Sub(time.Now()))
+}
+
 func (c *Client) Dialer(network string, addr string) (err error) {
 	c.network = network
 	c.address = addr
-	go c.eventLoop()
+	c.once.Do(func() {
+		go c.eventLoop()
+	})
 	return c.dialer(c.Timeout)
-
 }
 
 func (c *Client) dialer(timeout time.Duration) (err error) {
@@ -136,7 +155,7 @@ func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error
 		if deadline, ok = ctx.Deadline(); !ok {
 			deadline = time.Now().Add(c.Timeout)
 		}
-		if err = c.dialer(time.Now().Sub(deadline)); err != nil {
+		if err = c.dialer(deadline.Sub(time.Now())); err != nil {
 			err = io.ErrClosedPipe
 			return
 		}

+ 1 - 1
gateway/rpc/server.go

@@ -106,7 +106,7 @@ func (svr *Server) process(conn net.Conn) {
 			//读取一个请求
 			if req, err2 := ReadRequest(frame.Data); err2 == nil {
 				req.reset(frame.Sequence, conn)
-				log.Debugf("rcp read request %s@%d", req.Method, req.Sequence)
+				log.Debugf("rcp read request %s@%d from %s", req.Method, req.Sequence, req.RemoteAddr().String())
 				select {
 				case svr.ch <- req:
 				default:

+ 12 - 4
micro.go

@@ -2,20 +2,28 @@ package micro
 
 import (
 	"context"
-	"git.nspix.com/golang/micro/gateway"
 )
 
 type (
+
+	Context interface {
+		Bind(i interface{}) (err error)
+		Error(code int, msg string) (err error)
+		Success(i interface{}) (err error)
+	}
+
 	HandleOptions struct {
-		HttpMethod string
+		HttpMethod  string
+		DisableRpc  bool
+		DisableHttp bool
 	}
 
 	HandleOption func(o *HandleOptions)
 
-	HandleFunc func(ctx gateway.Context) (err error)
+	HandleFunc func(ctx Context) (err error)
 
 	Application interface {
-		RegisterHandle(method string, cb HandleFunc) (err error)            //注册一个处理器
+		Handle(method string, cb HandleFunc) (err error)                    //注册一个处理器
 		NewRequest(service, method string, payload interface{}) (err error) //创建一个rpc请求
 	}
 

+ 3 - 3
service.go

@@ -68,7 +68,8 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	for _, f := range opts {
 		f(opt)
 	}
-	if svr.opts.EnableHttp {
+	//HTTP处理
+	if svr.opts.EnableHttp && !opt.DisableHttp {
 		path := strings.ReplaceAll(method, ".", "/")
 		if path[0] != '/' {
 			path = "/" + path
@@ -77,9 +78,8 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 			return cb(ctx)
 		})
 	}
-
 	//启动RPC功能
-	if svr.opts.EnableRPC {
+	if svr.opts.EnableRPC && !opt.DisableRpc {
 		svr.rpcSvr.Handle(method, func(ctx *rpc.Context) error {
 			return cb(ctx)
 		})