소스 검색

fix log bug

lxg 3 년 전
부모
커밋
af3a5e48d2
4개의 변경된 파일60개의 추가작업 그리고 31개의 파일을 삭제
  1. 24 12
      gateway/gateway.go
  2. 8 0
      options.go
  3. 2 2
      request.go
  4. 26 17
      service.go

+ 24 - 12
gateway/gateway.go

@@ -31,9 +31,10 @@ type (
 	}
 
 	Gateway struct {
-		listeners []*listener
-		l         net.Listener
-		ch        chan net.Conn
+		listeners   []*listener
+		l           net.Listener
+		ch          chan net.Conn
+		enableStats bool
 	}
 )
 
@@ -81,20 +82,28 @@ func (g *Gateway) process(conn net.Conn) {
 		missing uint8
 		feature = make([]byte, MinFeatureLength)
 	)
-	requestCounter.Add(1)
+	if g.enableStats {
+		requestCounter.Add(1)
+	}
 	//set deadline
 	if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 800)); err != nil {
-		requestDropCounter.Add("ERROR", 1)
+		if g.enableStats {
+			requestDropCounter.Add("ERROR", 1)
+		}
 		return
 	}
 	if n, err = io.ReadFull(conn, feature); err != nil {
-		requestDropCounter.Add("EOF", 1)
+		if g.enableStats {
+			requestDropCounter.Add("EOF", 1)
+		}
 		_ = conn.Close()
 		return
 	}
 	//reset deadline
 	if err = conn.SetReadDeadline(time.Time{}); err != nil {
-		requestDropCounter.Add("ERROR", 1)
+		if g.enableStats {
+			requestDropCounter.Add("ERROR", 1)
+		}
 		return
 	}
 	missing = 1
@@ -106,7 +115,9 @@ func (g *Gateway) process(conn net.Conn) {
 		}
 	}
 	if missing == 1 {
-		requestDropCounter.Add("MISSING", 1)
+		if g.enableStats {
+			requestDropCounter.Add("MISSING", 1)
+		}
 	}
 }
 
@@ -156,10 +167,11 @@ func (g *Gateway) Run(ctx context.Context) {
 	wg.Wait()
 }
 
-func New(l net.Listener) *Gateway {
+func New(l net.Listener, stats bool) *Gateway {
 	return &Gateway{
-		l:         l,
-		ch:        make(chan net.Conn, 10),
-		listeners: make([]*listener, 0),
+		l:           l,
+		enableStats: stats,
+		ch:          make(chan net.Conn, 10),
+		listeners:   make([]*listener, 0),
 	}
 }

+ 8 - 0
options.go

@@ -22,6 +22,7 @@ type (
 		Address                string            //绑定地址
 		EnableHttpPProf        bool              //启用HTTP调试工具
 		EnableStats            bool              //启用数据统计
+		EnableLogPrefix        bool              //启用日志前缀
 		Context                context.Context
 		shortName              string
 	}
@@ -90,6 +91,12 @@ func WithoutRegister() Option {
 	}
 }
 
+func WithoutLogPrefix() Option {
+	return func(o *Options) {
+		o.EnableLogPrefix = false
+	}
+}
+
 func WithoutRPC() Option {
 	return func(o *Options) {
 		o.EnableRPC = false
@@ -103,6 +110,7 @@ func NewOptions() *Options {
 		EnableHttp:             true,
 		EnableRPC:              true,
 		EnableInternalListener: true,
+		EnableLogPrefix:        true,
 		Context:                context.Background(),
 		registry:               registry.DefaultRegistry,
 	}

+ 2 - 2
request.go

@@ -11,12 +11,12 @@ type (
 	}
 )
 
-//进行请求
+//Do do request
 func (r *Request) Do(ctx context.Context) (Response, error) {
 	return r.client.Do(ctx, r)
 }
 
-//调用当前请求方法
+//Call call method
 func (r *Request) Call(ctx context.Context, i interface{}) (err error) {
 	var (
 		res Response

+ 26 - 17
service.go

@@ -4,6 +4,8 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/hex"
+	"git.nspix.com/golang/micro/stats/prometheusbackend"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"net"
 	hp "net/http"
 	"net/http/pprof"
@@ -19,11 +21,9 @@ import (
 	"git.nspix.com/golang/micro/gateway/rpc"
 	"git.nspix.com/golang/micro/log"
 	"git.nspix.com/golang/micro/registry"
-	"git.nspix.com/golang/micro/stats/prometheusbackend"
 	"git.nspix.com/golang/micro/utils/docker"
 	"git.nspix.com/golang/micro/utils/net/ip"
 	"git.nspix.com/golang/micro/utils/unsafestr"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 type Service struct {
@@ -174,6 +174,10 @@ func (svr *Service) instance() *registry.ServiceNode {
 	if svr.opts.EnableRPC {
 		node.Metadata["enable-rpc"] = "true"
 	}
+	//服务注册时候处理
+	if svr.opts.EnableStats {
+		node.Metadata["prometheus"] = "enable"
+	}
 	return node
 }
 
@@ -185,6 +189,7 @@ func (svr *Service) startHTTPServe() (err error) {
 				log.Warnf("http serve error: %s", err.Error())
 			}
 		})
+
 		svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
 			return ctx.Success(map[string]interface{}{
 				"id":      svr.node.ID,
@@ -192,6 +197,12 @@ func (svr *Service) startHTTPServe() (err error) {
 				"uptime":  time.Now().Sub(svr.upTime).String(),
 			})
 		})
+
+		if svr.opts.EnableStats {
+			prometheusbackend.Init(svr.opts.shortName)
+			svr.httpSvr.Handler("GET", "/metrics", promhttp.Handler())
+		}
+
 		if svr.opts.EnableHttpPProf {
 			svr.httpSvr.Handler("GET", "/debug/pprof/", hp.HandlerFunc(pprof.Index))
 			svr.httpSvr.Handler("GET", "/debug/pprof/goroutine", hp.HandlerFunc(pprof.Index))
@@ -203,13 +214,9 @@ func (svr *Service) startHTTPServe() (err error) {
 			svr.httpSvr.Handler("GET", "/debug/pprof/symbol", hp.HandlerFunc(pprof.Symbol))
 			svr.httpSvr.Handler("GET", "/debug/pprof/trace", hp.HandlerFunc(pprof.Trace))
 		}
-		if svr.opts.EnableStats {
-			prometheusbackend.Init(svr.opts.shortName)
-			svr.httpSvr.Handler("GET", "/metrics", promhttp.Handler())
-		}
-		log.Infof("attach http server success")
+		log.Infof("attach http listener success")
 	} else {
-		log.Warnf("attach http listener error: %s", err.Error())
+		log.Warnf("attach http listener failed cause by %s", err.Error())
 	}
 	return
 }
@@ -219,18 +226,17 @@ func (svr *Service) startRPCServe() (err error) {
 	if err = svr.gateway.Attach([]byte("RPC"), l); err == nil {
 		svr.wrapSync(func() {
 			if err = svr.rpcSvr.Serve(l); err != nil {
-				log.Warnf("rpc serve error: %s", err.Error())
+				log.Warnf("rpc serve start error: %s", err.Error())
 			}
 		})
-		log.Infof("attach rpc server success")
+		log.Infof("attach rpc listener success")
 	} else {
-		log.Warnf("attach rpc listener error: %s", err.Error())
+		log.Warnf("attach rpc listener failed cause by %s", err.Error())
 	}
 	return
 }
 
 func (svr *Service) prepare() (err error) {
-	log.Prefix(svr.opts.Name)
 	svr.ctx = WithContext(svr.ctx, svr)
 	if svr.opts.EnableInternalListener {
 		var tcpAddr *net.TCPAddr
@@ -255,18 +261,18 @@ func (svr *Service) prepare() (err error) {
 		if svr.listener, err = net.ListenTCP("tcp", tcpAddr); err != nil {
 			return
 		}
-		log.Infof("listen on: %s", svr.listener.Addr())
-		svr.gateway = gateway.New(svr.listener)
+		log.Infof("server listen on: %s", svr.listener.Addr())
+		svr.gateway = gateway.New(svr.listener, svr.opts.EnableStats)
 		svr.wrapSync(func() {
 			svr.gateway.Run(svr.ctx)
 		})
 
-		//start http serve
+		//开启HTTP服务
 		if svr.opts.EnableHttp {
 			err = svr.startHTTPServe()
 		}
 
-		//start rpc serve
+		//开启RCP服务
 		if svr.opts.EnableRPC {
 			err = svr.startRPCServe()
 		}
@@ -304,6 +310,9 @@ func (svr *Service) destroy() (err error) {
 }
 
 func (svr *Service) Run() (err error) {
+	if svr.opts.EnableLogPrefix {
+		log.Prefix(svr.opts.Name)
+	}
 	log.Infof("service starting")
 	if err = svr.prepare(); err != nil {
 		return
@@ -314,7 +323,7 @@ func (svr *Service) Run() (err error) {
 			return
 		}
 	}
-	log.Infof("service ready")
+	log.Infof("service started")
 	//waiting
 	ch := make(chan os.Signal, 1)
 	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL)