client.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. package rpc
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.nspix.com/golang/micro/log"
  11. )
  // DefaultTimeout is the timeout NewClient assigns to Client.Timeout.
  var DefaultTimeout = 5 * time.Second
  15. type (
  16. Client struct {
  17. conn net.Conn
  18. seq int32
  19. once sync.Once
  20. isConnected int32
  21. transactionLocker sync.RWMutex
  22. transaction map[uint16]*transaction
  23. exitFlag int32
  24. exitChan chan struct{}
  25. network string
  26. address string
  27. connLock sync.Mutex
  28. pintAt time.Time
  29. Timeout time.Duration
  30. }
  31. transaction struct {
  32. sequence uint16
  33. response *Response
  34. canceledFlag int32
  35. ch chan *transaction
  36. }
  37. )
  38. func (t *transaction) Cancel() {
  39. if atomic.CompareAndSwapInt32(&t.canceledFlag, 0, 1) {
  40. close(t.ch)
  41. }
  42. }
  43. func (t *transaction) Done(r *Response) {
  44. t.response = r
  45. if t.ch != nil && atomic.LoadInt32(&t.canceledFlag) == 0 {
  46. select {
  47. case t.ch <- t:
  48. default:
  49. }
  50. }
  51. }
  52. func (c *Client) commit(seq uint16) *transaction {
  53. c.transactionLocker.Lock()
  54. trans := &transaction{
  55. sequence: seq,
  56. ch: make(chan *transaction),
  57. }
  58. c.transaction[seq] = trans
  59. c.transactionLocker.Unlock()
  60. return trans
  61. }
  62. func (c *Client) eventLoop() {
  63. ticker := time.NewTicker(time.Second * 10)
  64. defer ticker.Stop()
  65. for {
  66. select {
  67. case <-c.exitChan:
  68. return
  69. case <-ticker.C:
  70. if atomic.LoadInt32(&c.isConnected) == 1 {
  71. _ = writeFrame(c.conn, &Frame{
  72. Func: FuncPing,
  73. })
  74. }
  75. }
  76. }
  77. }
  78. func (c *Client) rdyLoop() {
  79. defer atomic.StoreInt32(&c.isConnected, 0)
  80. for {
  81. if frame, err := readFrame(c.conn); err == nil {
  82. switch frame.Func {
  83. case FuncPing:
  84. c.pintAt = time.Now()
  85. case FuncResponse:
  86. c.transactionLocker.RLock()
  87. ch, ok := c.transaction[frame.Sequence]
  88. c.transactionLocker.RUnlock()
  89. if ok {
  90. if res, err := ReadResponse(frame.Data); err == nil {
  91. ch.Done(res)
  92. } else {
  93. ch.Cancel()
  94. }
  95. } else {
  96. log.Warnf("RPC: connection %s response %d dropped", c.conn.LocalAddr(), frame.Sequence)
  97. }
  98. }
  99. } else {
  100. log.Infof("RPC: connection %s closed", c.conn.LocalAddr())
  101. break
  102. }
  103. }
  104. }
  105. func (c *Client) DialerContext(ctx context.Context, network string, addr string) (err error) {
  106. var (
  107. ok bool
  108. deadline time.Time
  109. )
  110. if deadline, ok = ctx.Deadline(); !ok {
  111. deadline = time.Now().Add(c.Timeout)
  112. }
  113. c.network = network
  114. c.address = addr
  115. c.once.Do(func() {
  116. go c.eventLoop()
  117. })
  118. return c.dialer(deadline.Sub(time.Now()))
  119. }
  120. func (c *Client) Dialer(network string, addr string) (err error) {
  121. c.network = network
  122. c.address = addr
  123. c.once.Do(func() {
  124. go c.eventLoop()
  125. })
  126. return c.dialer(c.Timeout)
  127. }
  128. func (c *Client) dialer(timeout time.Duration) (err error) {
  129. c.connLock.Lock()
  130. defer c.connLock.Unlock()
  131. if atomic.LoadInt32(&c.isConnected) == 1 {
  132. return
  133. }
  134. if c.conn, err = net.DialTimeout(c.network, c.address, timeout); err != nil {
  135. return
  136. } else {
  137. atomic.StoreInt32(&c.isConnected, 1)
  138. go c.rdyLoop()
  139. }
  140. return
  141. }
  142. func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
  143. if atomic.LoadInt32(&c.isConnected) == 0 {
  144. var (
  145. ok bool
  146. deadline time.Time
  147. )
  148. if deadline, ok = ctx.Deadline(); !ok {
  149. deadline = time.Now().Add(c.Timeout)
  150. }
  151. if err = c.dialer(deadline.Sub(time.Now())); err != nil {
  152. err = io.ErrClosedPipe
  153. return
  154. }
  155. }
  156. seq := uint16(atomic.AddInt32(&c.seq, 1))
  157. if err = writeFrame(c.conn, &Frame{
  158. Func: FuncRequest,
  159. Sequence: seq,
  160. Data: req.Bytes(),
  161. }); err != nil {
  162. return
  163. }
  164. trans := c.commit(seq)
  165. select {
  166. case t, ok := <-trans.ch:
  167. if ok {
  168. res = t.response
  169. } else {
  170. //canceled
  171. err = io.ErrClosedPipe
  172. }
  173. case <-c.exitChan:
  174. err = io.ErrClosedPipe
  175. case <-ctx.Done():
  176. trans.Cancel()
  177. err = errors.New("Client.Timeout exceeded while awaiting response")
  178. }
  179. return
  180. }
  181. func (c *Client) Close() (err error) {
  182. if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  183. c.transactionLocker.Lock()
  184. for _, t := range c.transaction {
  185. t.Cancel()
  186. }
  187. c.transactionLocker.Unlock()
  188. if c.conn != nil {
  189. err = c.conn.Close()
  190. }
  191. c.isConnected = 0
  192. close(c.exitChan)
  193. }
  194. return
  195. }
  196. func NewClient() *Client {
  197. return &Client{
  198. Timeout: DefaultTimeout,
  199. exitChan: make(chan struct{}),
  200. transaction: make(map[uint16]*transaction),
  201. }
  202. }