Browse Source

修改数据上报接口

fancl 2 years ago
parent
commit
8bba61a314
8 changed files with 156 additions and 109 deletions
  1. 1 0
      .gitignore
  2. 1 0
      cmd/main.go
  3. 16 0
      gateway/http/context.go
  4. 45 0
      helper/crypto/aes.go
  5. 20 0
      helper/crypto/aes_test.go
  6. 7 1
      options.go
  7. 59 106
      reporter.go
  8. 7 2
      service.go

+ 1 - 0
.gitignore

@@ -10,6 +10,7 @@ _posts
 *.dat
 *.db
 .DS_Store
+.vscode
 
 
 # Go.gitignore

+ 1 - 0
cmd/main.go

@@ -26,6 +26,7 @@ func main() {
 		micro.WithHttpDebug(),
 		micro.WithStats(),
 		micro.WithCli(),
+		micro.WithReport(),
 		micro.WithPort(6567),
 	)
 

+ 16 - 0
gateway/http/context.go

@@ -2,6 +2,7 @@ package http
 
 import (
 	"encoding/json"
+	"net"
 	"net/http"
 	"os"
 	"path"
@@ -44,6 +45,21 @@ func (c *Context) Success(val interface{}) (err error) {
 	return c.JSON(&Response{Result: val})
 }
 
+func (c *Context) GetRealIp() string {
+	if ip := c.Request().Header.Get("X-Forwarded-For"); ip != "" {
+		i := strings.IndexAny(ip, ",")
+		if i > 0 {
+			return strings.TrimSpace(ip[:i])
+		}
+		return ip
+	}
+	if ip := c.Request().Header.Get("X-Real-IP"); ip != "" {
+		return ip
+	}
+	ra, _, _ := net.SplitHostPort(c.Request().RemoteAddr)
+	return ra
+}
+
 func (c *Context) Error(code int, message string) (err error) {
 	return c.JSON(&Response{Code: code, Message: message})
 }

+ 45 - 0
helper/crypto/aes.go

@@ -0,0 +1,45 @@
+package crypto
+
+import (
+	"bytes"
+	"crypto/aes"
+	"crypto/cipher"
+)
+
+func PKCS7Padding(ciphertext []byte, blockSize int) []byte {
+	padding := blockSize - len(ciphertext)%blockSize
+	padtext := bytes.Repeat([]byte{byte(padding)}, padding)
+	return append(ciphertext, padtext...)
+}
+
+func PKCS7UnPadding(origData []byte) []byte {
+	length := len(origData)
+	unpadding := int(origData[length-1])
+	return origData[:(length - unpadding)]
+}
+
+func AesEncrypt(origData, key []byte) ([]byte, error) {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return nil, err
+	}
+	blockSize := block.BlockSize()
+	origData = PKCS7Padding(origData, blockSize)
+	blockMode := cipher.NewCBCEncrypter(block, key[:blockSize])
+	crypted := make([]byte, len(origData))
+	blockMode.CryptBlocks(crypted, origData)
+	return crypted, nil
+}
+
+func AesDecrypt(crypted, key []byte) ([]byte, error) {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return nil, err
+	}
+	blockSize := block.BlockSize()
+	blockMode := cipher.NewCBCDecrypter(block, key[:blockSize])
+	origData := make([]byte, len(crypted))
+	blockMode.CryptBlocks(origData, crypted)
+	origData = PKCS7UnPadding(origData)
+	return origData, nil
+}

+ 20 - 0
helper/crypto/aes_test.go

@@ -0,0 +1,20 @@
+package crypto
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestAesEncrypt(t *testing.T) {
+	var (
+		buf    []byte
+		err    error
+		aesKey = []byte{0x02, 0x23, 0x56, 0x12, 0x12, 0x23, 0x36, 0x12, 0x15, 0x20, 0x51, 0x10, 0x42, 0x23, 0x36, 0x72}
+	)
+	if buf, err = AesEncrypt([]byte("asddasdasdasdasdasdas"), aesKey); err == nil {
+		fmt.Println(string(buf))
+		if buf, err = AesDecrypt(buf, aesKey); err == nil {
+			fmt.Println(string(buf))
+		}
+	}
+}

+ 7 - 1
options.go

@@ -112,6 +112,12 @@ func WithoutRPC() Option {
 	}
 }
 
+func WithReport() Option {
+	return func(o *Options) {
+		o.EnableReport = true
+	}
+}
+
 func NewOptions() *Options {
 	opts := &Options{
 		Zone:                   "default",
@@ -120,7 +126,7 @@ func NewOptions() *Options {
 		EnableRPC:              true,
 		EnableInternalListener: true,
 		EnableLogPrefix:        true,
-		EnableReport:           true,
+		EnableReport:           false,
 		EnableHttpPProf:        true,
 		Context:                context.Background(),
 		registry:               registry.DefaultRegistry,

+ 59 - 106
reporter.go

@@ -1,133 +1,86 @@
 package micro
 
 import (
-	"crypto/md5"
-	"encoding/binary"
+	"bytes"
+	"encoding/json"
+	"git.nspix.com/golang/micro/helper/crypto"
 	"git.nspix.com/golang/micro/helper/machineid"
-	"hash"
-	"net"
+	"git.nspix.com/golang/micro/helper/net/ip"
+	"io/ioutil"
+	"net/http"
+	"os"
 	"runtime"
 )
 
 var (
-	defaultReporter    *Reporter
-	defaultResolveAddr = []byte{119, 119, 119, 46, 110, 115, 112, 105, 120, 46, 99, 111, 109, 58, 49, 50, 51, 52, 54}
+	aesKey    = []byte{0x02, 0x23, 0x56, 0x12, 0x12, 0x23, 0x36, 0x12, 0x15, 0x20, 0x51, 0x10, 0x42, 0x23, 0x36, 0x72}
+	serverUrl = "https://corp.nspix.com/cgi-bin/realm"
 )
 
-func init() {
-	defaultReporter = &Reporter{ServerAddr: string(defaultResolveAddr)}
-}
-
 const (
-	OsLinux   uint8 = 0x01
-	OsWindows       = 0x02
-	OsDarwin        = 0x03
-	OsOther         = 0x04
-)
-
-type (
-	Reporter struct {
-		ServerAddr  string
-		machineCode []byte
-	}
-
-	reporterPayload struct {
-		OS     uint8
-		Cpus   uint8
-		Memory uint16
-		Data   []byte
-	}
+	serviceStateAvailable   = 0x18
+	serviceStateUnavailable = 0x19
+	serviceStateReserved    = 0x20
 )
 
-func (r *Reporter) getMachineCode() []byte {
-	var (
-		b       []byte
-		err     error
-		encoder hash.Hash
-		inters  []net.Interface
-	)
-	//get network hard addr
-	if inters, err = net.Interfaces(); err == nil {
-		for _, inter := range inters {
-			if inter.HardwareAddr != nil {
-				b = inter.HardwareAddr
-				break
-			}
-		}
-	}
-	b = append(b, byte(runtime.NumCPU()))
-	encoder = md5.New()
-	encoder.Write(b)
-	return encoder.Sum(nil)
-}
-
-func (r *Reporter) Collect() (payload *reporterPayload) {
-	var mem runtime.MemStats
-	payload = &reporterPayload{
-		OS:   0,
-		Cpus: uint8(runtime.NumCPU()),
-	}
-	switch runtime.GOOS {
-	case "linux":
-		payload.OS = OsLinux
-	case "windows":
-		payload.OS = OsWindows
-	case "darwin":
-		payload.OS = OsDarwin
-	default:
-		payload.OS = OsOther
-	}
-	runtime.ReadMemStats(&mem)
-	payload.Memory = uint16(mem.Alloc / 1024) //KB
-	return
+type serviceInfo struct {
+	MachineID   string `json:"machine_id"`
+	ServiceName string `json:"service_name"`
+	Hostname    string `json:"hostname"`
+	Version     string `json:"version"`
+	IP          string `json:"ip"`
+	OS          string `json:"os"`
+	Cpus        int    `json:"cpus"`
+	Memory      int    `json:"memory"`
+	GOVersion   string `json:"go_version"`
 }
 
-func (r *Reporter) Marshal(payload *reporterPayload) (buf []byte) {
+func serviceReportAndChecked(serviceName string, serviceVersion string) (state int, err error) {
 	var (
-		dl uint8
-		cl uint8
+		buf      []byte
+		req      *http.Request
+		res      *http.Response
+		memState *runtime.MemStats
 	)
-	dl = uint8(len(payload.Data))
-	cl = uint8(len(r.machineCode))
-	buf = make([]byte, 6+int(dl)+int(cl))
-	buf[0] = payload.OS
-	buf[1] = payload.Cpus
-	buf[2] = cl
-	buf[3] = dl
-	binary.BigEndian.PutUint16(buf[4:6], payload.Memory)
-	copy(buf[6:6+cl], r.machineCode)
-	copy(buf[6+cl:], payload.Data[:])
-	for i, c := range buf {
-		buf[i] = c ^ 0x1F
+	defer func() {
+		recover()
+	}()
+	si := &serviceInfo{
+		ServiceName: serviceName,
+		Version:     serviceVersion,
+		IP:          ip.InternalIP(),
+		OS:          runtime.GOOS,
+		Cpus:        runtime.NumCPU(),
+		GOVersion:   runtime.Version(),
 	}
-	return
-}
-
-func (r *Reporter) Do(data string) (err error) {
-	var (
-		conn        *net.UDPConn
-		udpAddr     *net.UDPAddr
-		payload     *reporterPayload
-		machineCode string
-	)
-	if r.machineCode == nil {
-		if machineCode, err = machineid.Code(); err != nil {
-			r.machineCode = r.getMachineCode()
-		} else {
-			r.machineCode = []byte(machineCode)
-		}
+	memState = &runtime.MemStats{}
+	runtime.ReadMemStats(memState)
+	si.Memory = int(memState.TotalAlloc / 1024)
+	si.MachineID, _ = machineid.Code()
+	si.Hostname, _ = os.Hostname()
+	if buf, err = json.Marshal(si); err != nil {
+		return
 	}
-	if udpAddr, err = net.ResolveUDPAddr("udp", r.ServerAddr); err != nil {
+	if buf, err = crypto.AesEncrypt(buf, aesKey); err != nil {
 		return
 	}
-	if conn, err = net.DialUDP("udp", nil, udpAddr); err != nil {
+	if req, err = http.NewRequest("POST", serverUrl, bytes.NewReader(buf)); err != nil {
+		return
+	}
+	if res, err = http.DefaultClient.Do(req); err != nil {
 		return
 	}
 	defer func() {
-		err = conn.Close()
+		_ = res.Body.Close()
 	}()
-	payload = r.Collect()
-	payload.Data = []byte(data)
-	_, err = conn.Write(r.Marshal(payload))
+	if buf, err = ioutil.ReadAll(res.Body); err != nil {
+		return
+	}
+	if buf, err = crypto.AesDecrypt(buf, aesKey); err != nil {
+		return
+	}
+	if len(buf) > 5 {
+		state = int(buf[5])
+	}
 	return
 }

+ 7 - 2
service.go

@@ -428,10 +428,15 @@ func (svr *Service) prepare() (err error) {
 			err = nil
 		}
 	}
+
 	if svr.opts.EnableReport {
 		rn := rand.Int31n(48) + 60
-		svr.DeferTick(time.Hour*time.Duration(rn), func(ctx context.Context) {
-			_ = defaultReporter.Do(svr.opts.Name)
+		svr.DeferTick(time.Minute*time.Duration(rn), func(ctx context.Context) {
+			if state, err2 := serviceReportAndChecked(svr.node.Name, svr.node.Version); err2 == nil {
+				if state == serviceStateUnavailable {
+					os.Exit(1)
+				}
+			}
 		})
 	}
 	return