clientConn.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package goStrongswanVici
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  // DefaultReadTimeout is the value NewClientConn assigns to
  // ClientConn.ReadTimeout.
  const DefaultReadTimeout = 15 * time.Second
  12. // This object is not thread safe.
  13. // if you want concurrent, you need create more clients.
  14. type ClientConn struct {
  15. conn net.Conn
  16. responseChan chan segment
  17. eventHandlers map[string]func(response map[string]interface{})
  18. lastError error
  19. // ReadTimeout specifies a time limit for requests made
  20. // by this client.
  21. ReadTimeout time.Duration
  22. lock sync.RWMutex
  23. }
  24. func (c *ClientConn) Close() error {
  25. c.lock.Lock()
  26. defer c.lock.Unlock()
  27. close(c.responseChan)
  28. c.lastError = io.ErrClosedPipe
  29. return c.conn.Close()
  30. }
  31. func NewClientConn(conn net.Conn) (client *ClientConn) {
  32. client = &ClientConn{
  33. conn: conn,
  34. responseChan: make(chan segment, 2),
  35. eventHandlers: map[string]func(response map[string]interface{}){},
  36. ReadTimeout: DefaultReadTimeout,
  37. }
  38. go client.readThread()
  39. return client
  40. }
  41. // it dial from unix:///var/run/charon.vici
  42. func NewClientConnFromDefaultSocket() (client *ClientConn, err error) {
  43. conn, err := net.Dial("unix", "/var/run/charon.vici")
  44. if err != nil {
  45. return
  46. }
  47. return NewClientConn(conn), nil
  48. }
  49. func (c *ClientConn) Request(apiname string, request map[string]interface{}) (response map[string]interface{}, err error) {
  50. err = writeSegment(c.conn, segment{
  51. typ: stCMD_REQUEST,
  52. name: apiname,
  53. msg: request,
  54. })
  55. if err != nil {
  56. fmt.Printf("error writing segment \n")
  57. return
  58. }
  59. outMsg := c.readResponse()
  60. c.lock.RLock()
  61. err = c.lastError
  62. if err != nil {
  63. c.lock.RUnlock()
  64. return nil, err
  65. }
  66. c.lock.RUnlock()
  67. if outMsg.typ != stCMD_RESPONSE {
  68. return nil, fmt.Errorf("[%s] response error %d", apiname, outMsg.typ)
  69. }
  70. return outMsg.msg, nil
  71. }
  72. func (c *ClientConn) readResponse() segment {
  73. select {
  74. case outMsg := <-c.responseChan:
  75. return outMsg
  76. case <-time.After(c.ReadTimeout):
  77. if c.lastError == nil {
  78. c.lock.Lock()
  79. c.lastError = fmt.Errorf("Timeout waiting for message response")
  80. c.lock.Unlock()
  81. }
  82. return segment{}
  83. }
  84. }
  85. func (c *ClientConn) RegisterEvent(name string, handler func(response map[string]interface{})) (err error) {
  86. c.lock.Lock()
  87. if c.eventHandlers[name] != nil {
  88. c.lock.Unlock()
  89. return fmt.Errorf("[event %s] register a event twice.", name)
  90. }
  91. c.eventHandlers[name] = handler
  92. err = writeSegment(c.conn, segment{
  93. typ: stEVENT_REGISTER,
  94. name: name,
  95. })
  96. if err != nil {
  97. delete(c.eventHandlers, name)
  98. c.lock.Unlock()
  99. return
  100. }
  101. c.lock.Unlock()
  102. outMsg := c.readResponse()
  103. // fmt.Printf("registerEvent %#v\n", outMsg)
  104. c.lock.Lock()
  105. lastError := c.lastError
  106. if lastError != nil {
  107. delete(c.eventHandlers, name)
  108. c.lock.Unlock()
  109. return err
  110. }
  111. if outMsg.typ != stEVENT_CONFIRM {
  112. delete(c.eventHandlers, name)
  113. c.lock.Unlock()
  114. return fmt.Errorf("[event %s] response error %d", name, outMsg.typ)
  115. }
  116. c.lock.Unlock()
  117. return nil
  118. }
  119. func (c *ClientConn) UnregisterEvent(name string) (err error) {
  120. err = writeSegment(c.conn, segment{
  121. typ: stEVENT_UNREGISTER,
  122. name: name,
  123. })
  124. if err != nil {
  125. return
  126. }
  127. outMsg := c.readResponse()
  128. // fmt.Printf("UnregisterEvent %#v\n", outMsg)
  129. c.lock.Lock()
  130. if c.lastError != nil {
  131. c.lock.Unlock()
  132. return c.lastError
  133. }
  134. c.lock.Unlock()
  135. if outMsg.typ != stEVENT_CONFIRM {
  136. return fmt.Errorf("[event %s] response error %d", name, outMsg.typ)
  137. }
  138. c.lock.Lock()
  139. delete(c.eventHandlers, name)
  140. c.lock.Unlock()
  141. return nil
  142. }
  143. func (c *ClientConn) readThread() {
  144. for {
  145. outMsg, err := readSegment(c.conn)
  146. if err != nil {
  147. c.lock.Lock()
  148. c.lastError = err
  149. c.lock.Unlock()
  150. return
  151. }
  152. switch outMsg.typ {
  153. case stCMD_RESPONSE, stEVENT_CONFIRM:
  154. c.responseChan <- outMsg
  155. case stEVENT:
  156. c.lock.Lock()
  157. handler := c.eventHandlers[outMsg.name]
  158. c.lock.Unlock()
  159. if handler != nil {
  160. handler(outMsg.msg)
  161. }
  162. default:
  163. c.lock.Lock()
  164. c.lastError = fmt.Errorf("[Client.readThread] unknow msg type %d", outMsg.typ)
  165. c.lock.Unlock()
  166. return
  167. }
  168. }
  169. }