Explorar el Código

添加日志管理

lxg hace 4 años
padre
commit
5d56a411be
Se han modificado 3 ficheros con 87 adiciones y 62 borrados
  1. 1 0
      micro.go
  2. 25 16
      options.go
  3. 61 46
      service.go

+ 1 - 0
micro.go

@@ -18,6 +18,7 @@ type (
 
 	HandleFunc func(ctx Context) (err error)
 
+
 	Application interface {
 		Handle(method string, cb HandleFunc, opts ...HandleOption)                     //注册一个处理器
 		CreateRequest(name, method string, body interface{}) (req *Request, err error) //创建一个rpc请求

+ 25 - 16
options.go

@@ -7,16 +7,18 @@ import (
 
 type (
 	Options struct {
-		Zone       string `json:"zone"`
-		Name       string `json:"name"`
-		Version    string `json:"version"`
-		EnableHttp bool   `json:"enable_http"`
-		EnableRPC  bool   `json:"enable_rpc"`
-		registry   registry.Registry
-		Server     Server
-		Context    context.Context
-	}
-	
+		Zone                   string            //注册域
+		Name                   string            //名称
+		Version                string            //版本号
+		EnableHttp             bool              //启用HTTP功能
+		EnableRPC              bool              //启用RPC功能
+		EnableInternalListener bool              //启用内置网络监听服务
+		registry               registry.Registry //注册仓库
+		Server                 Server            //加载的服务
+		Port                   int               //绑定端口
+		Context                context.Context
+	}
+
 	Option func(o *Options)
 )
 
@@ -27,6 +29,12 @@ func WithName(name string, version string) Option {
 	}
 }
 
+func WithPort(port int) Option {
+	return func(o *Options) {
+		o.Port = port
+	}
+}
+
 func WithRegistry(r registry.Registry) Option {
 	return func(o *Options) {
 		o.registry = r
@@ -53,11 +61,12 @@ func WithoutRPC() Option {
 
 func NewOptions() *Options {
 	return &Options{
-		Zone:       "default",
-		Version:    "1.0.1",
-		EnableHttp: true,
-		EnableRPC:  true,
-		Context:    context.Background(),
-		registry:   registry.DefaultRegistry,
+		Zone:                   "default",
+		Version:                "1.0.1",
+		EnableHttp:             true,
+		EnableRPC:              true,
+		EnableInternalListener: true,
+		Context:                context.Background(),
+		registry:               registry.DefaultRegistry,
 	}
 }

+ 61 - 46
service.go

@@ -103,7 +103,6 @@ func (svr *Service) generateInstance() {
 		dockerID string
 		tcpAddr  *net.TCPAddr
 	)
-
 	if id, err = docker.SelfContainerID(); err != nil {
 		//生成唯一ID
 		e5 := md5.New()
@@ -122,8 +121,12 @@ func (svr *Service) generateInstance() {
 	}
 	ipLocal := ip.InternalIP()
 	svr.node.Address = ipLocal
-	if tcpAddr, err = net.ResolveTCPAddr("tcp", svr.listener.Addr().String()); err == nil {
-		svr.node.Port = tcpAddr.Port
+	if svr.listener != nil {
+		if tcpAddr, err = net.ResolveTCPAddr("tcp", svr.listener.Addr().String()); err == nil {
+			svr.node.Port = tcpAddr.Port
+		}
+	} else {
+		svr.node.Port = svr.opts.Port
 	}
 	svr.node.Metadata["docker-id"] = dockerID
 	if svr.opts.EnableHttp {
@@ -136,52 +139,62 @@ func (svr *Service) generateInstance() {
 
 func (svr *Service) prepare() (err error) {
 	log.Prefix(svr.opts.Name)
-	svr.ctx = WithContext(svr.ctx,svr)
-	if svr.listener, err = net.ListenTCP("tcp", nil); err != nil {
-		return
+	svr.ctx = WithContext(svr.ctx, svr)
+	if svr.opts.EnableInternalListener {
+		var tcpAddr *net.TCPAddr
+		if svr.opts.Port != 0 {
+			tcpAddr = &net.TCPAddr{
+				Port: svr.opts.Port,
+			}
+		}
+		if svr.listener, err = net.ListenTCP("tcp", tcpAddr); err != nil {
+			return
+		}
+		log.Infof("listener on: %s", svr.listener.Addr())
+		svr.gateway = gateway.New(svr.listener)
+		svr.wrapSync(func() {
+			svr.gateway.Run(svr.ctx)
+		})
+		//启用HTTP服务
+		if svr.opts.EnableHttp {
+			l := gateway.NewListener(svr.listener.Addr())
+			if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil {
+				svr.wrapSync(func() {
+					if err = svr.httpSvr.Serve(l); err != nil {
+						log.Warnf("http serve error: %s", err.Error())
+					}
+				})
+				svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
+					return ctx.Success(map[string]interface{}{
+						"id":      svr.node.ID,
+						"healthy": "healthy",
+						"uptime":  time.Now().Sub(svr.upTime).String(),
+					})
+				})
+				log.Infof("attach http server success")
+			} else {
+				log.Warnf("attach http listener error: %s", err.Error())
+			}
+		}
+		//启用RPC服务
+		if svr.opts.EnableRPC {
+			l := gateway.NewListener(svr.listener.Addr())
+			if err = svr.gateway.Attach([]byte("RPC"), l); err == nil {
+				svr.wrapSync(func() {
+					if err = svr.rpcSvr.Serve(l); err != nil {
+						log.Warnf("rpc serve error: %s", err.Error())
+					}
+				})
+				log.Infof("attach rpc server success")
+			} else {
+				log.Warnf("attach rpc listener error: %s", err.Error())
+			}
+		}
 	}
-	log.Infof("listener on: %s", svr.listener.Addr())
-	svr.gateway = gateway.New(svr.listener)
 	svr.generateInstance()
-	svr.wrapSync(func() {
-		svr.gateway.Run(svr.ctx)
-	})
 	svr.wrapSync(func() {
 		svr.eventLoop()
 	})
-	if svr.opts.EnableHttp {
-		l := gateway.NewListener(svr.listener.Addr())
-		if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil {
-			svr.wrapSync(func() {
-				if err = svr.httpSvr.Serve(l); err != nil {
-					log.Warnf("http serve error: %s", err.Error())
-				}
-			})
-			svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
-				return ctx.Success(map[string]interface{}{
-					"id":      svr.node.ID,
-					"healthy": "healthy",
-					"uptime":  time.Now().Sub(svr.upTime).String(),
-				})
-			})
-			log.Infof("attach http server success")
-		} else {
-			log.Warnf("attach http listener error: %s", err.Error())
-		}
-	}
-	if svr.opts.EnableHttp {
-		l := gateway.NewListener(svr.listener.Addr())
-		if err = svr.gateway.Attach([]byte("RPC"), l); err == nil {
-			svr.wrapSync(func() {
-				if err = svr.rpcSvr.Serve(l); err != nil {
-					log.Warnf("rpc serve error: %s", err.Error())
-				}
-			})
-			log.Infof("attach rpc server success")
-		} else {
-			log.Warnf("attach rpc listener error: %s", err.Error())
-		}
-	}
 	if err = svr.registry.Register(svr.node); err == nil {
 		log.Infof("register service %s successful", svr.opts.Name)
 	}
@@ -189,7 +202,7 @@ func (svr *Service) prepare() (err error) {
 }
 
 func (svr *Service) destroy() (err error) {
-	log.Infof("stopping")
+	log.Infof("service stopping")
 	svr.cancelFunc()
 	if err = svr.registry.Deregister(svr.node); err != nil {
 		log.Warnf("deregister service %s error: %s", svr.opts.Name, err.Error())
@@ -204,11 +217,12 @@ func (svr *Service) destroy() (err error) {
 	if err = svr.client.Close(); err != nil {
 		log.Warnf(err.Error())
 	}
-	log.Infof("stopped")
+	log.Infof("service stopped")
 	return
 }
 
 func (svr *Service) Run() (err error) {
+	log.Infof("service starting")
 	if err = svr.prepare(); err != nil {
 		return
 	}
@@ -218,6 +232,7 @@ func (svr *Service) Run() (err error) {
 			return
 		}
 	}
+	log.Infof("service ready")
 	//waiting
 	ch := make(chan os.Signal, 1)
 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL)