service.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. package micro
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/hex"
  6. "git.nspix.com/golang/micro/gateway/cli"
  7. "git.nspix.com/golang/micro/stats/prometheusbackend"
  8. "github.com/prometheus/client_golang/prometheus/promhttp"
  9. "net"
  10. hp "net/http"
  11. "net/http/pprof"
  12. "os"
  13. "os/signal"
  14. "strings"
  15. "sync"
  16. "syscall"
  17. "time"
  18. "git.nspix.com/golang/micro/gateway"
  19. "git.nspix.com/golang/micro/gateway/http"
  20. "git.nspix.com/golang/micro/gateway/rpc"
  21. "git.nspix.com/golang/micro/log"
  22. "git.nspix.com/golang/micro/registry"
  23. "git.nspix.com/golang/micro/utils/docker"
  24. "git.nspix.com/golang/micro/utils/net/ip"
  25. "git.nspix.com/golang/micro/utils/unsafestr"
  26. )
  27. type Service struct {
  28. opts *Options
  29. ctx context.Context
  30. cancelFunc context.CancelFunc
  31. registry registry.Registry
  32. node *registry.ServiceNode
  33. listener net.Listener
  34. gateway *gateway.Gateway
  35. wg sync.WaitGroup
  36. httpSvr *http.Server
  37. rpcSvr *rpc.Server
  38. cliSvr *cli.Server
  39. upTime time.Time
  40. client *Client
  41. environment string
  42. }
  43. func (svr *Service) wrapSync(f func()) {
  44. svr.wg.Add(1)
  45. go func() {
  46. f()
  47. svr.wg.Done()
  48. }()
  49. }
  50. func (svr *Service) eventLoop() {
  51. var (
  52. err error
  53. registryTicker *time.Ticker
  54. collectTicker *time.Ticker
  55. )
  56. registryTicker = time.NewTicker(time.Second * 20)
  57. collectTicker = time.NewTicker(time.Second * 2)
  58. defer func() {
  59. collectTicker.Stop()
  60. registryTicker.Stop()
  61. }()
  62. for {
  63. select {
  64. case <-registryTicker.C:
  65. if !svr.opts.DisableRegister {
  66. if err = svr.registry.Register(svr.node); err != nil {
  67. log.Warnf("registry service %s error: %s", svr.opts.Name, err.Error())
  68. }
  69. }
  70. case <-collectTicker.C:
  71. if svr.opts.EnableReport {
  72. _ = defaultReporter.Do(svr.opts.Name)
  73. }
  74. case <-svr.ctx.Done():
  75. return
  76. }
  77. }
  78. }
  79. func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
  80. opt := &HandleOptions{HttpMethod: "POST"}
  81. for _, f := range opts {
  82. f(opt)
  83. }
  84. //HTTP处理
  85. if svr.opts.EnableHttp && !opt.DisableHttp {
  86. if opt.HttpPath == "" {
  87. opt.HttpPath = strings.ReplaceAll(method, ".", "/")
  88. }
  89. if opt.HttpPath[0] != '/' {
  90. opt.HttpPath = "/" + opt.HttpPath
  91. }
  92. svr.httpSvr.Handle(opt.HttpMethod, opt.HttpPath, func(ctx *http.Context) (err error) {
  93. return cb(ctx)
  94. })
  95. }
  96. //启动RPC功能
  97. if svr.opts.EnableRPC && !opt.DisableRpc {
  98. svr.rpcSvr.Handle(method, func(ctx *rpc.Context) error {
  99. return cb(ctx)
  100. })
  101. }
  102. return
  103. }
  104. func (svr *Service) NewRequest(name, method string, body interface{}) (req *Request, err error) {
  105. return &Request{
  106. ServiceName: name,
  107. Method: method,
  108. Body: body,
  109. client: svr.client,
  110. }, nil
  111. }
  112. func (svr *Service) PeekService(name string) ([]*registry.ServiceNode, error) {
  113. return svr.registry.Get(name)
  114. }
  115. func (svr *Service) HttpServe() *http.Server {
  116. return svr.httpSvr
  117. }
  118. func (svr *Service) CliServe() *cli.Server {
  119. return svr.cliSvr
  120. }
  121. func (svr *Service) RPCServe() *rpc.Server {
  122. return svr.rpcSvr
  123. }
  124. func (svr *Service) Node() *registry.ServiceNode {
  125. return svr.node
  126. }
  127. func (svr *Service) Environment() string {
  128. return svr.environment
  129. }
  130. func (svr *Service) instance() *registry.ServiceNode {
  131. var (
  132. err error
  133. id string
  134. dockerID string
  135. tcpAddr *net.TCPAddr
  136. ipLocal string
  137. node *registry.ServiceNode
  138. )
  139. if id, err = docker.SelfContainerID(); err != nil {
  140. //生成唯一ID
  141. e5 := md5.New()
  142. e5.Write(unsafestr.StringToBytes(svr.opts.Name))
  143. e5.Write(unsafestr.StringToBytes(svr.opts.Version))
  144. id = hex.EncodeToString(e5.Sum(nil))
  145. } else {
  146. dockerID = id
  147. svr.environment = EnvironmentDocker
  148. }
  149. node = &registry.ServiceNode{
  150. ID: id,
  151. Name: svr.opts.Name,
  152. Version: svr.opts.Version,
  153. Metadata: map[string]string{},
  154. Addresses: make(map[string]registry.Addr),
  155. }
  156. if svr.opts.Address == "" {
  157. ipLocal = ip.InternalIP()
  158. } else {
  159. ipLocal = svr.opts.Address
  160. }
  161. node.Address = ipLocal
  162. if svr.listener != nil {
  163. if tcpAddr, err = net.ResolveTCPAddr("tcp", svr.listener.Addr().String()); err == nil {
  164. node.Port = tcpAddr.Port
  165. }
  166. } else {
  167. node.Port = svr.opts.Port
  168. }
  169. node.Metadata["docker-id"] = dockerID
  170. if svr.opts.EnableHttp {
  171. node.Metadata["enable-http"] = "true"
  172. }
  173. if svr.opts.EnableRPC {
  174. node.Metadata["enable-rpc"] = "true"
  175. }
  176. //服务注册时候处理
  177. if svr.opts.EnableStats {
  178. node.Metadata["prometheus"] = "enable"
  179. }
  180. return node
  181. }
  182. func (svr *Service) startHTTPServe() (err error) {
  183. l := gateway.NewListener(svr.listener.Addr())
  184. if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil {
  185. svr.wrapSync(func() {
  186. if err = svr.httpSvr.Serve(l); err != nil {
  187. log.Warnf("http serve error: %s", err.Error())
  188. }
  189. })
  190. svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
  191. return ctx.Success(map[string]interface{}{
  192. "id": svr.node.ID,
  193. "healthy": "healthy",
  194. "uptime": time.Now().Sub(svr.upTime).String(),
  195. })
  196. })
  197. if svr.opts.EnableStats {
  198. prometheusbackend.Init(svr.opts.shortName)
  199. svr.httpSvr.Handler("GET", "/metrics", promhttp.Handler())
  200. }
  201. if svr.opts.EnableHttpPProf {
  202. svr.httpSvr.Handler("GET", "/debug/pprof/", hp.HandlerFunc(pprof.Index))
  203. svr.httpSvr.Handler("GET", "/debug/pprof/goroutine", hp.HandlerFunc(pprof.Index))
  204. svr.httpSvr.Handler("GET", "/debug/pprof/heap", hp.HandlerFunc(pprof.Index))
  205. svr.httpSvr.Handler("GET", "/debug/pprof/mutex", hp.HandlerFunc(pprof.Index))
  206. svr.httpSvr.Handler("GET", "/debug/pprof/threadcreate", hp.HandlerFunc(pprof.Index))
  207. svr.httpSvr.Handler("GET", "/debug/pprof/cmdline", hp.HandlerFunc(pprof.Cmdline))
  208. svr.httpSvr.Handler("GET", "/debug/pprof/profile", hp.HandlerFunc(pprof.Profile))
  209. svr.httpSvr.Handler("GET", "/debug/pprof/symbol", hp.HandlerFunc(pprof.Symbol))
  210. svr.httpSvr.Handler("GET", "/debug/pprof/trace", hp.HandlerFunc(pprof.Trace))
  211. }
  212. log.Infof("attach http listener success")
  213. } else {
  214. log.Warnf("attach http listener failed cause by %s", err.Error())
  215. }
  216. return
  217. }
  218. func (svr *Service) startRPCServe() (err error) {
  219. l := gateway.NewListener(svr.listener.Addr())
  220. if err = svr.gateway.Attach([]byte("RPC"), l); err == nil {
  221. svr.wrapSync(func() {
  222. if err = svr.rpcSvr.Serve(l); err != nil {
  223. log.Warnf("rpc serve start error: %s", err.Error())
  224. }
  225. })
  226. log.Infof("attach rpc listener success")
  227. } else {
  228. log.Warnf("attach rpc listener failed cause by %s", err.Error())
  229. }
  230. return
  231. }
  232. func (svr *Service) startCliServe() (err error) {
  233. l := gateway.NewListener(svr.listener.Addr())
  234. if err = svr.gateway.Attach([]byte("CLI"), l); err == nil {
  235. svr.wrapSync(func() {
  236. if err = svr.cliSvr.Serve(l); err != nil {
  237. log.Warnf("cli serve start error: %s", err.Error())
  238. }
  239. })
  240. log.Infof("attach cli listener success")
  241. } else {
  242. log.Warnf("attach cli listener failed cause by %s", err.Error())
  243. }
  244. return
  245. }
  246. func (svr *Service) prepare() (err error) {
  247. svr.ctx = WithContext(svr.ctx, svr)
  248. if svr.opts.EnableInternalListener {
  249. var tcpAddr *net.TCPAddr
  250. //绑定指定的端口
  251. if svr.opts.Port != 0 {
  252. tcpAddr = &net.TCPAddr{
  253. Port: svr.opts.Port,
  254. }
  255. }
  256. //默认指定为本机IP
  257. if svr.opts.Address == "" {
  258. svr.opts.Address = ip.InternalIP()
  259. }
  260. //绑定指定的IP
  261. if tcpAddr == nil {
  262. tcpAddr = &net.TCPAddr{
  263. IP: net.ParseIP(svr.opts.Address),
  264. }
  265. } else {
  266. tcpAddr.IP = net.ParseIP(svr.opts.Address)
  267. }
  268. _ = os.Setenv("MICRO_SERVICE_NAME", svr.opts.ShortName())
  269. _ = os.Setenv("MICRO_SERVICE_VERSION", svr.opts.Version)
  270. if svr.listener, err = net.ListenTCP("tcp", tcpAddr); err != nil {
  271. return
  272. }
  273. log.Infof("server listen on: %s", svr.listener.Addr())
  274. svr.gateway = gateway.New(svr.listener, svr.opts.EnableStats)
  275. svr.wrapSync(func() {
  276. svr.gateway.Run(svr.ctx)
  277. })
  278. //开启HTTP服务
  279. if svr.opts.EnableHttp {
  280. err = svr.startHTTPServe()
  281. }
  282. //开启RCP服务
  283. if svr.opts.EnableRPC {
  284. err = svr.startRPCServe()
  285. }
  286. //开启cli服务
  287. if svr.opts.EnableCli {
  288. err = svr.startCliServe()
  289. }
  290. }
  291. svr.node = svr.instance()
  292. svr.wrapSync(func() {
  293. svr.eventLoop()
  294. })
  295. if !svr.opts.DisableRegister {
  296. _ = svr.registry.Register(svr.node)
  297. }
  298. return
  299. }
  300. func (svr *Service) destroy() (err error) {
  301. log.Infof("service stopping")
  302. svr.cancelFunc()
  303. if !svr.opts.DisableRegister {
  304. if err = svr.registry.Deregister(svr.node); err != nil {
  305. log.Warnf("deregister service %s error: %s", svr.opts.Name, err.Error())
  306. } else {
  307. log.Infof("deregister service %s successful", svr.opts.Name)
  308. }
  309. }
  310. if svr.listener != nil {
  311. if err = svr.listener.Close(); err != nil {
  312. log.Warnf(err.Error())
  313. }
  314. }
  315. if err = svr.client.Close(); err != nil {
  316. log.Warnf(err.Error())
  317. }
  318. log.Infof("service stopped")
  319. return
  320. }
  321. func (svr *Service) Run() (err error) {
  322. if svr.opts.EnableLogPrefix {
  323. log.Prefix(svr.opts.Name)
  324. }
  325. log.Infof("service starting")
  326. if err = svr.prepare(); err != nil {
  327. return
  328. }
  329. //start server
  330. if svr.opts.Server != nil {
  331. if err = svr.opts.Server.Start(svr.ctx); err != nil {
  332. return
  333. }
  334. }
  335. log.Infof("service started")
  336. //waiting
  337. ch := make(chan os.Signal, 1)
  338. signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL)
  339. select {
  340. case <-ch:
  341. case <-svr.ctx.Done():
  342. }
  343. //stop server
  344. if svr.opts.Server != nil {
  345. err = svr.opts.Server.Stop()
  346. }
  347. return svr.destroy()
  348. }
  349. func New(opts ...Option) *Service {
  350. o := NewOptions()
  351. for _, opt := range opts {
  352. opt(o)
  353. }
  354. svr := &Service{
  355. opts: o,
  356. upTime: time.Now(),
  357. httpSvr: http.New(),
  358. cliSvr: cli.New(),
  359. rpcSvr: rpc.NewServer(),
  360. registry: o.registry,
  361. client: NewClient(o.registry),
  362. environment: EnvironmentHost,
  363. }
  364. svr.ctx, svr.cancelFunc = context.WithCancel(o.Context)
  365. return svr
  366. }