discovery.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package registry
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "git.nspix.com/golang/micro/helper/httpclient"
  9. "git.nspix.com/golang/micro/helper/utils"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "time"
  14. )
  15. const (
  16. DefaultDiscoveryUrl = "https://discovery.nspix.com"
  17. )
  18. type (
  19. Discovery struct {
  20. baseUrl string
  21. Timeout time.Duration
  22. }
  23. discoveryResponse struct {
  24. Code int `json:"errno"`
  25. Message string `json:"errmsg"`
  26. Data json.RawMessage `json:"result"`
  27. }
  28. )
  29. func (r *Discovery) sendRequest(ctx context.Context, method, path string, body io.Reader, data interface{}) (err error) {
  30. var (
  31. req *http.Request
  32. res *http.Response
  33. )
  34. if req, err = http.NewRequest(method, r.baseUrl+path, body); err != nil {
  35. return
  36. }
  37. if body != nil {
  38. req.Header.Set("Content-Type", "application/json")
  39. }
  40. if res, err = httpclient.Do(ctx, req); err != nil {
  41. return
  42. }
  43. defer func() {
  44. _ = res.Body.Close()
  45. }()
  46. rv := &discoveryResponse{}
  47. if err = json.NewDecoder(res.Body).Decode(rv); err == nil {
  48. if rv.Code == 0 {
  49. if data != nil {
  50. err = json.Unmarshal(rv.Data, data)
  51. }
  52. } else {
  53. err = errors.New(rv.Message)
  54. }
  55. }
  56. return
  57. }
  58. func (r *Discovery) Register(ctx context.Context, instance *ServiceNode) (err error) {
  59. var (
  60. buf []byte
  61. )
  62. if buf, err = json.Marshal(instance); err != nil {
  63. return
  64. }
  65. if ctx == nil{
  66. ctx = context.Background()
  67. }
  68. cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
  69. defer func() {
  70. cancelFunc()
  71. }()
  72. return r.sendRequest(cc, "POST", "/register", bytes.NewReader(buf), nil)
  73. }
  74. func (r *Discovery) Deregister(ctx context.Context, instance *ServiceNode) (err error) {
  75. cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
  76. if ctx == nil{
  77. ctx = context.Background()
  78. }
  79. defer func() {
  80. cancelFunc()
  81. }()
  82. return r.sendRequest(cc, "DELETE", "/deregister/"+instance.ID, nil, nil)
  83. }
  84. func (r *Discovery) Get(ctx context.Context, name string) (instances []*ServiceNode, err error) {
  85. instances = make([]*ServiceNode, 0)
  86. if ctx == nil{
  87. ctx = context.Background()
  88. }
  89. cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
  90. defer func() {
  91. cancelFunc()
  92. }()
  93. err = r.sendRequest(cc, "GET", "/service?name="+name, nil, &instances)
  94. return
  95. }
  96. func (r *Discovery) Fetch(ctx context.Context) (instances []*ServiceNode, err error) {
  97. instances = make([]*ServiceNode, 0)
  98. if ctx == nil{
  99. ctx = context.Background()
  100. }
  101. cc, cancelFunc := context.WithTimeout(ctx, r.Timeout)
  102. defer func() {
  103. cancelFunc()
  104. }()
  105. err = r.sendRequest(cc, "GET", "/services", nil, &instances)
  106. return
  107. }
  108. func NewDiscovery(uri string) *Discovery {
  109. if uri == "" {
  110. //兼容k8s的情况
  111. namespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
  112. if utils.FileExists(namespaceFile) {
  113. if buf, err := ioutil.ReadFile(namespaceFile); err == nil {
  114. uri = fmt.Sprintf("http://discovery.%s.svc.cluster.local", string(buf))
  115. }
  116. }
  117. }
  118. if uri == "" {
  119. uri = DefaultDiscoveryUrl
  120. }
  121. return &Discovery{
  122. baseUrl: uri,
  123. Timeout: time.Second * 10,
  124. }
  125. }