client.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package micro
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "git.nspix.com/golang/micro/gateway/rpc"
  8. "git.nspix.com/golang/micro/registry"
  9. )
  10. type Client struct {
  11. selector *registry.Selector
  12. clients sync.Map
  13. once sync.Once
  14. }
  15. func (c *Client) Do(ctx context.Context, r *Request) (resp Response, err error) {
  16. var (
  17. ok bool
  18. node *registry.ServiceNode
  19. value interface{}
  20. client *rpc.Client
  21. tryFlag int32
  22. )
  23. value, ok = c.clients.Load(r.ServiceName)
  24. if ok {
  25. client = value.(*rpc.Client)
  26. goto _EXEC
  27. }
  28. _RETRY:
  29. if node, err = c.selector.Select(ctx, r.ServiceName); err != nil {
  30. err = fmt.Errorf("get service %s failed cause by %s", r.ServiceName, err.Error())
  31. return
  32. }
  33. client = rpc.NewClient()
  34. if err = client.DialerContext(ctx, "tcp", fmt.Sprintf("%s:%d", node.Address, node.Port)); err != nil {
  35. return
  36. }
  37. c.clients.Store(r.ServiceName, client)
  38. _EXEC:
  39. if resp, err = client.Do(ctx, rpc.NewRequest(r.Method, r.Body)); err != nil {
  40. err = client.Close()
  41. c.clients.Delete(r.ServiceName)
  42. if atomic.CompareAndSwapInt32(&tryFlag, 0, 1) {
  43. goto _RETRY
  44. }
  45. }
  46. return
  47. }
  48. func (c *Client) Close() (err error) {
  49. c.clients.Range(func(key, value interface{}) bool {
  50. client := value.(*rpc.Client)
  51. _ = client.Close()
  52. c.clients.Delete(key)
  53. return true
  54. })
  55. return
  56. }
  57. func NewClient(reg registry.Registry) *Client {
  58. return &Client{
  59. selector: registry.NewSelector(reg),
  60. }
  61. }