// client.go

  1. package rpc
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. var (
  11. DefaultTimeout = time.Second * 5
  12. )
  13. type (
  14. Client struct {
  15. conn net.Conn
  16. seq uint16
  17. once sync.Once
  18. isConnected int32
  19. transactionLocker sync.RWMutex
  20. transaction map[uint16]*transaction
  21. exitFlag int32
  22. exitChan chan struct{}
  23. network string
  24. address string
  25. connLock sync.Mutex
  26. pintAt time.Time
  27. Timeout time.Duration
  28. }
  29. transaction struct {
  30. sequence uint16
  31. response *Response
  32. isCanceled bool
  33. ch chan *transaction
  34. }
  35. )
  36. func (t *transaction) Cancel() {
  37. t.isCanceled = true
  38. close(t.ch)
  39. }
  40. func (t *transaction) Done(r *Response) {
  41. t.response = r
  42. if t.ch != nil && !t.isCanceled {
  43. select {
  44. case t.ch <- t:
  45. default:
  46. }
  47. }
  48. }
  49. func (c *Client) commit(seq uint16) *transaction {
  50. c.transactionLocker.Lock()
  51. trans := &transaction{
  52. sequence: seq,
  53. isCanceled: false,
  54. ch: make(chan *transaction),
  55. }
  56. c.transaction[seq] = trans
  57. c.transactionLocker.Unlock()
  58. return trans
  59. }
  60. func (c *Client) eventLoop() {
  61. ticker := time.NewTicker(time.Second * 10)
  62. defer ticker.Stop()
  63. for {
  64. select {
  65. case <-c.exitChan:
  66. return
  67. case <-ticker.C:
  68. if atomic.LoadInt32(&c.isConnected) == 1 {
  69. _ = writeFrame(c.conn, &Frame{
  70. Func: FuncPing,
  71. })
  72. }
  73. }
  74. }
  75. }
  76. func (c *Client) rdyLoop() {
  77. defer atomic.StoreInt32(&c.isConnected, 0)
  78. for {
  79. if frame, err := readFrame(c.conn); err == nil {
  80. if frame.Func == FuncResponse {
  81. c.transactionLocker.RLock()
  82. ch, ok := c.transaction[frame.Sequence]
  83. c.transactionLocker.RUnlock()
  84. if ok {
  85. if res, err := ReadResponse(frame.Data); err == nil {
  86. ch.Done(res)
  87. } else {
  88. ch.Cancel()
  89. }
  90. }
  91. } else if frame.Func == FuncPing {
  92. c.pintAt = time.Now()
  93. }
  94. } else {
  95. break
  96. }
  97. }
  98. }
  99. func (c *Client) DialerContext(ctx context.Context, network string, addr string) (err error) {
  100. var (
  101. ok bool
  102. deadline time.Time
  103. )
  104. if deadline, ok = ctx.Deadline(); !ok {
  105. deadline = time.Now().Add(c.Timeout)
  106. }
  107. c.network = network
  108. c.address = addr
  109. c.once.Do(func() {
  110. go c.eventLoop()
  111. })
  112. return c.dialer(deadline.Sub(time.Now()))
  113. }
  114. func (c *Client) Dialer(network string, addr string) (err error) {
  115. c.network = network
  116. c.address = addr
  117. c.once.Do(func() {
  118. go c.eventLoop()
  119. })
  120. return c.dialer(c.Timeout)
  121. }
  122. func (c *Client) dialer(timeout time.Duration) (err error) {
  123. c.connLock.Lock()
  124. defer c.connLock.Unlock()
  125. if atomic.LoadInt32(&c.isConnected) == 1 {
  126. return
  127. }
  128. if c.conn, err = net.DialTimeout(c.network, c.address, timeout); err != nil {
  129. return
  130. } else {
  131. atomic.StoreInt32(&c.isConnected, 1)
  132. go c.rdyLoop()
  133. }
  134. return
  135. }
  136. func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
  137. if atomic.LoadInt32(&c.isConnected) == 0 {
  138. var (
  139. ok bool
  140. deadline time.Time
  141. )
  142. if deadline, ok = ctx.Deadline(); !ok {
  143. deadline = time.Now().Add(c.Timeout)
  144. }
  145. if err = c.dialer(deadline.Sub(time.Now())); err != nil {
  146. err = io.ErrClosedPipe
  147. return
  148. }
  149. }
  150. c.seq++
  151. seq := c.seq
  152. if err = writeFrame(c.conn, &Frame{
  153. Func: FuncRequest,
  154. Sequence: seq,
  155. Data: req.Bytes(),
  156. }); err != nil {
  157. return
  158. }
  159. trans := c.commit(seq)
  160. select {
  161. case t, ok := <-trans.ch:
  162. if ok {
  163. res = t.response
  164. } else {
  165. err = io.ErrClosedPipe
  166. }
  167. case <-ctx.Done():
  168. trans.Cancel()
  169. err = ctx.Err()
  170. }
  171. return
  172. }
  173. func (c *Client) Close() (err error) {
  174. if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  175. if c.conn != nil {
  176. err = c.conn.Close()
  177. }
  178. c.isConnected = 0
  179. close(c.exitChan)
  180. }
  181. return
  182. }
  183. func NewClient() *Client {
  184. return &Client{
  185. Timeout: DefaultTimeout,
  186. exitChan: make(chan struct{}),
  187. transaction: make(map[uint16]*transaction),
  188. }
  189. }