package micro import ( "context" "crypto/md5" "encoding/hex" "git.nspix.com/golang/micro/helper/random" "git.nspix.com/golang/micro/helper/utils" "io/ioutil" "math" "math/rand" "net" hp "net/http" "net/http/pprof" "os" "os/signal" "path" "strings" "sync" "sync/atomic" "syscall" "time" "git.nspix.com/golang/micro/broker" "git.nspix.com/golang/micro/gateway/cli" "git.nspix.com/golang/micro/helper/machineid" "git.nspix.com/golang/micro/stats/prometheusbackend" "github.com/google/btree" "github.com/prometheus/client_golang/prometheus/promhttp" "git.nspix.com/golang/micro/gateway" "git.nspix.com/golang/micro/gateway/http" "git.nspix.com/golang/micro/gateway/rpc" "git.nspix.com/golang/micro/helper/docker" "git.nspix.com/golang/micro/helper/net/ip" "git.nspix.com/golang/micro/helper/unsafestr" "git.nspix.com/golang/micro/log" "git.nspix.com/golang/micro/registry" ) var ( tickSequence int64 ) type Service struct { opts *Options ctx context.Context cancelFunc context.CancelFunc registry registry.Registry node *registry.ServiceNode listener net.Listener gateway *gateway.Gateway wg sync.WaitGroup httpSvr *http.Server rpcSvr *rpc.Server cliSvr *cli.Server upTime time.Time client *Client triesRegister int tickTimer *time.Timer tickTree *btree.BTree deferHandles []handleEntry environment string readyFlag int32 exitFlag int32 } func (svr *Service) async(f func()) { svr.wg.Add(1) go func() { f() svr.wg.Done() }() } func (svr *Service) worker() { var ( err error ticker *time.Ticker ) ticker = time.NewTicker(time.Second * 20) defer func() { ticker.Stop() }() for { select { case <-svr.tickTimer.C: node := svr.tickTree.Min() if node == nil { svr.tickTimer.Reset(math.MaxInt64) break } tick := node.(*tickPtr) if tick.next.Before(time.Now()) { svr.tickTree.Delete(node) if !tick.options.Canceled { if atomic.CompareAndSwapInt32(&tick.running, 0, 1) { svr.async(func() { tick.callback(tick.options.Context) }) } } next := svr.tickTree.Min() if next == nil { svr.tickTimer.Reset(math.MaxInt64) } else { svr.tickTimer.Reset(next.(*tickPtr).next.Sub(time.Now())) } } else { svr.tickTimer.Reset(tick.next.Sub(time.Now())) } case <-ticker.C: if !svr.opts.DisableRegister { if err = svr.registry.Register(svr.ctx, svr.node); err != nil { svr.triesRegister++ if svr.triesRegister%18 == 0 { log.Warnf("service registered %s failed %d times, last error: %s", svr.opts.ShortName(), svr.triesRegister, err.Error()) } } else { svr.triesRegister = 0 } } case <-svr.ctx.Done(): return } } } //DeferTick 定时执行一个任务 func (svr *Service) DeferTick(duration time.Duration, callback HandleTickerFunc, opts ...TickOption) int64 { tick := &tickPtr{ sequence: atomic.AddInt64(&tickSequence, 1), next: time.Now().Add(duration), options: &TickOptions{}, callback: callback, } for _, optFunc := range opts { optFunc(tick.options) } if tick.options.Context == nil { tick.options.Context = svr.ctx } svr.tickTimer.Reset(0) svr.tickTree.ReplaceOrInsert(tick) return tick.sequence } //Handle 处理函数 func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) { if atomic.LoadInt32(&svr.readyFlag) != 1 { svr.deferHandles = append(svr.deferHandles, handleEntry{ Method: method, Func: cb, Options: opts, }) return } //disable cli default opt := &HandleOptions{HttpMethod: "POST", DisableCli: true} for _, f := range opts { f(opt) } //HTTP处理 if svr.opts.EnableHttp && !opt.DisableHttp { if opt.HttpPath == "" { opt.HttpPath = strings.ReplaceAll(method, ".", "/") } if opt.HttpPath[0] != '/' { opt.HttpPath = "/" + opt.HttpPath } svr.httpSvr.Handle(opt.HttpMethod, opt.HttpPath, func(ctx *http.Context) (err error) { return cb(ctx) }) } //启动RPC功能 if svr.opts.EnableRPC && !opt.DisableRpc { svr.rpcSvr.Handle(method, func(ctx *rpc.Context) error { return cb(ctx) }) } //启用CLI模式 if svr.opts.EnableCli && !opt.DisableCli { svr.cliSvr.Handle(method, func(ctx *cli.Context) (err error) { return cb(ctx) }) } return } //NewRequest 创建一个请求 func (svr *Service) NewRequest(name, method string, body interface{}) (req *Request, err error) { return &Request{ ServiceName: name, Method: method, Body: body, client: svr.client, }, nil } //PeekService 选取一个可靠的服务 func (svr *Service) PeekService(name string) ([]*registry.ServiceNode, error) { return svr.registry.Get(svr.ctx, name) } func (svr *Service) HttpServe() *http.Server { return svr.httpSvr } func (svr *Service) CliServe() *cli.Server { return svr.cliSvr } func (svr *Service) RPCServe() *rpc.Server { return svr.rpcSvr } func (svr *Service) Node() *registry.ServiceNode { return svr.node } func (svr *Service) Environment() string { return svr.environment } func (svr *Service) getMachineID() (machineID string) { var ( buf []byte err error ) cacheFile := path.Join(utils.CacheDir(), utils.HiddenFilePrefix()+svr.opts.ShortName()) if utils.FileExists(cacheFile) { if buf, err = ioutil.ReadFile(cacheFile); err == nil { machineID = string(buf) return } } if machineID == "" { if machineID, err = machineid.Code(); err != nil { machineID = random.String(64) } } _ = os.MkdirAll(path.Dir(cacheFile), 0644) _ = ioutil.WriteFile(cacheFile, []byte(machineID), 0755) return machineID } func (svr *Service) instance() *registry.ServiceNode { var ( err error id string dockerID string tcpAddr *net.TCPAddr ipLocal string node *registry.ServiceNode ) if id, err = docker.SelfContainerID(); err != nil { //生成唯一ID e5 := md5.New() e5.Write(unsafestr.StringToBytes(svr.opts.Name)) e5.Write(unsafestr.StringToBytes(svr.opts.Version)) id = hex.EncodeToString(e5.Sum(nil)) } else { dockerID = id svr.environment = EnvironmentDocker } node = ®istry.ServiceNode{ ID: id, Name: svr.opts.Name, Version: svr.opts.Version, Metadata: map[string]string{}, Addresses: make(map[string]registry.Addr), } if svr.opts.Address == "" { ipLocal = ip.InternalIP() } else { ipLocal = svr.opts.Address } node.Address = ipLocal if svr.listener != nil { if tcpAddr, err = net.ResolveTCPAddr("tcp", svr.listener.Addr().String()); err == nil { node.Port = tcpAddr.Port } } else { node.Port = svr.opts.Port } //上报机器码 node.Metadata["machine-code"] = svr.getMachineID() node.Metadata["docker-id"] = dockerID if svr.opts.EnableHttp { node.Metadata["enable-http"] = "true" } if svr.opts.EnableRPC { node.Metadata["enable-rpc"] = "true" } //服务注册时候处理 if svr.opts.EnableStats { node.Metadata["prometheus"] = "enable" } //添加服务注册参数 if svr.opts.RegistryArguments != nil { for k, v := range svr.opts.RegistryArguments { node.Metadata["registry-"+k] = v } } return node } func (svr *Service) startHTTPServe() (err error) { svr.httpSvr = http.New(svr.ctx) l := gateway.NewListener(svr.listener.Addr()) if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil { svr.async(func() { if err = svr.httpSvr.Serve(l); err != nil && atomic.LoadInt32(&svr.exitFlag) == 0 { log.Warnf("http serve error: %s", err.Error()) } log.Infof("http server stopped") }) svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) { return ctx.Success(map[string]interface{}{ "id": svr.node.ID, "healthy": "Healthy", "uptime": time.Now().Sub(svr.upTime).String(), }) }) if svr.opts.EnableStats { prometheusbackend.Init(svr.opts.shortName) svr.httpSvr.Handler("GET", "/metrics", promhttp.Handler()) } if svr.opts.EnableHttpPProf { svr.httpSvr.Handler("GET", "/debug/pprof/", hp.HandlerFunc(pprof.Index)) svr.httpSvr.Handler("GET", "/debug/pprof/goroutine", hp.HandlerFunc(pprof.Index)) svr.httpSvr.Handler("GET", "/debug/pprof/heap", hp.HandlerFunc(pprof.Index)) svr.httpSvr.Handler("GET", "/debug/pprof/mutex", hp.HandlerFunc(pprof.Index)) svr.httpSvr.Handler("GET", "/debug/pprof/threadcreate", hp.HandlerFunc(pprof.Index)) svr.httpSvr.Handler("GET", "/debug/pprof/cmdline", hp.HandlerFunc(pprof.Cmdline)) svr.httpSvr.Handler("GET", "/debug/pprof/profile", hp.HandlerFunc(pprof.Profile)) svr.httpSvr.Handler("GET", "/debug/pprof/symbol", hp.HandlerFunc(pprof.Symbol)) svr.httpSvr.Handler("GET", "/debug/pprof/trace", hp.HandlerFunc(pprof.Trace)) } log.Infof("attach http listener success") } else { log.Warnf("attach http listener failed cause by %s", err.Error()) } return } func (svr *Service) startRPCServe() (err error) { svr.rpcSvr = rpc.New(svr.ctx) l := gateway.NewListener(svr.listener.Addr()) if err = svr.gateway.Attach([]byte("RPC"), l); err == nil { svr.async(func() { if err = svr.rpcSvr.Serve(l); err != nil && atomic.LoadInt32(&svr.exitFlag) == 0 { log.Warnf("rpc serve start error: %s", err.Error()) } log.Infof("rpc server stopped") }) log.Infof("attach rpc listener success") } else { log.Warnf("attach rpc listener failed cause by %s", err.Error()) } return } func (svr *Service) startCliServe() (err error) { svr.cliSvr = cli.New(svr.ctx) l := gateway.NewListener(svr.listener.Addr()) if err = svr.gateway.Attach([]byte("CLI"), l); err == nil { svr.async(func() { if err = svr.cliSvr.Serve(l); err != nil && atomic.LoadInt32(&svr.exitFlag) == 0 { log.Warnf("cli serve start error: %s", err.Error()) } log.Infof("cli server stopped") }) log.Infof("attach cli listener success") } else { log.Warnf("attach cli listener failed cause by %s", err.Error()) } return } func (svr *Service) prepare() (err error) { svr.ctx = WithContext(svr.ctx, svr) if svr.opts.EnableInternalListener { var tcpAddr *net.TCPAddr //绑定指定的端口 if svr.opts.Port != 0 { tcpAddr = &net.TCPAddr{ Port: svr.opts.Port, } } //默认指定为本机IP if svr.opts.Address == "" { svr.opts.Address = ip.InternalIP() } //绑定指定的IP if tcpAddr == nil { tcpAddr = &net.TCPAddr{ IP: net.ParseIP(svr.opts.Address), } } else { tcpAddr.IP = net.ParseIP(svr.opts.Address) } _ = os.Setenv("MICRO_SERVICE_NAME", svr.opts.ShortName()) _ = os.Setenv("MICRO_SERVICE_VERSION", svr.opts.Version) if svr.listener, err = net.ListenTCP("tcp", tcpAddr); err != nil { return } log.Infof("server listen on: %s", svr.listener.Addr()) svr.gateway = gateway.New(svr.listener, svr.opts.EnableStats) svr.async(func() { svr.gateway.Run(svr.ctx) }) //开启HTTP服务 if svr.opts.EnableHttp { err = svr.startHTTPServe() } //开启RCP服务 if svr.opts.EnableRPC { err = svr.startRPCServe() } //开启cli服务 if svr.opts.EnableCli { err = svr.startCliServe() } } svr.node = svr.instance() svr.async(func() { svr.worker() }) if !svr.opts.DisableRegister { if err = svr.registry.Register(svr.ctx, svr.node); err != nil { log.Warnf("service %s registered failed cause by %s", svr.opts.ShortName(), err.Error()) svr.triesRegister++ err = nil } } if svr.opts.EnableReport { rn := rand.Int31n(48) + 60 svr.DeferTick(time.Hour*time.Duration(rn), func(ctx context.Context) { _ = defaultReporter.Do(svr.opts.Name) }) } return } //destroy stop and destroy service func (svr *Service) destroy() (err error) { if !atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) { return } log.Infof("service stopping") if !svr.opts.DisableRegister { if err = svr.registry.Deregister(svr.ctx, svr.node); err != nil { log.Warnf("service %s deregister error: %s", svr.opts.Name, err.Error()) } else { log.Infof("service %s deregister successful", svr.opts.Name) } } svr.cancelFunc() if svr.listener != nil { if err = svr.listener.Close(); err != nil { log.Warnf(err.Error()) } else { log.Infof("listener closed") } } if err = svr.client.Close(); err != nil { log.Warnf(err.Error()) } svr.tickTimer.Stop() for svr.tickTree.Len() > 0 { node := svr.tickTree.DeleteMin() if node == nil { break } tick := node.(*tickPtr) if tick.options.MustBeExecute { tick.callback(tick.options.Context) } } svr.wg.Wait() log.Infof("service stopped") return } //Reload reload server func (svr *Service) Reload() (err error) { if svr.opts.Server == nil { return } log.Infof("reloading server %s", svr.opts.Name) if err = svr.opts.Server.Stop(); err != nil { return } return svr.opts.Server.Start(svr.ctx) } //Run setup service func (svr *Service) Run() (err error) { if svr.opts.EnableLogPrefix { log.Prefix(svr.opts.Name) } log.Infof("service starting") if err = svr.prepare(); err != nil { return } //set global broker component broker.SetGlobal(broker.NewInPrcBus(svr.ctx)) //start server if svr.opts.Server != nil { if err = svr.opts.Server.Start(svr.ctx); err != nil { return } } log.Infof("service started") if atomic.CompareAndSwapInt32(&svr.readyFlag, 0, 1) { for _, he := range svr.deferHandles { svr.Handle(he.Method, he.Func, he.Options...) } } ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL) select { case <-ch: case <-svr.ctx.Done(): } //stop server if svr.opts.Server != nil { err = svr.opts.Server.Stop() } return svr.destroy() } func New(opts ...Option) *Service { o := NewOptions() for _, opt := range opts { opt(o) } svr := &Service{ opts: o, upTime: time.Now(), registry: o.registry, deferHandles: make([]handleEntry, 0), tickTimer: time.NewTimer(math.MaxInt64), tickTree: btree.New(64), client: NewClient(o.registry), environment: EnvironmentHost, } svr.ctx, svr.cancelFunc = context.WithCancel(o.Context) return svr }