ソースを参照

优化服务注册

lxg 3 年 前
コミット
9c0e32ae2b
5 ファイル変更92 行追加46 行削除
  1. 19 18
      options.go
  2. 50 12
      registry/discovery.go
  3. 8 5
      registry/registry.go
  4. 1 1
      registry/selector.go
  5. 14 10
      service.go

+ 19 - 18
options.go

@@ -9,24 +9,25 @@ import (
 
 type (
 	Options struct {
-		Zone                   string            //注册域
-		Name                   string            //名称
-		Version                string            //版本号
-		EnableHttp             bool              //启用HTTP功能
-		EnableRPC              bool              //启用RPC功能
-		EnableInternalListener bool              //启用内置网络监听服务
-		DisableRegister        bool              //禁用注册
-		registry               registry.Registry //注册仓库
-		Server                 Server            //加载的服务
-		Port                   int               //绑定端口
-		Address                string            //绑定地址
-		EnableHttpPProf        bool              //启用HTTP调试工具
-		EnableStats            bool              //启用数据统计
-		EnableLogPrefix        bool              //启用日志前缀
-		EnableCli              bool              //启用cli模式
-		EnableReport           bool              //启用数据上报
-		Context                context.Context
-		shortName              string
+		Zone                    string            //注册域
+		Name                    string            //名称
+		Version                 string            //版本号
+		EnableHttp              bool              //启用HTTP功能
+		EnableRPC               bool              //启用RPC功能
+		EnableInternalListener  bool              //启用内置网络监听服务
+		DisableRegister         bool              //禁用注册
+		registry                registry.Registry //注册仓库
+		Server                  Server            //加载的服务
+		Port                    int               //绑定端口
+		Address                 string            //绑定地址
+		EnableHttpPProf         bool              //启用HTTP调试工具
+		EnableStats             bool              //启用数据统计
+		EnableLogPrefix         bool              //启用日志前缀
+		EnableCli               bool              //启用cli模式
+		EnableReport            bool              //启用数据上报
+		EnableHttpAuthorization bool              //启用请求授权认证
+		Context                 context.Context
+		shortName               string
 	}
 
 	Option func(o *Options)

+ 50 - 12
registry/discovery.go

@@ -5,8 +5,11 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"git.nspix.com/golang/micro/helper/httpclient"
+	"git.nspix.com/golang/micro/helper/utils"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"time"
 )
@@ -18,6 +21,7 @@ const (
 type (
 	Discovery struct {
 		baseUrl string
+		Timeout time.Duration
 	}
 
 	discoveryResponse struct {
@@ -57,41 +61,75 @@ func (r *Discovery) sendRequest(ctx context.Context, method, path string, body i
 	return
 }
 
-func (r *Discovery) Register(instance *ServiceNode) (err error) {
+func (r *Discovery) Register(ctx context.Context, instance *ServiceNode) (err error) {
 	var (
 		buf []byte
 	)
 	if buf, err = json.Marshal(instance); err != nil {
 		return
 	}
-	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
-	return r.sendRequest(ctx, "POST", "/register", bytes.NewReader(buf), nil)
+	if ctx == nil{
+		ctx = context.Background()
+	}
+	cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
+	defer func() {
+		cancelFunc()
+	}()
+	return r.sendRequest(cc, "POST", "/register", bytes.NewReader(buf), nil)
 }
 
-func (r *Discovery) Deregister(instance *ServiceNode) (err error) {
-	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
-	return r.sendRequest(ctx, "DELETE", "/deregister/"+instance.ID, nil, nil)
+func (r *Discovery) Deregister(ctx context.Context, instance *ServiceNode) (err error) {
+	cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
+	if ctx == nil{
+		ctx = context.Background()
+	}
+	defer func() {
+		cancelFunc()
+	}()
+	return r.sendRequest(cc, "DELETE", "/deregister/"+instance.ID, nil, nil)
 }
 
-func (r *Discovery) Get(name string) (instances []*ServiceNode, err error) {
+func (r *Discovery) Get(ctx context.Context, name string) (instances []*ServiceNode, err error) {
 	instances = make([]*ServiceNode, 0)
-	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
-	err = r.sendRequest(ctx, "GET", "/service?name="+name, nil, &instances)
+	if ctx == nil{
+		ctx = context.Background()
+	}
+	cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
+	defer func() {
+		cancelFunc()
+	}()
+	err = r.sendRequest(cc, "GET", "/service?name="+name, nil, &instances)
 	return
 }
 
-func (r *Discovery) Fetch() (instances []*ServiceNode, err error) {
+func (r *Discovery) Fetch(ctx context.Context) (instances []*ServiceNode, err error) {
 	instances = make([]*ServiceNode, 0)
-	ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
-	err = r.sendRequest(ctx, "GET", "/services", nil, &instances)
+	if ctx == nil{
+		ctx = context.Background()
+	}
+	cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
+	defer func() {
+		cancelFunc()
+	}()
+	err = r.sendRequest(cc, "GET", "/services", nil, &instances)
 	return
 }
 
 func NewDiscovery(uri string) *Discovery {
+	if uri == "" {
+		//兼容k8s的情况
+		namespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
+		if utils.FileExists(namespaceFile) {
+			if buf, err := ioutil.ReadFile(namespaceFile); err == nil {
+				uri = fmt.Sprintf("http://discovery.%s.svc.cluster.local", string(buf))
+			}
+		}
+	}
 	if uri == "" {
 		uri = DefaultDiscoveryUrl
 	}
 	return &Discovery{
 		baseUrl: uri,
+		Timeout: time.Second * 10,
 	}
 }

+ 8 - 5
registry/registry.go

@@ -1,6 +1,9 @@
 package registry
 
-import "os"
+import (
+	"context"
+	"os"
+)
 
 var (
 	DefaultRegistry Registry
@@ -12,10 +15,10 @@ const (
 
 type (
 	Registry interface {
-		Register(instance *ServiceNode) (err error)
-		Deregister(instance *ServiceNode) (err error)
-		Get(name string) ([]*ServiceNode, error)
-		Fetch() ([]*ServiceNode, error)
+		Register(ctx context.Context, instance *ServiceNode) (err error)
+		Deregister(ctx context.Context, instance *ServiceNode) (err error)
+		Get(ctx context.Context, name string) ([]*ServiceNode, error)
+		Fetch(ctx context.Context) ([]*ServiceNode, error)
 	}
 )
 

+ 1 - 1
registry/selector.go

@@ -17,7 +17,7 @@ func (selector *Selector) Select(ctx context.Context, name string) (node *Servic
 	var (
 		nodes []*ServiceNode
 	)
-	if nodes, err = selector.registry.Get(name); err != nil {
+	if nodes, err = selector.registry.Get(ctx, name); err != nil {
 		return
 	}
 	for _, node = range nodes {

+ 14 - 10
service.go

@@ -109,10 +109,10 @@ func (svr *Service) worker() {
 			}
 		case <-ticker.C:
 			if !svr.opts.DisableRegister {
-				if err = svr.registry.Register(svr.node); err != nil {
+				if err = svr.registry.Register(svr.ctx, svr.node); err != nil {
 					svr.triesRegister++
-					if svr.triesRegister%15 == 0 {
-						log.Warnf("try register service %s failed to many times", svr.opts.ShortName())
+					if svr.triesRegister%18 == 0 {
+						log.Warnf("service registered %s failed %d times, last error: %s", svr.opts.ShortName(), svr.triesRegister, err.Error())
 					}
 				} else {
 					svr.triesRegister = 0
@@ -189,7 +189,7 @@ func (svr *Service) NewRequest(name, method string, body interface{}) (req *Requ
 
 //PeekService 选取一个可靠的服务
 func (svr *Service) PeekService(name string) ([]*registry.ServiceNode, error) {
-	return svr.registry.Get(name)
+	return svr.registry.Get(svr.ctx, name)
 }
 
 func (svr *Service) HttpServe() *http.Server {
@@ -286,6 +286,10 @@ func (svr *Service) instance() *registry.ServiceNode {
 	if svr.opts.EnableStats {
 		node.Metadata["prometheus"] = "enable"
 	}
+	//启用HTTP授权认证
+	if svr.opts.EnableHttpAuthorization {
+		node.Metadata["http-authorization"] = "enable"
+	}
 	return node
 }
 
@@ -411,8 +415,8 @@ func (svr *Service) prepare() (err error) {
 		svr.worker()
 	})
 	if !svr.opts.DisableRegister {
-		if err = svr.registry.Register(svr.node); err != nil {
-			log.Warnf("try register %s failed cause by %s", svr.opts.ShortName(), err.Error())
+		if err = svr.registry.Register(svr.ctx, svr.node); err != nil {
+			log.Warnf("service %s registered failed cause by %s", svr.opts.ShortName(), err.Error())
 			svr.triesRegister++
 			err = nil
 		}
@@ -432,14 +436,14 @@ func (svr *Service) destroy() (err error) {
 		return
 	}
 	log.Infof("service stopping")
-	svr.cancelFunc()
 	if !svr.opts.DisableRegister {
-		if err = svr.registry.Deregister(svr.node); err != nil {
-			log.Warnf("deregister service %s error: %s", svr.opts.Name, err.Error())
+		if err = svr.registry.Deregister(svr.ctx, svr.node); err != nil {
+			log.Warnf("service %s deregister error: %s", svr.opts.Name, err.Error())
 		} else {
-			log.Infof("deregister service %s successful", svr.opts.Name)
+			log.Infof("service %s deregister successful", svr.opts.Name)
 		}
 	}
+	svr.cancelFunc()
 	if svr.listener != nil {
 		if err = svr.listener.Close(); err != nil {
 			log.Warnf(err.Error())