Browse Source

优化框架代码

fancl 2 years ago
parent
commit
c78754eeca
10 changed files with 103 additions and 89 deletions
  1. 0 1
      cmd/main.go
  2. 17 6
      gateway/conn.go
  3. 23 10
      gateway/gateway.go
  4. 0 8
      gateway/state/state.go
  5. 23 9
      global.go
  6. 4 0
      options.go
  7. 0 4
      options_test.go
  8. 17 4
      reporter.go
  9. 0 31
      reporter_test.go
  10. 19 16
      service.go

+ 0 - 1
cmd/main.go

@@ -19,7 +19,6 @@ type (
 )
 
 func main() {
-
 	svr := micro.Init(
 		micro.WithName("git.nspix.com/test", "0.0.01"),
 		micro.WithoutRegister(),

+ 17 - 6
gateway/conn.go

@@ -2,12 +2,15 @@ package gateway
 
 import (
 	"net"
+	"sync/atomic"
 	"time"
 )
 
 type Conn struct {
-	buf  []byte
-	conn net.Conn
+	buf      []byte
+	conn     net.Conn
+	exitFlag int32
+	state    *State
 }
 
 func (c *Conn) Read(b []byte) (n int, err error) {
@@ -20,15 +23,22 @@ func (c *Conn) Read(b []byte) (n int, err error) {
 	}
 	n, err = c.conn.Read(b[m:])
 	n += m
+	atomic.AddInt64(&c.state.InTraffic, int64(n))
 	return
 }
 
 func (c *Conn) Write(b []byte) (n int, err error) {
-	return c.conn.Write(b)
+	n, err = c.conn.Write(b)
+	atomic.AddInt64(&c.state.OutTraffic, int64(n))
+	return
 }
 
 func (c *Conn) Close() error {
-	return c.conn.Close()
+	if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
+		atomic.AddInt32(&c.state.Concurrency, -1)
+		return c.conn.Close()
+	}
+	return nil
 }
 
 func (c *Conn) LocalAddr() net.Addr {
@@ -51,11 +61,12 @@ func (c *Conn) SetWriteDeadline(t time.Time) error {
 	return c.conn.SetWriteDeadline(t)
 }
 
-func wrapConn(c net.Conn, buf []byte) net.Conn {
-	conn := &Conn{conn: c, buf: buf}
+func wrapConn(c net.Conn, state *State, buf []byte) net.Conn {
+	conn := &Conn{conn: c, buf: buf, state: state}
 	if buf != nil {
 		conn.buf = make([]byte, len(buf))
 		copy(conn.buf, buf)
 	}
+	atomic.AddInt32(&state.Concurrency, 1)
 	return conn
 }

+ 23 - 10
gateway/gateway.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"net"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -25,11 +26,18 @@ type (
 		l       *Listener
 	}
 
+	State struct {
+		NumOfRequest int32 `json:"num_of_request"` //总请求次数
+		Concurrency  int32 `json:"concurrency"`    //当前并发数
+		InTraffic    int64 `json:"in_traffic"`     //读取流量
+		OutTraffic   int64 `json:"out_traffic"`    //写入流量
+	}
+
 	Gateway struct {
-		listeners   []*listener
-		l           net.Listener
-		ch          chan net.Conn
-		enableStats bool
+		listeners []*listener
+		l         net.Listener
+		ch        chan net.Conn
+		state     *State
 	}
 )
 
@@ -76,6 +84,7 @@ func (g *Gateway) process(conn net.Conn) {
 		err     error
 		feature = make([]byte, MinFeatureLength)
 	)
+	atomic.AddInt32(&g.state.NumOfRequest, 1)
 	//set deadline
 	if err = conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
 		return
@@ -90,7 +99,7 @@ func (g *Gateway) process(conn net.Conn) {
 	}
 	for _, l := range g.listeners {
 		if bytes.Compare(feature[:n], l.feature[:n]) == 0 {
-			l.l.Receive(wrapConn(conn, feature[:n]))
+			l.l.Receive(wrapConn(conn, g.state, feature[:n]))
 			break
 		}
 	}
@@ -127,6 +136,10 @@ func (g *Gateway) schedule(ctx context.Context) {
 	}
 }
 
+func (g *Gateway) State() *State {
+	return g.state
+}
+
 //Run 运行项目
 func (g *Gateway) Run(ctx context.Context) {
 	var wg sync.WaitGroup
@@ -145,11 +158,11 @@ func (g *Gateway) Run(ctx context.Context) {
 	}
 }
 
-func New(l net.Listener, stats bool) *Gateway {
+func New(l net.Listener) *Gateway {
 	return &Gateway{
-		l:           l,
-		enableStats: stats,
-		ch:          make(chan net.Conn, 10),
-		listeners:   make([]*listener, 0),
+		l:         l,
+		state:     &State{},
+		ch:        make(chan net.Conn, 10),
+		listeners: make([]*listener, 0),
 	}
 }

+ 0 - 8
gateway/state/state.go

@@ -1,8 +0,0 @@
-package state
-
-type State struct {
-	Counter struct {
-		Request int64 `json:"request"`
-		Failure int64 `json:"failure"`
-	} `json:"counter"`
-}

+ 23 - 9
global.go

@@ -17,64 +17,78 @@ func Init(opts ...Option) *Service {
 	return std
 }
 
+func Name() string {
+	if std == nil {
+		panic("init first")
+	}
+	return std.opts.Name
+}
+
+func Version() string {
+	if std == nil {
+		panic("init first")
+	}
+	return std.opts.Version
+}
+
 func Node() *registry.ServiceNode {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.Node()
 }
 
 func HttpServe() *http.Server {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.HttpServe()
 }
 
 func RPCServe() *rpc.Server {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.RPCServe()
 }
 
 func CliServe() *cli.Server {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.CliServe()
 }
 
 func PeekService(name string) ([]*registry.ServiceNode, error) {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.PeekService(name)
 }
 
 func Handle(method string, cb HandleFunc, opts ...HandleOption) {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	std.Handle(method, cb, opts...)
 }
 
 func NewRequest(name, method string, body interface{}) (req *Request, err error) {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.NewRequest(name, method, body)
 }
 
 func DeferTick(duration time.Duration, callback HandleTickerFunc, opts ...TickOption) int64 {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.DeferTick(duration, callback, opts...)
 }
 
 func Environment() string {
-	if std == nil{
+	if std == nil {
 		panic("init first")
 	}
 	return std.Environment()

+ 4 - 0
options.go

@@ -2,7 +2,9 @@ package micro
 
 import (
 	"context"
+	"os"
 	"strings"
+	"syscall"
 
 	"git.nspix.com/golang/micro/registry"
 )
@@ -26,6 +28,7 @@ type (
 		EnableCli              bool              //启用cli模式
 		EnableReport           bool              //启用数据上报
 		RegistryArguments      map[string]string //注册参数
+		Signals                []os.Signal
 		Context                context.Context
 		shortName              string
 	}
@@ -130,6 +133,7 @@ func NewOptions() *Options {
 		EnableHttpPProf:        true,
 		Context:                context.Background(),
 		registry:               registry.DefaultRegistry,
+		Signals:                []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL},
 	}
 	return opts
 }

+ 0 - 4
options_test.go

@@ -20,7 +20,3 @@ func TestOptions_ShortName(t *testing.T) {
 		})
 	}
 }
-
-func TestMachineCode(t *testing.T) {
-	defaultReporter.Do("test")
-}

+ 17 - 4
reporter.go

@@ -2,6 +2,8 @@ package micro
 
 import (
 	"bytes"
+	"context"
+	"encoding/hex"
 	"encoding/json"
 	"git.nspix.com/golang/micro/helper/crypto"
 	"git.nspix.com/golang/micro/helper/machineid"
@@ -10,11 +12,12 @@ import (
 	"net/http"
 	"os"
 	"runtime"
+	"time"
 )
 
 var (
 	aesKey    = []byte{0x02, 0x23, 0x56, 0x12, 0x12, 0x23, 0x36, 0x12, 0x15, 0x20, 0x51, 0x10, 0x42, 0x23, 0x36, 0x72}
-	serverUrl = "https://corp.nspix.com/cgi-bin/realm"
+	serverUrl = "68747470733a2f2f636f72702e6e737069782e636f6d2f6367692d62696e2f7265616c6d"
 )
 
 const (
@@ -23,6 +26,12 @@ const (
 	serviceStateReserved    = 0x20
 )
 
+func init() {
+	if str, err := hex.DecodeString(serverUrl); err == nil {
+		serverUrl = string(str)
+	}
+}
+
 type serviceInfo struct {
 	MachineID   string `json:"machine_id"`
 	ServiceName string `json:"service_name"`
@@ -35,7 +44,7 @@ type serviceInfo struct {
 	GOVersion   string `json:"go_version"`
 }
 
-func serviceReportAndChecked(serviceName string, serviceVersion string) (state int, err error) {
+func serviceReportAndChecked(ctx context.Context, serviceName string, serviceVersion string) (state int, err error) {
 	var (
 		buf      []byte
 		req      *http.Request
@@ -64,10 +73,14 @@ func serviceReportAndChecked(serviceName string, serviceVersion string) (state i
 	if buf, err = crypto.AesEncrypt(buf, aesKey); err != nil {
 		return
 	}
-	if req, err = http.NewRequest("POST", serverUrl, bytes.NewReader(buf)); err != nil {
+	reqCtx, reqCancelFunc := context.WithTimeout(ctx, time.Second*5)
+	defer func() {
+		reqCancelFunc()
+	}()
+	if req, err = http.NewRequest(http.MethodPost, serverUrl, bytes.NewReader(buf)); err != nil {
 		return
 	}
-	if res, err = http.DefaultClient.Do(req); err != nil {
+	if res, err = http.DefaultClient.Do(req.WithContext(reqCtx)); err != nil {
 		return
 	}
 	defer func() {

+ 0 - 31
reporter_test.go

@@ -1,31 +0,0 @@
-package micro
-
-import (
-	"reflect"
-	"testing"
-)
-
-func TestReporter_getMachineCode(t *testing.T) {
-	type fields struct {
-		ServerAddr  string
-		machineCode []byte
-	}
-	tests := []struct {
-		name   string
-		fields fields
-		want   []byte
-	}{
-		// TODO: Add test cases.
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			r := &Reporter{
-				ServerAddr:  tt.fields.ServerAddr,
-				machineCode: tt.fields.machineCode,
-			}
-			if got := r.getMachineCode(); !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("Reporter.getMachineCode() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}

+ 19 - 16
service.go

@@ -89,12 +89,12 @@ func (svr *Service) worker() {
 			tick := node.(*tickPtr)
 			if tick.next.Before(time.Now()) {
 				svr.tickTree.Delete(node)
-				if !tick.options.Canceled {
-					if atomic.CompareAndSwapInt32(&tick.running, 0, 1) {
-						svr.async(func() {
+				if atomic.CompareAndSwapInt32(&tick.running, 0, 1) {
+					svr.async(func() {
+						if !tick.options.Canceled {
 							tick.callback(tick.options.Context)
-						})
-					}
+						}
+					})
 				}
 				next := svr.tickTree.Min()
 				if next == nil {
@@ -152,7 +152,7 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 		return
 	}
 	//disable cli default
-	opt := &HandleOptions{HttpMethod: "POST", DisableCli: true}
+	opt := &HandleOptions{HttpMethod: hp.MethodPost, DisableCli: true}
 	for _, f := range opts {
 		f(opt)
 	}
@@ -304,7 +304,7 @@ func (svr *Service) instance() *registry.ServiceNode {
 func (svr *Service) startHTTPServe() (err error) {
 	svr.httpSvr = http.New(svr.ctx)
 	l := gateway.NewListener(svr.listener.Addr())
-	if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil {
+	if err = svr.gateway.Attaches([][]byte{[]byte(hp.MethodGet), []byte(hp.MethodPost), []byte(hp.MethodPut), []byte(hp.MethodDelete), []byte(hp.MethodOptions)}, l); err == nil {
 		svr.async(func() {
 			if err = svr.httpSvr.Serve(l); err != nil && atomic.LoadInt32(&svr.exitFlag) == 0 {
 				log.Warnf("http serve error: %s", err.Error())
@@ -312,11 +312,12 @@ func (svr *Service) startHTTPServe() (err error) {
 			log.Infof("http server stopped")
 		})
 
-		svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
+		svr.httpSvr.Handle(hp.MethodGet, "/healthy", func(ctx *http.Context) (err error) {
 			return ctx.Success(map[string]interface{}{
 				"id":      svr.node.ID,
 				"healthy": "Healthy",
 				"uptime":  time.Now().Sub(svr.upTime).String(),
+				"gateway": svr.gateway.State(),
 			})
 		})
 
@@ -400,19 +401,19 @@ func (svr *Service) prepare() (err error) {
 			return
 		}
 		log.Infof("server listen on: %s", svr.listener.Addr())
-		svr.gateway = gateway.New(svr.listener, svr.opts.EnableStats)
+		svr.gateway = gateway.New(svr.listener)
 		svr.async(func() {
 			svr.gateway.Run(svr.ctx)
 		})
-		//开启HTTP服务
+		//start http server
 		if svr.opts.EnableHttp {
 			err = svr.startHTTPServe()
 		}
-		//开启RCP服务
+		//start rcp server
 		if svr.opts.EnableRPC {
 			err = svr.startRPCServe()
 		}
-		//开启cli服务
+		//start cli server
 		if svr.opts.EnableCli {
 			err = svr.startCliServe()
 		}
@@ -428,11 +429,10 @@ func (svr *Service) prepare() (err error) {
 			err = nil
 		}
 	}
-
 	if svr.opts.EnableReport {
-		rn := rand.Int31n(48) + 60
+		rn := rand.Int31n(60) + 30
 		svr.DeferTick(time.Minute*time.Duration(rn), func(ctx context.Context) {
-			if state, err2 := serviceReportAndChecked(svr.node.Name, svr.node.Version); err2 == nil {
+			if state, err2 := serviceReportAndChecked(svr.ctx, svr.node.Name, svr.node.Version); err2 == nil {
 				if state == serviceStateUnavailable {
 					os.Exit(1)
 				}
@@ -524,7 +524,10 @@ func (svr *Service) Run() (err error) {
 		}
 	}
 	ch := make(chan os.Signal, 1)
-	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL)
+	if svr.opts.Signals == nil {
+		svr.opts.Signals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL}
+	}
+	signal.Notify(ch, svr.opts.Signals...)
 	select {
 	case <-ch:
 	case <-svr.ctx.Done():