client.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package rpc
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. type (
  11. Client struct {
  12. conn net.Conn
  13. seq uint16
  14. isConnected int32
  15. transactionLocker sync.RWMutex
  16. transaction map[uint16]*transaction
  17. exitFlag int32
  18. exitChan chan struct{}
  19. network string
  20. address string
  21. }
  22. transaction struct {
  23. sequence uint16
  24. response *Response
  25. isCanceled bool
  26. ch chan *transaction
  27. }
  28. )
  29. func (t *transaction) Cancel() {
  30. t.isCanceled = true
  31. close(t.ch)
  32. }
  33. func (t *transaction) Done(r *Response) {
  34. t.response = r
  35. if t.ch != nil && !t.isCanceled {
  36. select {
  37. case t.ch <- t:
  38. default:
  39. }
  40. }
  41. }
  42. func (c *Client) commit(seq uint16) *transaction {
  43. c.transactionLocker.Lock()
  44. trans := &transaction{
  45. sequence: seq,
  46. isCanceled: false,
  47. ch: make(chan *transaction),
  48. }
  49. c.transaction[seq] = trans
  50. c.transactionLocker.Unlock()
  51. return trans
  52. }
  53. func (c *Client) rdyLoop() {
  54. defer atomic.StoreInt32(&c.isConnected, 0)
  55. for {
  56. if frame, err := readFrame(c.conn); err == nil {
  57. if frame.Func == FuncResponse {
  58. c.transactionLocker.RLock()
  59. ch, ok := c.transaction[frame.Sequence]
  60. c.transactionLocker.RUnlock()
  61. if ok {
  62. if res, err := ReadResponse(frame.Data); err == nil {
  63. ch.Done(res)
  64. } else {
  65. ch.Cancel()
  66. }
  67. }
  68. }
  69. } else {
  70. break
  71. }
  72. }
  73. }
  74. func (c *Client) Dialer(network string, addr string) (err error) {
  75. c.network = network
  76. c.address = addr
  77. return c.dialer()
  78. }
  79. func (c *Client) dialer() (err error) {
  80. if c.conn, err = net.DialTimeout(c.network, c.address, time.Second*10); err != nil {
  81. return
  82. } else {
  83. atomic.StoreInt32(&c.isConnected, 1)
  84. go c.rdyLoop()
  85. }
  86. return
  87. }
  88. func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
  89. if atomic.LoadInt32(&c.isConnected) == 0 {
  90. err = io.ErrClosedPipe
  91. return
  92. }
  93. c.seq++
  94. seq := c.seq
  95. if err = writeFrame(c.conn, &Frame{
  96. Func: FuncRequest,
  97. Sequence: seq,
  98. Data: req.Bytes(),
  99. }); err != nil {
  100. return
  101. }
  102. trans := c.commit(seq)
  103. select {
  104. case t, ok := <-trans.ch:
  105. if ok {
  106. res = t.response
  107. } else {
  108. err = io.ErrClosedPipe
  109. }
  110. case <-ctx.Done():
  111. trans.Cancel()
  112. err = ctx.Err()
  113. }
  114. return
  115. }
  116. func (c *Client) Close() (err error) {
  117. if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
  118. if c.conn != nil {
  119. err = c.conn.Close()
  120. }
  121. close(c.exitChan)
  122. }
  123. return
  124. }
  125. func NewClient() *Client {
  126. return &Client{
  127. exitChan: make(chan struct{}),
  128. transaction: make(map[uint16]*transaction),
  129. }
  130. }