client.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package rpc
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. type (
  11. Client struct {
  12. conn net.Conn
  13. seq uint16
  14. isConnected int32
  15. transactionLocker sync.RWMutex
  16. transaction map[uint16]*transaction
  17. }
  18. transaction struct {
  19. sequence uint16
  20. response *Response
  21. isCanceled bool
  22. ch chan *transaction
  23. }
  24. )
  25. func (t *transaction) Cancel() {
  26. t.isCanceled = true
  27. close(t.ch)
  28. }
  29. func (t *transaction) Done(r *Response) {
  30. t.response = r
  31. if t.ch != nil && !t.isCanceled {
  32. select {
  33. case t.ch <- t:
  34. default:
  35. }
  36. }
  37. }
  38. func (c *Client) rdyLoop() {
  39. defer atomic.StoreInt32(&c.isConnected, 0)
  40. for {
  41. if frame, err := readFrame(c.conn); err == nil {
  42. c.transactionLocker.RLock()
  43. ch, ok := c.transaction[frame.Sequence]
  44. c.transactionLocker.RUnlock()
  45. if ok {
  46. ch.Done(ReadResponse(frame.Data))
  47. }
  48. } else {
  49. break
  50. }
  51. }
  52. }
  53. func (c *Client) Close() (err error) {
  54. if c.conn != nil {
  55. err = c.conn.Close()
  56. }
  57. return
  58. }
  59. func (c *Client) Dialer(network string, addr string) (err error) {
  60. if c.conn, err = net.DialTimeout(network, addr, time.Second*10); err != nil {
  61. return
  62. } else {
  63. atomic.StoreInt32(&c.isConnected, 1)
  64. go c.rdyLoop()
  65. }
  66. return
  67. }
  68. func (c *Client) commit(seq uint16) *transaction {
  69. c.transactionLocker.Lock()
  70. trans := &transaction{
  71. sequence: seq,
  72. isCanceled: false,
  73. ch: make(chan *transaction),
  74. }
  75. c.transaction[seq] = trans
  76. c.transactionLocker.Unlock()
  77. return trans
  78. }
  79. func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
  80. if atomic.LoadInt32(&c.isConnected) == 0 {
  81. err = io.ErrClosedPipe
  82. return
  83. }
  84. c.seq++
  85. seq := c.seq
  86. if err = writeFrame(c.conn, &Frame{
  87. Func: FuncRequest,
  88. Sequence: seq,
  89. Data: req.Encode(),
  90. }); err != nil {
  91. return
  92. }
  93. trans := c.commit(seq)
  94. select {
  95. case t, ok := <-trans.ch:
  96. if ok {
  97. res = t.response
  98. } else {
  99. err = io.ErrClosedPipe
  100. }
  101. case <-ctx.Done():
  102. trans.Cancel()
  103. err = ctx.Err()
  104. }
  105. return
  106. }
  107. func NewClient() *Client {
  108. return &Client{
  109. transaction: make(map[uint16]*transaction),
  110. }
  111. }