client.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package micro
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "git.nspix.com/golang/micro/gateway/rpc"
  7. "git.nspix.com/golang/micro/registry"
  8. )
  9. type Client struct {
  10. selector *registry.Selector
  11. clients sync.Map
  12. once sync.Once
  13. }
  14. func (c *Client) Do(ctx context.Context, r *Request) (resp Response, err error) {
  15. var (
  16. ok bool
  17. node *registry.ServiceNode
  18. value interface{}
  19. client *rpc.Client
  20. )
  21. value, ok = c.clients.Load(r.ServiceName)
  22. if ok {
  23. client = value.(*rpc.Client)
  24. goto _EXEC
  25. }
  26. if node, err = c.selector.Select(ctx, r.ServiceName); err != nil {
  27. err = fmt.Errorf("selector %s failed (%s)", r.ServiceName, err.Error())
  28. return
  29. }
  30. client = rpc.NewClient()
  31. if err = client.DialerContext(ctx, "tcp", fmt.Sprintf("%s:%d", node.Address, node.Port)); err != nil {
  32. return
  33. }
  34. c.clients.Store(r.ServiceName, client)
  35. _EXEC:
  36. if resp, err = client.Do(ctx, rpc.NewRequest(r.Method, r.Body)); err != nil {
  37. client.Close()
  38. c.clients.Delete(r.ServiceName)
  39. }
  40. return
  41. }
  42. func (c *Client) Close() (err error) {
  43. c.clients.Range(func(key, value interface{}) bool {
  44. client := value.(*rpc.Client)
  45. _ = client.Close()
  46. c.clients.Delete(key)
  47. return true
  48. })
  49. return
  50. }
  51. func NewClient(reg registry.Registry) *Client {
  52. return &Client{
  53. selector: registry.NewSelector(reg),
  54. }
  55. }