lxg 4 anni fa
parent
commit
85dbbd229f
10 ha cambiato i file con 194 aggiunte e 66 eliminazioni
  1. 15 33
      .idea/workspace.xml
  2. 55 0
      client.go
  3. 20 16
      cmd/mock/client/main.go
  4. 8 5
      gateway/context.go
  5. 47 5
      gateway/rpc/client.go
  6. 1 2
      gateway/rpc/codec/gob_codec.go
  7. 17 2
      micro.go
  8. 12 1
      options.go
  9. 3 1
      registry/selector.go
  10. 16 1
      service.go

+ 15 - 33
.idea/workspace.xml

@@ -2,17 +2,15 @@
 <project version="4">
 <project version="4">
   <component name="ChangeListManager">
   <component name="ChangeListManager">
     <list default="true" id="cd58867b-089e-4508-9033-393b8939261c" name="Default Changelist" comment="">
     <list default="true" id="cd58867b-089e-4508-9033-393b8939261c" name="Default Changelist" comment="">
-      <change afterPath="$PROJECT_DIR$/cmd/mock/rpc/main.go" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/gateway/rpc/codec/codec.go" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/gateway/rpc/codec/gob_codec.go" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/client.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/conn.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/conn.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/cmd/mock/client/main.go" beforeDir="false" afterPath="$PROJECT_DIR$/cmd/mock/client/main.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/context.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/context.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/gateway/rpc/client.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/client.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/gateway/rpc/client.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/client.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/rpc/context.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/context.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/rpc/message.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/message.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/rpc/request.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/request.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/rpc/response.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/response.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/gateway/rpc/server.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/server.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/codec/gob_codec.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/codec/gob_codec.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/micro.go" beforeDir="false" afterPath="$PROJECT_DIR$/micro.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/options.go" beforeDir="false" afterPath="$PROJECT_DIR$/options.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/registry/selector.go" beforeDir="false" afterPath="$PROJECT_DIR$/registry/selector.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/service.go" beforeDir="false" afterPath="$PROJECT_DIR$/service.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/service.go" beforeDir="false" afterPath="$PROJECT_DIR$/service.go" afterDir="false" />
     </list>
     </list>
     <option name="SHOW_DIALOG" value="false" />
     <option name="SHOW_DIALOG" value="false" />
@@ -70,7 +68,7 @@
       <recent name="D:\workspace\golang\micro\internal\micro\transport\http" />
       <recent name="D:\workspace\golang\micro\internal\micro\transport\http" />
     </key>
     </key>
   </component>
   </component>
-  <component name="RunManager" selected="Go Build.go build git.nspix.com/golang/micro/cmd/mock/rpc">
+  <component name="RunManager" selected="Go Build.go build git.nspix.com/golang/micro/cmd/mock/client">
     <configuration name="go build git.nspix.com/golang/micro/cmd" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
     <configuration name="go build git.nspix.com/golang/micro/cmd" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
       <module name="micro" />
       <module name="micro" />
       <working_directory value="$PROJECT_DIR$" />
       <working_directory value="$PROJECT_DIR$" />
@@ -80,21 +78,21 @@
       <directory value="$PROJECT_DIR$" />
       <directory value="$PROJECT_DIR$" />
       <method v="2" />
       <method v="2" />
     </configuration>
     </configuration>
-    <configuration name="go build git.nspix.com/golang/micro/cmd/mock/rpc" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
+    <configuration name="go build git.nspix.com/golang/micro/cmd/mock/client" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
       <module name="micro" />
       <module name="micro" />
       <working_directory value="$PROJECT_DIR$" />
       <working_directory value="$PROJECT_DIR$" />
       <kind value="PACKAGE" />
       <kind value="PACKAGE" />
-      <filePath value="$PROJECT_DIR$/cmd/mock/rpc/main.go" />
-      <package value="git.nspix.com/golang/micro/cmd/mock/rpc" />
+      <filePath value="$PROJECT_DIR$/cmd/mock/client/main.go" />
+      <package value="git.nspix.com/golang/micro/cmd/mock/client" />
       <directory value="$PROJECT_DIR$" />
       <directory value="$PROJECT_DIR$" />
       <method v="2" />
       <method v="2" />
     </configuration>
     </configuration>
-    <configuration name="go build git.nspix.com/micro/cmd" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
+    <configuration name="go build git.nspix.com/golang/micro/cmd/mock/rpc" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
       <module name="micro" />
       <module name="micro" />
       <working_directory value="$PROJECT_DIR$" />
       <working_directory value="$PROJECT_DIR$" />
       <kind value="PACKAGE" />
       <kind value="PACKAGE" />
-      <filePath value="$PROJECT_DIR$/cmd/main.go" />
-      <package value="git.nspix.com/micro/cmd" />
+      <filePath value="$PROJECT_DIR$/cmd/mock/rpc/main.go" />
+      <package value="git.nspix.com/golang/micro/cmd/mock/rpc" />
       <directory value="$PROJECT_DIR$" />
       <directory value="$PROJECT_DIR$" />
       <method v="2" />
       <method v="2" />
     </configuration>
     </configuration>
@@ -118,11 +116,11 @@
     </configuration>
     </configuration>
     <recent_temporary>
     <recent_temporary>
       <list>
       <list>
+        <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd/mock/client" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd/mock/rpc" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd/mock/rpc" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/client" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/client" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/server" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/server" />
-        <item itemvalue="Go Build.go build git.nspix.com/micro/cmd" />
       </list>
       </list>
     </recent_temporary>
     </recent_temporary>
   </component>
   </component>
@@ -145,20 +143,4 @@
   <component name="VgoProject">
   <component name="VgoProject">
     <integration-enabled>true</integration-enabled>
     <integration-enabled>true</integration-enabled>
   </component>
   </component>
-  <component name="XDebuggerManager">
-    <breakpoint-manager>
-      <breakpoints>
-        <line-breakpoint enabled="true" type="DlvLineBreakpoint">
-          <url>file://$PROJECT_DIR$/gateway/rpc/client.go</url>
-          <line>112</line>
-          <option name="timeStamp" value="4" />
-        </line-breakpoint>
-        <line-breakpoint enabled="true" type="DlvLineBreakpoint">
-          <url>file://$PROJECT_DIR$/gateway/rpc/client.go</url>
-          <line>62</line>
-          <option name="timeStamp" value="8" />
-        </line-breakpoint>
-      </breakpoints>
-    </breakpoint-manager>
-  </component>
 </project>
 </project>

+ 55 - 0
client.go

@@ -0,0 +1,55 @@
+package micro
+
+import (
+	"context"
+	"fmt"
+	"git.nspix.com/golang/micro/gateway/rpc"
+	"git.nspix.com/golang/micro/registry"
+	"sync"
+)
+
+type Client struct {
+	selector     *registry.Selector
+	clientLocker sync.RWMutex
+	clients      map[string]*rpc.Client
+}
+
+func (c *Client) Do(ctx context.Context, r *Request) (resp Response, err error) {
+	var (
+		ok     bool
+		node   *registry.ServiceNode
+		client *rpc.Client
+	)
+	c.clientLocker.RLock()
+	client, ok = c.clients[r.ServiceName]
+	c.clientLocker.RUnlock()
+	if ok {
+		goto _EXEC
+	}
+	if node, err = c.selector.Select(ctx, r.ServiceName); err != nil {
+		return
+	}
+	client = rpc.NewClient()
+	if err = client.Dialer("tcp", fmt.Sprintf("%s:%d", node.Address, node.Port)); err != nil {
+		return
+	}
+_EXEC:
+	return client.Do(ctx, rpc.NewRequest(r.Method, r.Body))
+}
+
+func (c *Client) Close() (err error) {
+	c.clientLocker.Lock()
+	defer c.clientLocker.Unlock()
+	for name, client := range c.clients {
+		_ = client.Close()
+		delete(c.clients, name)
+	}
+	return
+}
+
+func NewClient(reg registry.Registry) *Client {
+	return &Client{
+		clients:  make(map[string]*rpc.Client),
+		selector: registry.NewSelector(reg),
+	}
+}

+ 20 - 16
cmd/mock/client/main.go

@@ -3,20 +3,19 @@ package main
 import (
 import (
 	"context"
 	"context"
 	"fmt"
 	"fmt"
-	"git.nspix.com/micro/internal/micro"
-	"github.com/micro/go-micro/v2/util/log"
+	"git.nspix.com/golang/micro"
 )
 )
 
 
 type (
 type (
 	Math struct{}
 	Math struct{}
 
 
-	MathRequest struct {
-		Num  int
-		Num2 int
+	mathRequest struct {
+		NumA int `json:"num_a"`
+		NumB int `json:"num_b"`
 	}
 	}
 
 
-	MathResponse struct {
-		Val int
+	mathResponse struct {
+		Value int `json:"value"`
 	}
 	}
 )
 )
 
 
@@ -24,18 +23,23 @@ func main() {
 	s := micro.New(
 	s := micro.New(
 		micro.WithName("aa", "1.0.1"),
 		micro.WithName("aa", "1.0.1"),
 	)
 	)
-	args := &MathRequest{
-		Num:  10,
-		Num2: 20,
+	req, err := s.CreateRequest("test", "math.add", &mathRequest{
+		NumA: 100,
+		NumB: 221,
+	})
+	if err != nil {
+		fmt.Println(err)
+		return
 	}
 	}
-	if req, err := s.NewRequest("test.server", "Math.Add", args); err != nil {
-		log.Fatal(err)
+	if res, err := req.Do(context.Background()); err != nil {
+		fmt.Println(err)
 	} else {
 	} else {
-		res := &MathResponse{}
-		if err = req.Do(context.Background(), res); err == nil {
-			fmt.Println(res.Val)
-		} else {
+		if res.Error() != nil {
 			fmt.Println(err)
 			fmt.Println(err)
+		} else {
+			rr := &mathResponse{}
+			res.Decode(rr)
+			fmt.Println(rr)
 		}
 		}
 	}
 	}
 	fmt.Println(s.Run())
 	fmt.Println(s.Run())

+ 8 - 5
gateway/context.go

@@ -1,7 +1,10 @@
 package gateway
 package gateway
 
 
-type Context interface {
-	Bind(i interface{}) (err error)
-	Error(code int,msg string) (err error)
-	Success(i interface{}) (err error)
-}
+type (
+	Context interface {
+		Bind(i interface{}) (err error)
+		Error(code int, msg string) (err error)
+		Success(i interface{}) (err error)
+	}
+
+)

+ 47 - 5
gateway/rpc/client.go

@@ -9,6 +9,10 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+var (
+	DefaultTimeout = time.Second * 5
+)
+
 type (
 type (
 	Client struct {
 	Client struct {
 		conn              net.Conn
 		conn              net.Conn
@@ -20,6 +24,8 @@ type (
 		exitChan          chan struct{}
 		exitChan          chan struct{}
 		network           string
 		network           string
 		address           string
 		address           string
+		connLock          sync.Mutex
+		Timeout           time.Duration
 	}
 	}
 
 
 	transaction struct {
 	transaction struct {
@@ -57,6 +63,23 @@ func (c *Client) commit(seq uint16) *transaction {
 	return trans
 	return trans
 }
 }
 
 
+func (c *Client) eventLoop() {
+	ticker := time.NewTicker(time.Second * 10)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-c.exitChan:
+			return
+		case <-ticker.C:
+			if atomic.LoadInt32(&c.isConnected) == 1 {
+				_ = writeFrame(c.conn, &Frame{
+					Func: FuncPing,
+				})
+			}
+		}
+	}
+}
+
 func (c *Client) rdyLoop() {
 func (c *Client) rdyLoop() {
 	defer atomic.StoreInt32(&c.isConnected, 0)
 	defer atomic.StoreInt32(&c.isConnected, 0)
 	for {
 	for {
@@ -72,6 +95,8 @@ func (c *Client) rdyLoop() {
 						ch.Cancel()
 						ch.Cancel()
 					}
 					}
 				}
 				}
+			} else if frame.Func == FuncPing {
+
 			}
 			}
 		} else {
 		} else {
 			break
 			break
@@ -82,12 +107,18 @@ func (c *Client) rdyLoop() {
 func (c *Client) Dialer(network string, addr string) (err error) {
 func (c *Client) Dialer(network string, addr string) (err error) {
 	c.network = network
 	c.network = network
 	c.address = addr
 	c.address = addr
-	return c.dialer()
+	go c.eventLoop()
+	return c.dialer(c.Timeout)
 
 
 }
 }
 
 
-func (c *Client) dialer() (err error) {
-	if c.conn, err = net.DialTimeout(c.network, c.address, time.Second*10); err != nil {
+func (c *Client) dialer(timeout time.Duration) (err error) {
+	c.connLock.Lock()
+	defer c.connLock.Unlock()
+	if atomic.LoadInt32(&c.isConnected) == 1 {
+		return
+	}
+	if c.conn, err = net.DialTimeout(c.network, c.address, timeout); err != nil {
 		return
 		return
 	} else {
 	} else {
 		atomic.StoreInt32(&c.isConnected, 1)
 		atomic.StoreInt32(&c.isConnected, 1)
@@ -98,8 +129,17 @@ func (c *Client) dialer() (err error) {
 
 
 func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
 func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
 	if atomic.LoadInt32(&c.isConnected) == 0 {
 	if atomic.LoadInt32(&c.isConnected) == 0 {
-		err = io.ErrClosedPipe
-		return
+		var (
+			ok       bool
+			deadline time.Time
+		)
+		if deadline, ok = ctx.Deadline(); !ok {
+			deadline = time.Now().Add(c.Timeout)
+		}
+		if err = c.dialer(time.Now().Sub(deadline)); err != nil {
+			err = io.ErrClosedPipe
+			return
+		}
 	}
 	}
 	c.seq++
 	c.seq++
 	seq := c.seq
 	seq := c.seq
@@ -130,6 +170,7 @@ func (c *Client) Close() (err error) {
 		if c.conn != nil {
 		if c.conn != nil {
 			err = c.conn.Close()
 			err = c.conn.Close()
 		}
 		}
+		c.isConnected = 0
 		close(c.exitChan)
 		close(c.exitChan)
 	}
 	}
 	return
 	return
@@ -137,6 +178,7 @@ func (c *Client) Close() (err error) {
 
 
 func NewClient() *Client {
 func NewClient() *Client {
 	return &Client{
 	return &Client{
+		Timeout:     DefaultTimeout,
 		exitChan:    make(chan struct{}),
 		exitChan:    make(chan struct{}),
 		transaction: make(map[uint16]*transaction),
 		transaction: make(map[uint16]*transaction),
 	}
 	}

+ 1 - 2
gateway/rpc/codec/gob_codec.go

@@ -6,8 +6,7 @@ import (
 	"io"
 	"io"
 )
 )
 
 
-type GobCodec struct {
-}
+type GobCodec struct {}
 
 
 func (codec *GobCodec) EncodeTo(w io.Writer, i interface{}) (err error) {
 func (codec *GobCodec) EncodeTo(w io.Writer, i interface{}) (err error) {
 	return gob.NewEncoder(w).Encode(i)
 	return gob.NewEncoder(w).Encode(i)

+ 17 - 2
micro.go

@@ -3,7 +3,6 @@ package micro
 import (
 import (
 	"context"
 	"context"
 	"git.nspix.com/golang/micro/gateway"
 	"git.nspix.com/golang/micro/gateway"
-	"git.nspix.com/golang/micro/log"
 )
 )
 
 
 type (
 type (
@@ -16,7 +15,6 @@ type (
 	HandleFunc func(ctx gateway.Context) (err error)
 	HandleFunc func(ctx gateway.Context) (err error)
 
 
 	Application interface {
 	Application interface {
-		Logger() log.Logger                                                 //获取日志实例
 		RegisterHandle(method string, cb HandleFunc) (err error)            //注册一个处理器
 		RegisterHandle(method string, cb HandleFunc) (err error)            //注册一个处理器
 		NewRequest(service, method string, payload interface{}) (err error) //创建一个rpc请求
 		NewRequest(service, method string, payload interface{}) (err error) //创建一个rpc请求
 	}
 	}
@@ -25,8 +23,25 @@ type (
 		Start(ctx context.Context) (err error)
 		Start(ctx context.Context) (err error)
 		Stop() (err error)
 		Stop() (err error)
 	}
 	}
+
+	Request struct {
+		ServiceName string
+		Method      string
+		Body        interface{}
+		client      *Client
+	}
+
+	Response interface {
+		StatusCode() int
+		Error() error
+		Decode(i interface{}) error
+	}
 )
 )
 
 
+func (r *Request) Do(ctx context.Context) (Response, error) {
+	return r.client.Do(ctx, r)
+}
+
 func WithHttpMethod(method string) HandleOption {
 func WithHttpMethod(method string) HandleOption {
 	return func(o *HandleOptions) {
 	return func(o *HandleOptions) {
 		o.HttpMethod = method
 		o.HttpMethod = method

+ 12 - 1
options.go

@@ -1,6 +1,9 @@
 package micro
 package micro
 
 
-import "context"
+import (
+	"context"
+	"git.nspix.com/golang/micro/registry"
+)
 
 
 type (
 type (
 	Options struct {
 	Options struct {
@@ -9,6 +12,7 @@ type (
 		Version    string `json:"version"`
 		Version    string `json:"version"`
 		EnableHttp bool   `json:"enable_http"`
 		EnableHttp bool   `json:"enable_http"`
 		EnableRPC  bool   `json:"enable_rpc"`
 		EnableRPC  bool   `json:"enable_rpc"`
+		registry   registry.Registry
 		Server     Server
 		Server     Server
 		Context    context.Context
 		Context    context.Context
 	}
 	}
@@ -22,6 +26,12 @@ func WithName(name string, version string) OptionMiddleware {
 	}
 	}
 }
 }
 
 
+func WithRegistry(r registry.Registry) OptionMiddleware {
+	return func(o *Options) {
+		o.registry = r
+	}
+}
+
 func WithContext(c context.Context) OptionMiddleware {
 func WithContext(c context.Context) OptionMiddleware {
 	return func(o *Options) {
 	return func(o *Options) {
 		o.Context = c
 		o.Context = c
@@ -53,5 +63,6 @@ func NewOptions() *Options {
 		EnableHttp: true,
 		EnableHttp: true,
 		EnableRPC:  true,
 		EnableRPC:  true,
 		Context:    context.Background(),
 		Context:    context.Background(),
+		registry:   registry.DefaultRegistry,
 	}
 	}
 }
 }

+ 3 - 1
registry/selector.go

@@ -1,5 +1,7 @@
 package registry
 package registry
 
 
+import "context"
+
 const (
 const (
 	StatusHealthy = "healthy"
 	StatusHealthy = "healthy"
 	StatusDead    = "dead"
 	StatusDead    = "dead"
@@ -11,7 +13,7 @@ type (
 	}
 	}
 )
 )
 
 
-func (selector *Selector) Select(name string) (node *ServiceNode, err error) {
+func (selector *Selector) Select(ctx context.Context,name string) (node *ServiceNode, err error) {
 	var (
 	var (
 		nodes []*ServiceNode
 		nodes []*ServiceNode
 	)
 	)

+ 16 - 1
service.go

@@ -33,6 +33,7 @@ type Service struct {
 	httpSvr    *http.Server
 	httpSvr    *http.Server
 	rpcSvr     *rpc.Server
 	rpcSvr     *rpc.Server
 	upTime     time.Time
 	upTime     time.Time
+	client     *Client
 }
 }
 
 
 func (svr *Service) wrapSync(f func()) {
 func (svr *Service) wrapSync(f func()) {
@@ -86,6 +87,15 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	return
 	return
 }
 }
 
 
+func (svr *Service) CreateRequest(name, method string, body interface{}) (req *Request, err error) {
+	return &Request{
+		ServiceName: name,
+		Method:      method,
+		Body:        body,
+		client:      svr.client,
+	}, nil
+}
+
 func (svr *Service) generateInstance() {
 func (svr *Service) generateInstance() {
 	var (
 	var (
 		err      error
 		err      error
@@ -148,6 +158,7 @@ func (svr *Service) prepare() (err error) {
 			})
 			})
 			svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
 			svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
 				return ctx.Success(map[string]interface{}{
 				return ctx.Success(map[string]interface{}{
+					"id":      svr.node.ID,
 					"healthy": "healthy",
 					"healthy": "healthy",
 					"uptime":  time.Now().Sub(svr.upTime).String(),
 					"uptime":  time.Now().Sub(svr.upTime).String(),
 				})
 				})
@@ -189,6 +200,9 @@ func (svr *Service) destroy() (err error) {
 			log.Warnf(err.Error())
 			log.Warnf(err.Error())
 		}
 		}
 	}
 	}
+	if err = svr.client.Close(); err != nil {
+		log.Warnf(err.Error())
+	}
 	log.Infof("stopped")
 	log.Infof("stopped")
 	return
 	return
 }
 }
@@ -227,7 +241,8 @@ func New(opts ...OptionMiddleware) *Service {
 		upTime:   time.Now(),
 		upTime:   time.Now(),
 		httpSvr:  http.New(),
 		httpSvr:  http.New(),
 		rpcSvr:   rpc.NewServer(),
 		rpcSvr:   rpc.NewServer(),
-		registry: registry.DefaultRegistry,
+		registry: o.registry,
+		client:   NewClient(o.registry),
 	}
 	}
 	svr.ctx, svr.cancelFunc = context.WithCancel(o.Context)
 	svr.ctx, svr.cancelFunc = context.WithCancel(o.Context)
 	return svr
 	return svr