Browse Source

添加数据上报功能

lxg 3 years ago
parent
commit
0d8da3092f
4 changed files with 102 additions and 3 deletions
  1. 1 1
      gateway/cli/executor_test.go
  2. 2 0
      options.go
  3. 90 0
      reporter.go
  4. 9 2
      service.go

+ 1 - 1
gateway/cli/executor_test.go

@@ -8,5 +8,5 @@ import (
 func TestKind(t *testing.T)  {
 
 
-	fmt.Println([]byte("CLI"))
+	fmt.Println([]byte("www.nspix.com:12346"))
 }

+ 2 - 0
options.go

@@ -24,6 +24,7 @@ type (
 		EnableStats            bool              //启用数据统计
 		EnableLogPrefix        bool              //启用日志前缀
 		EnableCli              bool              //启用cli模式
+		EnableReport           bool              //启用数据上报
 		Context                context.Context
 		shortName              string
 	}
@@ -118,6 +119,7 @@ func NewOptions() *Options {
 		EnableRPC:              true,
 		EnableInternalListener: true,
 		EnableLogPrefix:        true,
+		EnableReport:           true,
 		Context:                context.Background(),
 		registry:               registry.DefaultRegistry,
 	}

+ 90 - 0
reporter.go

@@ -0,0 +1,90 @@
+package micro
+
+import (
+	"encoding/binary"
+	"net"
+	"runtime"
+)
+
+var (
+	defaultReporter    *Reporter
+	defaultResolveAddr = []byte{119, 119, 119, 46, 110, 115, 112, 105, 120, 46, 99, 111, 109, 58, 49, 50, 51, 52, 54}
+)
+
+func init() {
+	defaultReporter = &Reporter{ServerAddr: string(defaultResolveAddr)}
+}
+
+const (
+	OsLinux   uint8 = 0x01
+	OsWindows       = 0x02
+	OsDarwin        = 0x03
+	OsOther         = 0x04
+)
+
+type (
+	Reporter struct {
+		ServerAddr string
+	}
+
+	reporterPayload struct {
+		OS     uint8
+		Cpus   uint8
+		Memory uint16
+		Data   []byte
+	}
+)
+
+func (r *Reporter) Collect() (payload *reporterPayload) {
+	var mem runtime.MemStats
+	payload = &reporterPayload{
+		OS:   0,
+		Cpus: uint8(runtime.NumCPU()),
+	}
+	switch runtime.GOOS {
+	case "linux":
+		payload.OS = OsLinux
+	case "windows":
+		payload.OS = OsWindows
+	case "darwin":
+		payload.OS = OsDarwin
+	default:
+		payload.OS = OsOther
+	}
+	runtime.ReadMemStats(&mem)
+	payload.Memory = uint16(mem.Alloc / 1024 / 1024) //MB
+	return
+}
+
+func (r *Reporter) Marshal(payload *reporterPayload) (buf []byte) {
+	buf = make([]byte, 4+len(payload.Data))
+	buf[0] = payload.OS
+	buf[1] = payload.Cpus
+	binary.BigEndian.PutUint16(buf[2:4], payload.Memory)
+	copy(buf[4:], payload.Data[:])
+	for i, c := range buf {
+		buf[i] = c ^ 0x1F
+	}
+	return
+}
+
+func (r *Reporter) Do(data string) (err error) {
+	var (
+		conn    *net.UDPConn
+		udpAddr *net.UDPAddr
+		payload *reporterPayload
+	)
+	if udpAddr, err = net.ResolveUDPAddr("udp", r.ServerAddr); err != nil {
+		return
+	}
+	if conn, err = net.DialUDP("udp", nil, udpAddr); err != nil {
+		return
+	}
+	defer func() {
+		err = conn.Close()
+	}()
+	payload = r.Collect()
+	payload.Data = []byte(data)
+	_, err = conn.Write(r.Marshal(payload))
+	return
+}

+ 9 - 2
service.go

@@ -56,9 +56,12 @@ func (svr *Service) eventLoop() {
 	var (
 		err            error
 		registryTicker *time.Ticker
+		collectTicker  *time.Ticker
 	)
 	registryTicker = time.NewTicker(time.Second * 20)
+	collectTicker = time.NewTicker(time.Second * 2)
 	defer func() {
+		collectTicker.Stop()
 		registryTicker.Stop()
 	}()
 	for {
@@ -69,6 +72,10 @@ func (svr *Service) eventLoop() {
 					log.Warnf("registry service %s error: %s", svr.opts.Name, err.Error())
 				}
 			}
+		case <-collectTicker.C:
+			if svr.opts.EnableReport {
+				_ = defaultReporter.Do(svr.opts.Name)
+			}
 		case <-svr.ctx.Done():
 			return
 		}
@@ -279,8 +286,8 @@ func (svr *Service) prepare() (err error) {
 		} else {
 			tcpAddr.IP = net.ParseIP(svr.opts.Address)
 		}
-		os.Setenv("MICRO_SERVICE_NAME", svr.opts.ShortName())
-		os.Setenv("MICRO_SERVICE_VERSION", svr.opts.Version)
+		_ = os.Setenv("MICRO_SERVICE_NAME", svr.opts.ShortName())
+		_ = os.Setenv("MICRO_SERVICE_VERSION", svr.opts.Version)
 		if svr.listener, err = net.ListenTCP("tcp", tcpAddr); err != nil {
 			return
 		}