package rpc

import (
	"context"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

type (
	// Client multiplexes request/response exchanges over a single net.Conn.
	// Each request is tagged with a 16-bit sequence number; rdyLoop reads
	// frames from the connection and hands each one to the pending
	// transaction registered under the frame's sequence number.
	Client struct {
		conn net.Conn
		// seq is the last sequence number handed out. Guarded by
		// transactionLocker.
		seq uint16
		// isConnected is 1 while rdyLoop is running and 0 otherwise. It is
		// only accessed through sync/atomic.
		isConnected int32
		// transactionLocker guards seq and transaction.
		transactionLocker sync.RWMutex
		// transaction maps a sequence number to the request waiting on it.
		transaction map[uint16]*transaction
	}

	// transaction is one in-flight request awaiting its response.
	transaction struct {
		sequence uint16
		response *Response
		// isCanceled is set once Cancel has closed ch.
		isCanceled bool
		// ch carries the transaction itself back to the waiter once a
		// response has been stored; it is closed by Cancel.
		ch chan *transaction

		// mu serialises Cancel and Done so that a send on ch can never
		// race with closing it.
		mu sync.Mutex
		// delivered is set once a response has been handed to ch, so that
		// later frames cannot overwrite a response the waiter is reading.
		delivered bool
	}
)

// Cancel marks the transaction as canceled and closes its channel, which
// wakes any receiver with ok == false. It is idempotent and safe to call
// concurrently with Done.
func (t *transaction) Cancel() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.isCanceled {
		return
	}
	t.isCanceled = true
	if t.ch != nil {
		close(t.ch)
	}
}

// Done stores r as the transaction's response and notifies the waiter
// without blocking. Once a response has been delivered, further calls are
// ignored. After Cancel nothing is sent on the (closed) channel.
func (t *transaction) Done(r *Response) {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.delivered {
		return
	}
	t.response = r
	if t.ch == nil || t.isCanceled {
		return
	}
	// Holding mu guarantees ch is not closed concurrently. The send stays
	// non-blocking so the read loop can never stall on a waiter.
	select {
	case t.ch <- t:
		t.delivered = true
	default:
	}
}

// rdyLoop reads frames from the connection until readFrame returns an
// error, dispatching each frame to the transaction registered under its
// sequence number. On exit it marks the client as disconnected and fails
// every transaction that is still pending.
func (c *Client) rdyLoop() {
	defer func() {
		// Clear the flag before draining: Do re-checks the flag after
		// registering, so a transaction is either drained here or rejected
		// there and can never be left waiting on a dead connection.
		atomic.StoreInt32(&c.isConnected, 0)
		c.failPending()
	}()

	for {
		frame, err := readFrame(c.conn)
		if err != nil {
			return
		}

		c.transactionLocker.RLock()
		trans, ok := c.transaction[frame.Sequence]
		c.transactionLocker.RUnlock()

		if ok {
			trans.Done(ReadResponse(frame.Data))
		}
	}
}

// failPending empties the transaction table and cancels every transaction
// that was in it, so blocked Do calls return io.ErrClosedPipe.
func (c *Client) failPending() {
	c.transactionLocker.Lock()
	pending := c.transaction
	c.transaction = make(map[uint16]*transaction)
	c.transactionLocker.Unlock()

	for _, trans := range pending {
		trans.Cancel()
	}
}

// Close closes the underlying connection, if there is one, and returns the
// error reported by the connection's Close.
func (c *Client) Close() (err error) {
	if c.conn == nil {
		return nil
	}
	return c.conn.Close()
}

// Dialer connects to addr on the given network with a 10 second dial
// timeout. On success it marks the client as connected and starts the read
// loop in a new goroutine.
//
// NOTE(review): calling Dialer again while a previous read loop is still
// running is not synchronised; it looks intended to be called once per
// Client. Confirm against callers.
func (c *Client) Dialer(network string, addr string) (err error) {
	if c.conn, err = net.DialTimeout(network, addr, time.Second*10); err != nil {
		return err
	}
	atomic.StoreInt32(&c.isConnected, 1)
	go c.rdyLoop()
	return nil
}

// nextSequence returns the next sequence number, wrapping around at the
// uint16 limit.
func (c *Client) nextSequence() uint16 {
	c.transactionLocker.Lock()
	defer c.transactionLocker.Unlock()

	c.seq++
	return c.seq
}

// commit registers a new transaction under seq and returns it. A
// transaction still registered under the same number (possible only after
// the sequence counter wraps) is canceled so its waiter does not hang.
func (c *Client) commit(seq uint16) *transaction {
	trans := &transaction{
		sequence: seq,
		// One slot of buffer lets Done succeed even if the waiter has not
		// reached its receive yet.
		ch: make(chan *transaction, 1),
	}

	c.transactionLocker.Lock()
	if c.transaction == nil {
		// Keep a zero-value Client usable.
		c.transaction = make(map[uint16]*transaction)
	}
	displaced := c.transaction[seq]
	c.transaction[seq] = trans
	c.transactionLocker.Unlock()

	if displaced != nil {
		displaced.Cancel()
	}
	return trans
}

// release removes trans from the transaction table, unless its slot has
// already been taken over by a newer transaction.
func (c *Client) release(trans *transaction) {
	c.transactionLocker.Lock()
	if c.transaction[trans.sequence] == trans {
		delete(c.transaction, trans.sequence)
	}
	c.transactionLocker.Unlock()
}

// Do sends req and waits for the response carrying the same sequence
// number.
//
// It returns io.ErrClosedPipe if the client is not connected or the
// connection is lost while waiting, the error from writeFrame if the
// request cannot be written, and ctx.Err() if ctx is done first.
//
// NOTE(review): concurrent calls each invoke writeFrame on the shared
// connection; whether frames can interleave depends on writeFrame, which is
// not visible here.
func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
	if atomic.LoadInt32(&c.isConnected) == 0 {
		return nil, io.ErrClosedPipe
	}

	// Register before writing so a fast reply always finds its transaction.
	seq := c.nextSequence()
	trans := c.commit(seq)
	defer c.release(trans)

	// The read loop may have exited between the first check and commit; it
	// clears the flag before draining, so this check closes that window.
	if atomic.LoadInt32(&c.isConnected) == 0 {
		return nil, io.ErrClosedPipe
	}

	if err = writeFrame(c.conn, &Frame{
		Func:     FuncRequest,
		Sequence: seq,
		Data:     req.Encode(),
	}); err != nil {
		return nil, err
	}

	select {
	case t, ok := <-trans.ch:
		if !ok {
			// Channel closed without a response: the connection went away.
			return nil, io.ErrClosedPipe
		}
		return t.response, nil
	case <-ctx.Done():
		trans.Cancel()
		return nil, ctx.Err()
	}
}

// NewClient returns a Client with an empty transaction table. Call Dialer
// to connect it.
func NewClient() *Client {
	return &Client{
		transaction: make(map[uint16]*transaction),
	}
}