123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- package rpc
- import (
- "context"
- "io"
- "net"
- "sync"
- "sync/atomic"
- "time"
- )
- type (
- Client struct {
- conn net.Conn
- seq uint16
- isConnected int32
- transactionLocker sync.RWMutex
- transaction map[uint16]*transaction
- exitFlag int32
- exitChan chan struct{}
- network string
- address string
- }
- transaction struct {
- sequence uint16
- response *Response
- isCanceled bool
- ch chan *transaction
- }
- )
- func (t *transaction) Cancel() {
- t.isCanceled = true
- close(t.ch)
- }
- func (t *transaction) Done(r *Response) {
- t.response = r
- if t.ch != nil && !t.isCanceled {
- select {
- case t.ch <- t:
- default:
- }
- }
- }
- func (c *Client) commit(seq uint16) *transaction {
- c.transactionLocker.Lock()
- trans := &transaction{
- sequence: seq,
- isCanceled: false,
- ch: make(chan *transaction),
- }
- c.transaction[seq] = trans
- c.transactionLocker.Unlock()
- return trans
- }
- func (c *Client) rdyLoop() {
- defer atomic.StoreInt32(&c.isConnected, 0)
- for {
- if frame, err := readFrame(c.conn); err == nil {
- if frame.Func == FuncResponse {
- c.transactionLocker.RLock()
- ch, ok := c.transaction[frame.Sequence]
- c.transactionLocker.RUnlock()
- if ok {
- if res, err := ReadResponse(frame.Data); err == nil {
- ch.Done(res)
- } else {
- ch.Cancel()
- }
- }
- }
- } else {
- break
- }
- }
- }
- func (c *Client) Dialer(network string, addr string) (err error) {
- c.network = network
- c.address = addr
- return c.dialer()
- }
- func (c *Client) dialer() (err error) {
- if c.conn, err = net.DialTimeout(c.network, c.address, time.Second*10); err != nil {
- return
- } else {
- atomic.StoreInt32(&c.isConnected, 1)
- go c.rdyLoop()
- }
- return
- }
- func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
- if atomic.LoadInt32(&c.isConnected) == 0 {
- err = io.ErrClosedPipe
- return
- }
- c.seq++
- seq := c.seq
- if err = writeFrame(c.conn, &Frame{
- Func: FuncRequest,
- Sequence: seq,
- Data: req.Bytes(),
- }); err != nil {
- return
- }
- trans := c.commit(seq)
- select {
- case t, ok := <-trans.ch:
- if ok {
- res = t.response
- } else {
- err = io.ErrClosedPipe
- }
- case <-ctx.Done():
- trans.Cancel()
- err = ctx.Err()
- }
- return
- }
- func (c *Client) Close() (err error) {
- if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
- if c.conn != nil {
- err = c.conn.Close()
- }
- close(c.exitChan)
- }
- return
- }
- func NewClient() *Client {
- return &Client{
- exitChan: make(chan struct{}),
- transaction: make(map[uint16]*transaction),
- }
- }
|