瀏覽代碼

添加数据上报

lxg 4 年之前
父節點
當前提交
695cf2e23a
共有 5 個文件被更改,包括 219 次插入33 次删除
  1. 7 1
      micro.go
  2. 32 0
      options.go
  3. 22 0
      options_test.go
  4. 86 32
      service.go
  5. 72 0
      utils/metric/metric.go

+ 7 - 1
micro.go

@@ -2,14 +2,19 @@ package micro
 
 import (
 	"context"
+
 	"git.nspix.com/golang/micro/gateway/http"
 	"git.nspix.com/golang/micro/gateway/rpc"
 	"git.nspix.com/golang/micro/registry"
 )
 
+const (
+	EnvironmentDocker string = "docker"
+	EnvironmentHost          = "host"
+)
+
 type (
 	applicationKey struct {
-
 	}
 
 	HandleOptions struct {
@@ -30,6 +35,7 @@ type (
 		PeekService(name string) ([]*registry.ServiceNode, error)                   //选择一个服务
 		Handle(method string, cb HandleFunc, opts ...HandleOption)                  //注册一个处理器
 		NewRequest(name, method string, body interface{}) (req *Request, err error) //创建一个rpc请求
+		Environment() string
 	}
 
 	Server interface {

+ 32 - 0
options.go

@@ -2,10 +2,15 @@ package micro
 
 import (
 	"context"
+	"strings"
+
 	"git.nspix.com/golang/micro/registry"
+	"git.nspix.com/golang/micro/utils/metric"
 )
 
 type (
+	ReadMetricFunc func() metric.Values
+
 	Options struct {
 		Zone                   string            //注册域
 		Name                   string            //名称
@@ -18,12 +23,27 @@ type (
 		Server                 Server            //加载的服务
 		Port                   int               //绑定端口
 		Address                string            //绑定地址
+		TSDBUrl                string            //TSDB数据上报URL
+		MetricCallback         ReadMetricFunc    //读取系统数据的回调
 		Context                context.Context
+		shortName              string
 	}
 
 	Option func(o *Options)
 )
 
+func (o *Options) ShortName() string {
+	if o.shortName != "" {
+		return o.shortName
+	}
+	if pos := strings.LastIndex(o.Name, "/"); pos != -1 {
+		o.shortName = o.Name[pos+1:]
+	} else {
+		o.shortName = o.Name
+	}
+	return o.shortName
+}
+
 func WithName(name string, version string) Option {
 	return func(o *Options) {
 		o.Name = name
@@ -43,6 +63,18 @@ func WithRegistry(r registry.Registry) Option {
 	}
 }
 
+func WithTSDBUrl(uri string) Option {
+	return func(o *Options) {
+		o.TSDBUrl = uri
+	}
+}
+
+func WithMetricCallback(cb ReadMetricFunc) Option {
+	return func(o *Options) {
+		o.MetricCallback = cb
+	}
+}
+
 func WithServer(s Server) Option {
 	return func(o *Options) {
 		o.Server = s

+ 22 - 0
options_test.go

@@ -0,0 +1,22 @@
+package micro
+
+import "testing"
+
+func TestOptions_ShortName(t *testing.T) {
+	tests := []struct {
+		name string
+		o    *Options
+		want string
+	}{
+		{"package", &Options{Name: "git.nspix.com/golang/solar"}, "solar"},
+		{"string", &Options{Name: "solar"}, "solar"},
+		{"package", &Options{Name: "a.solar"}, "a.solar"},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := tt.o.ShortName(); got != tt.want {
+				t.Errorf("Options.ShortName() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

+ 86 - 32
service.go

@@ -7,8 +7,10 @@ import (
 	"net"
 	"os"
 	"os/signal"
+	"runtime"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -18,23 +20,29 @@ import (
 	"git.nspix.com/golang/micro/log"
 	"git.nspix.com/golang/micro/registry"
 	"git.nspix.com/golang/micro/utils/docker"
+	"git.nspix.com/golang/micro/utils/metric"
 	"git.nspix.com/golang/micro/utils/net/ip"
 	"git.nspix.com/golang/micro/utils/unsafestr"
 )
 
+var (
+	upMetricProcessFlag int32
+)
+
 type Service struct {
-	opts       *Options
-	ctx        context.Context
-	cancelFunc context.CancelFunc
-	registry   registry.Registry
-	node       *registry.ServiceNode
-	listener   net.Listener
-	gateway    *gateway.Gateway
-	wg         sync.WaitGroup
-	httpSvr    *http.Server
-	rpcSvr     *rpc.Server
-	upTime     time.Time
-	client     *Client
+	opts        *Options
+	ctx         context.Context
+	cancelFunc  context.CancelFunc
+	registry    registry.Registry
+	node        *registry.ServiceNode
+	listener    net.Listener
+	gateway     *gateway.Gateway
+	wg          sync.WaitGroup
+	httpSvr     *http.Server
+	rpcSvr      *rpc.Server
+	upTime      time.Time
+	client      *Client
+	environment string
 }
 
 func (svr *Service) wrapSync(f func()) {
@@ -47,25 +55,63 @@ func (svr *Service) wrapSync(f func()) {
 
 func (svr *Service) eventLoop() {
 	var (
-		err    error
-		ticker *time.Ticker
+		err            error
+		registryTicker *time.Ticker
+		upMetricTicker *time.Ticker
 	)
-	ticker = time.NewTicker(time.Second * 10)
-	defer ticker.Stop()
+	registryTicker = time.NewTicker(time.Second * 20)
+	upMetricTicker = time.NewTicker(time.Second * 5)
+	defer func() {
+		registryTicker.Stop()
+	}()
 	for {
 		select {
-		case <-ticker.C:
+		case <-registryTicker.C:
 			if !svr.opts.DisableRegister {
 				if err = svr.registry.Register(svr.node); err != nil {
 					log.Warnf("registry service %s error: %s", svr.opts.Name, err.Error())
 				}
 			}
+		case <-upMetricTicker.C:
+			if svr.opts.TSDBUrl != "" {
+				if atomic.CompareAndSwapInt32(&upMetricProcessFlag, 0, 1) {
+					go svr.upMetrics()
+				}
+			}
 		case <-svr.ctx.Done():
 			return
 		}
 	}
 }
 
+func (svr *Service) upMetrics() {
+	ms := make(metric.Values, 0)
+	tags := make(map[string]string)
+	tags["host"] = ip.InternalIP()
+	tags["name"] = svr.opts.ShortName()
+	tags["version"] = svr.opts.Version
+	memStats := &runtime.MemStats{}
+	runtime.ReadMemStats(memStats)
+	ms = append(ms, metric.New("sys.go.goroutine", float64(runtime.NumGoroutine())).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.ccall", float64(runtime.NumCgoCall())).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.cpus", float64(runtime.NumCPU())).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.gc", float64(memStats.NumGC)).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.next_gc", float64(memStats.NextGC)).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.heap_alloc", float64(memStats.HeapAlloc)).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.heap_inuse", float64(memStats.HeapInuse)).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.memory_allow", float64(memStats.Alloc)).SetTags(tags))
+	ms = append(ms, metric.New("sys.go.memory_frees", float64(memStats.Frees)).SetTags(tags))
+
+	if svr.opts.MetricCallback != nil {
+		vs := svr.opts.MetricCallback()
+		for _, v := range vs {
+			v.SetTags(tags)
+			ms = append(ms, v)
+		}
+	}
+	metric.Push(svr.ctx, svr.opts.TSDBUrl, ms)
+}
+
 func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	opt := &HandleOptions{HttpMethod: "POST"}
 	for _, f := range opts {
@@ -117,13 +163,18 @@ func (svr *Service) Node() *registry.ServiceNode {
 	return svr.node
 }
 
-func (svr *Service) generateInstance() {
+func (svr *Service) Environment() string {
+	return svr.environment
+}
+
+func (svr *Service) instance() *registry.ServiceNode {
 	var (
 		err      error
 		id       string
 		dockerID string
 		tcpAddr  *net.TCPAddr
 		ipLocal  string
+		node     *registry.ServiceNode
 	)
 	if id, err = docker.SelfContainerID(); err != nil {
 		//生成唯一ID
@@ -133,8 +184,9 @@ func (svr *Service) generateInstance() {
 		id = hex.EncodeToString(e5.Sum(nil))
 	} else {
 		dockerID = id
+		svr.environment = EnvironmentDocker
 	}
-	svr.node = &registry.ServiceNode{
+	node = &registry.ServiceNode{
 		ID:        id,
 		Name:      svr.opts.Name,
 		Version:   svr.opts.Version,
@@ -146,21 +198,22 @@ func (svr *Service) generateInstance() {
 	} else {
 		ipLocal = svr.opts.Address
 	}
-	svr.node.Address = ipLocal
+	node.Address = ipLocal
 	if svr.listener != nil {
 		if tcpAddr, err = net.ResolveTCPAddr("tcp", svr.listener.Addr().String()); err == nil {
-			svr.node.Port = tcpAddr.Port
+			node.Port = tcpAddr.Port
 		}
 	} else {
-		svr.node.Port = svr.opts.Port
+		node.Port = svr.opts.Port
 	}
-	svr.node.Metadata["docker-id"] = dockerID
+	node.Metadata["docker-id"] = dockerID
 	if svr.opts.EnableHttp {
-		svr.node.Metadata["enable-http"] = "true"
+		node.Metadata["enable-http"] = "true"
 	}
 	if svr.opts.EnableRPC {
-		svr.node.Metadata["enable-rpc"] = "true"
+		node.Metadata["enable-rpc"] = "true"
 	}
+	return node
 }
 
 func (svr *Service) startHttpServe() (err error) {
@@ -240,7 +293,7 @@ func (svr *Service) prepare() (err error) {
 			err = svr.startRpcServe()
 		}
 	}
-	svr.generateInstance()
+	svr.node = svr.instance()
 	svr.wrapSync(func() {
 		svr.eventLoop()
 	})
@@ -304,12 +357,13 @@ func New(opts ...Option) *Service {
 		opt(o)
 	}
 	svr := &Service{
-		opts:     o,
-		upTime:   time.Now(),
-		httpSvr:  http.New(),
-		rpcSvr:   rpc.NewServer(),
-		registry: o.registry,
-		client:   NewClient(o.registry),
+		opts:        o,
+		upTime:      time.Now(),
+		httpSvr:     http.New(),
+		rpcSvr:      rpc.NewServer(),
+		registry:    o.registry,
+		client:      NewClient(o.registry),
+		environment: EnvironmentHost,
 	}
 	svr.ctx, svr.cancelFunc = context.WithCancel(o.Context)
 	return svr

+ 72 - 0
utils/metric/metric.go

@@ -0,0 +1,72 @@
+package metric
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"net/http"
+	"time"
+)
+
+type (
+	Value struct {
+		Metric string            `json:"metric"`
+		Value  float64           `json:"value"`
+		Tags   map[string]string `json:"tags"`
+	}
+
+	Values []*Value
+)
+
+func (m *Value) SetTag(key string, value string) *Value {
+	if m.Tags == nil {
+		m.Tags = make(map[string]string)
+	}
+	m.Tags[key] = value
+	return m
+}
+
+func (m *Value) SetTags(tags map[string]string) *Value {
+	for k, v := range tags {
+		m.Tags[k] = v
+	}
+	return m
+}
+
+func (m *Value) Bytes() []byte {
+	buf, _ := json.Marshal(m)
+	return buf
+}
+
+func (m Values) Bytes() []byte {
+	buf, _ := json.Marshal(m)
+	return buf
+}
+
+func New(metric string, value float64) *Value {
+	m := &Value{
+		Metric: metric,
+		Value:  value,
+	}
+	m.Tags = make(map[string]string)
+	return m
+}
+
+func Push(ctx context.Context, uri string, ms Values) (err error) {
+	var (
+		req *http.Request
+		res *http.Response
+	)
+	if req, err = http.NewRequest("POST", uri, bytes.NewReader(ms.Bytes())); err != nil {
+		return
+	}
+	cc, _ := context.WithTimeout(ctx, time.Second*5)
+	if res, err = http.DefaultClient.Do(req.WithContext(cc)); err == nil {
+		if !(res.StatusCode == 200 || res.StatusCode == 204) {
+			err = errors.New("read response error:" + res.Status)
+		}
+		_ = res.Body.Close()
+	}
+	return
+}