client.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. package rpc
  2. import (
  3. "context"
  4. "errors"
  5. "io"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. "git.nspix.com/golang/micro/log"
  11. )
  12. var (
  13. DefaultTimeout = time.Second * 5
  14. )
  15. type (
  16. Client struct {
  17. conn net.Conn
  18. seq int32
  19. once sync.Once
  20. isConnected int32
  21. transactionLocker sync.RWMutex
  22. transaction map[uint16]*transaction
  23. exitFlag int32
  24. exitChan chan struct{}
  25. network string
  26. address string
  27. connLock sync.Mutex
  28. pintAt time.Time
  29. Timeout time.Duration
  30. }
  31. transaction struct {
  32. sequence uint16
  33. response *Response
  34. isCanceled bool
  35. ch chan *transaction
  36. }
  37. )
  38. func (t *transaction) Cancel() {
  39. t.isCanceled = true
  40. close(t.ch)
  41. }
  42. func (t *transaction) Done(r *Response) {
  43. t.response = r
  44. if t.ch != nil && !t.isCanceled {
  45. select {
  46. case t.ch <- t:
  47. default:
  48. }
  49. }
  50. }
  51. func (c *Client) commit(seq uint16) *transaction {
  52. c.transactionLocker.Lock()
  53. trans := &transaction{
  54. sequence: seq,
  55. isCanceled: false,
  56. ch: make(chan *transaction),
  57. }
  58. c.transaction[seq] = trans
  59. c.transactionLocker.Unlock()
  60. return trans
  61. }
  62. func (c *Client) eventLoop() {
  63. ticker := time.NewTicker(time.Second * 10)
  64. defer ticker.Stop()
  65. for {
  66. select {
  67. case <-c.exitChan:
  68. return
  69. case <-ticker.C:
  70. if atomic.LoadInt32(&c.isConnected) == 1 {
  71. _ = writeFrame(c.conn, &Frame{
  72. Func: FuncPing,
  73. })
  74. }
  75. }
  76. }
  77. }
  78. func (c *Client) rdyLoop() {
  79. log.Infof("RPC: connection %s connected", c.conn.LocalAddr())
  80. defer atomic.StoreInt32(&c.isConnected, 0)
  81. for {
  82. if frame, err := readFrame(c.conn); err == nil {
  83. switch frame.Func {
  84. case FuncPing:
  85. c.pintAt = time.Now()
  86. case FuncResponse:
  87. c.transactionLocker.RLock()
  88. ch, ok := c.transaction[frame.Sequence]
  89. c.transactionLocker.RUnlock()
  90. if ok {
  91. if res, err := ReadResponse(frame.Data); err == nil {
  92. ch.Done(res)
  93. } else {
  94. ch.Cancel()
  95. }
  96. } else {
  97. log.Warnf("RPC: connection %s response %d dropped", c.conn.LocalAddr(), frame.Sequence)
  98. }
  99. }
  100. } else {
  101. log.Infof("RPC: connection %s closed", c.conn.LocalAddr())
  102. break
  103. }
  104. }
  105. }
  106. func (c *Client) DialerContext(ctx context.Context, network string, addr string) (err error) {
  107. var (
  108. ok bool
  109. deadline time.Time
  110. )
  111. if deadline, ok = ctx.Deadline(); !ok {
  112. deadline = time.Now().Add(c.Timeout)
  113. }
  114. c.network = network
  115. c.address = addr
  116. c.once.Do(func() {
  117. go c.eventLoop()
  118. })
  119. return c.dialer(deadline.Sub(time.Now()))
  120. }
  121. func (c *Client) Dialer(network string, addr string) (err error) {
  122. c.network = network
  123. c.address = addr
  124. c.once.Do(func() {
  125. go c.eventLoop()
  126. })
  127. return c.dialer(c.Timeout)
  128. }
  129. func (c *Client) dialer(timeout time.Duration) (err error) {
  130. c.connLock.Lock()
  131. defer c.connLock.Unlock()
  132. if atomic.LoadInt32(&c.isConnected) == 1 {
  133. return
  134. }
  135. if c.conn, err = net.DialTimeout(c.network, c.address, timeout); err != nil {
  136. return
  137. } else {
  138. atomic.StoreInt32(&c.isConnected, 1)
  139. go c.rdyLoop()
  140. }
  141. return
  142. }
  143. func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
  144. if atomic.LoadInt32(&c.isConnected) == 0 {
  145. var (
  146. ok bool
  147. deadline time.Time
  148. )
  149. if deadline, ok = ctx.Deadline(); !ok {
  150. deadline = time.Now().Add(c.Timeout)
  151. }
  152. if err = c.dialer(deadline.Sub(time.Now())); err != nil {
  153. err = io.ErrClosedPipe
  154. return
  155. }
  156. }
  157. seq := uint16(atomic.AddInt32(&c.seq, 1))
  158. if err = writeFrame(c.conn, &Frame{
  159. Func: FuncRequest,
  160. Sequence: seq,
  161. Data: req.Bytes(),
  162. }); err != nil {
  163. return
  164. }
  165. trans := c.commit(seq)
  166. select {
  167. case t, ok := <-trans.ch:
  168. if ok {
  169. res = t.response
  170. } else {
  171. //canceled
  172. err = io.ErrClosedPipe
  173. }
  174. case <-c.exitChan:
  175. err = io.ErrClosedPipe
  176. case <-ctx.Done():
  177. trans.Cancel()
  178. err = errors.New("Client.Timeout exceeded while awaiting response")
  179. }
  180. return
  181. }
  182. func (c *Client) Close() (err error) {
  183. if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  184. c.transactionLocker.Lock()
  185. for _, t := range c.transaction {
  186. t.Cancel()
  187. }
  188. c.transactionLocker.Unlock()
  189. if c.conn != nil {
  190. err = c.conn.Close()
  191. }
  192. c.isConnected = 0
  193. close(c.exitChan)
  194. }
  195. return
  196. }
  197. func NewClient() *Client {
  198. return &Client{
  199. Timeout: DefaultTimeout,
  200. exitChan: make(chan struct{}),
  201. transaction: make(map[uint16]*transaction),
  202. }
  203. }