client.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package rpc
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. var (
  11. DefaultTimeout = time.Second * 5
  12. )
  13. type (
  14. Client struct {
  15. conn net.Conn
  16. seq uint16
  17. isConnected int32
  18. transactionLocker sync.RWMutex
  19. transaction map[uint16]*transaction
  20. exitFlag int32
  21. exitChan chan struct{}
  22. network string
  23. address string
  24. connLock sync.Mutex
  25. Timeout time.Duration
  26. }
  27. transaction struct {
  28. sequence uint16
  29. response *Response
  30. isCanceled bool
  31. ch chan *transaction
  32. }
  33. )
  34. func (t *transaction) Cancel() {
  35. t.isCanceled = true
  36. close(t.ch)
  37. }
  38. func (t *transaction) Done(r *Response) {
  39. t.response = r
  40. if t.ch != nil && !t.isCanceled {
  41. select {
  42. case t.ch <- t:
  43. default:
  44. }
  45. }
  46. }
  47. func (c *Client) commit(seq uint16) *transaction {
  48. c.transactionLocker.Lock()
  49. trans := &transaction{
  50. sequence: seq,
  51. isCanceled: false,
  52. ch: make(chan *transaction),
  53. }
  54. c.transaction[seq] = trans
  55. c.transactionLocker.Unlock()
  56. return trans
  57. }
  58. func (c *Client) eventLoop() {
  59. ticker := time.NewTicker(time.Second * 10)
  60. defer ticker.Stop()
  61. for {
  62. select {
  63. case <-c.exitChan:
  64. return
  65. case <-ticker.C:
  66. if atomic.LoadInt32(&c.isConnected) == 1 {
  67. _ = writeFrame(c.conn, &Frame{
  68. Func: FuncPing,
  69. })
  70. }
  71. }
  72. }
  73. }
  74. func (c *Client) rdyLoop() {
  75. defer atomic.StoreInt32(&c.isConnected, 0)
  76. for {
  77. if frame, err := readFrame(c.conn); err == nil {
  78. if frame.Func == FuncResponse {
  79. c.transactionLocker.RLock()
  80. ch, ok := c.transaction[frame.Sequence]
  81. c.transactionLocker.RUnlock()
  82. if ok {
  83. if res, err := ReadResponse(frame.Data); err == nil {
  84. ch.Done(res)
  85. } else {
  86. ch.Cancel()
  87. }
  88. }
  89. } else if frame.Func == FuncPing {
  90. }
  91. } else {
  92. break
  93. }
  94. }
  95. }
  96. func (c *Client) Dialer(network string, addr string) (err error) {
  97. c.network = network
  98. c.address = addr
  99. go c.eventLoop()
  100. return c.dialer(c.Timeout)
  101. }
  102. func (c *Client) dialer(timeout time.Duration) (err error) {
  103. c.connLock.Lock()
  104. defer c.connLock.Unlock()
  105. if atomic.LoadInt32(&c.isConnected) == 1 {
  106. return
  107. }
  108. if c.conn, err = net.DialTimeout(c.network, c.address, timeout); err != nil {
  109. return
  110. } else {
  111. atomic.StoreInt32(&c.isConnected, 1)
  112. go c.rdyLoop()
  113. }
  114. return
  115. }
  116. func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
  117. if atomic.LoadInt32(&c.isConnected) == 0 {
  118. var (
  119. ok bool
  120. deadline time.Time
  121. )
  122. if deadline, ok = ctx.Deadline(); !ok {
  123. deadline = time.Now().Add(c.Timeout)
  124. }
  125. if err = c.dialer(time.Now().Sub(deadline)); err != nil {
  126. err = io.ErrClosedPipe
  127. return
  128. }
  129. }
  130. c.seq++
  131. seq := c.seq
  132. if err = writeFrame(c.conn, &Frame{
  133. Func: FuncRequest,
  134. Sequence: seq,
  135. Data: req.Bytes(),
  136. }); err != nil {
  137. return
  138. }
  139. trans := c.commit(seq)
  140. select {
  141. case t, ok := <-trans.ch:
  142. if ok {
  143. res = t.response
  144. } else {
  145. err = io.ErrClosedPipe
  146. }
  147. case <-ctx.Done():
  148. trans.Cancel()
  149. err = ctx.Err()
  150. }
  151. return
  152. }
  153. func (c *Client) Close() (err error) {
  154. if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  155. if c.conn != nil {
  156. err = c.conn.Close()
  157. }
  158. c.isConnected = 0
  159. close(c.exitChan)
  160. }
  161. return
  162. }
  163. func NewClient() *Client {
  164. return &Client{
  165. Timeout: DefaultTimeout,
  166. exitChan: make(chan struct{}),
  167. transaction: make(map[uint16]*transaction),
  168. }
  169. }