client.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. package cli
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "git.nspix.com/golang/kos/util/env"
  7. "github.com/peterh/liner"
  8. "io"
  9. "math"
  10. "net"
  11. "os"
  12. "path/filepath"
  13. "strings"
  14. "sync"
  15. "sync/atomic"
  16. "time"
  17. )
  18. type Client struct {
  19. name string
  20. ctx context.Context
  21. address string
  22. sequence uint16
  23. conn net.Conn
  24. liner *liner.State
  25. mutex sync.Mutex
  26. exitChan chan struct{}
  27. readyChan chan struct{}
  28. commandChan chan *Frame
  29. completerChan chan *Frame
  30. Timeout time.Duration
  31. exitFlag int32
  32. }
  33. func (client *Client) getSequence() uint16 {
  34. client.mutex.Lock()
  35. defer client.mutex.Unlock()
  36. if client.sequence >= math.MaxUint16 {
  37. client.sequence = 0
  38. }
  39. client.sequence++
  40. n := client.sequence
  41. return n
  42. }
  43. func (client *Client) dialContext(ctx context.Context, address string) (conn net.Conn, err error) {
  44. var (
  45. pos int
  46. network string
  47. dialer net.Dialer
  48. )
  49. if pos = strings.Index(address, "://"); pos > -1 {
  50. network = address[:pos]
  51. address = address[pos+3:]
  52. } else {
  53. network = "tcp"
  54. }
  55. if conn, err = dialer.DialContext(ctx, network, address); err != nil {
  56. return
  57. }
  58. return
  59. }
  60. func (client *Client) renderBanner(info *Info) {
  61. client.name = info.Name
  62. fmt.Printf("Welcome to the %s(%s) monitor\n", info.Name, info.Version)
  63. fmt.Printf("Your connection id is %d\n", info.ID)
  64. fmt.Printf("Last login: %s from %s\n", info.ServerTime.Format(time.RFC822), info.RemoteAddr)
  65. fmt.Printf("Type 'help' for help. Type 'exit' for quit. Type 'cls' to clear input statement.\n")
  66. }
  67. func (client *Client) ioLoop(r io.Reader) {
  68. defer func() {
  69. _ = client.Close()
  70. }()
  71. for {
  72. frame, err := readFrame(r)
  73. if err != nil {
  74. return
  75. }
  76. switch frame.Type {
  77. case PacketTypeHandshake:
  78. info := &Info{}
  79. if err = json.Unmarshal(frame.Data, info); err == nil {
  80. client.renderBanner(info)
  81. }
  82. select {
  83. case client.readyChan <- struct{}{}:
  84. case <-client.exitChan:
  85. return
  86. }
  87. case PacketTypeCompleter:
  88. select {
  89. case client.completerChan <- frame:
  90. case <-client.exitChan:
  91. return
  92. }
  93. case PacketTypeCommand:
  94. select {
  95. case client.commandChan <- frame:
  96. case <-client.exitChan:
  97. return
  98. }
  99. }
  100. }
  101. }
  102. func (client *Client) waitResponse(seq uint16, timeout time.Duration) {
  103. timer := time.NewTimer(timeout)
  104. defer timer.Stop()
  105. for {
  106. select {
  107. case <-timer.C:
  108. fmt.Println("timeout waiting for response")
  109. return
  110. case <-client.exitChan:
  111. return
  112. case res, ok := <-client.commandChan:
  113. if !ok {
  114. break
  115. }
  116. if res.Seq == seq {
  117. if res.Error != "" {
  118. fmt.Print(res.Error)
  119. } else {
  120. fmt.Print(string(res.Data))
  121. }
  122. if res.Flag == FlagComplete {
  123. fmt.Println("")
  124. return
  125. }
  126. }
  127. }
  128. }
  129. }
  130. func (client *Client) completer(str string) (ss []string) {
  131. var (
  132. err error
  133. seq uint16
  134. )
  135. ss = make([]string, 0)
  136. seq = client.getSequence()
  137. if err = writeFrame(client.conn, newFrame(PacketTypeCompleter, FlagComplete, seq, client.Timeout, []byte(str))); err != nil {
  138. return
  139. }
  140. select {
  141. case <-time.After(time.Second * 5):
  142. case frame, ok := <-client.completerChan:
  143. if ok {
  144. err = json.Unmarshal(frame.Data, &ss)
  145. }
  146. }
  147. return
  148. }
  149. func (client *Client) Execute(s string) (err error) {
  150. var (
  151. seq uint16
  152. )
  153. if client.conn, err = client.dialContext(client.ctx, client.address); err != nil {
  154. return err
  155. }
  156. defer func() {
  157. _ = client.Close()
  158. }()
  159. go client.ioLoop(client.conn)
  160. seq = client.getSequence()
  161. if err = writeFrame(client.conn, newFrame(PacketTypeCommand, FlagComplete, seq, client.Timeout, []byte(s))); err != nil {
  162. return err
  163. }
  164. client.waitResponse(seq, client.Timeout)
  165. return
  166. }
  167. func (client *Client) Shell() (err error) {
  168. var (
  169. seq uint16
  170. line string
  171. )
  172. client.liner.SetCtrlCAborts(true)
  173. if client.conn, err = client.dialContext(client.ctx, client.address); err != nil {
  174. return err
  175. }
  176. defer func() {
  177. _ = client.Close()
  178. }()
  179. if err = writeFrame(client.conn, newFrame(PacketTypeHandshake, FlagComplete, client.getSequence(), client.Timeout, nil)); err != nil {
  180. return
  181. }
  182. go client.ioLoop(client.conn)
  183. select {
  184. case <-client.readyChan:
  185. case <-client.ctx.Done():
  186. return
  187. }
  188. client.liner.SetCompleter(client.completer)
  189. for {
  190. if line, err = client.liner.Prompt(client.name + "> "); err != nil {
  191. break
  192. }
  193. if atomic.LoadInt32(&client.exitFlag) == 1 {
  194. fmt.Println(Bye)
  195. break
  196. }
  197. line = strings.TrimSpace(line)
  198. if line == "" {
  199. continue
  200. }
  201. if strings.ToLower(line) == "exit" || strings.ToLower(line) == "quit" {
  202. fmt.Println(Bye)
  203. return
  204. }
  205. if strings.ToLower(line) == "clear" || strings.ToLower(line) == "cls" {
  206. fmt.Print("\033[2J")
  207. continue
  208. }
  209. seq = client.getSequence()
  210. if err = writeFrame(client.conn, newFrame(PacketTypeCommand, FlagComplete, seq, client.Timeout, []byte(line))); err != nil {
  211. break
  212. }
  213. client.liner.AppendHistory(line)
  214. client.waitResponse(seq, client.Timeout)
  215. }
  216. return
  217. }
  218. func (client *Client) Close() (err error) {
  219. if !atomic.CompareAndSwapInt32(&client.exitFlag, 0, 1) {
  220. return
  221. }
  222. close(client.exitChan)
  223. if client.conn != nil {
  224. err = client.conn.Close()
  225. }
  226. if client.liner != nil {
  227. err = client.liner.Close()
  228. }
  229. return
  230. }
  231. func NewClient(ctx context.Context, addr string) *Client {
  232. var (
  233. err error
  234. timeout time.Duration
  235. )
  236. if ctx == nil {
  237. ctx = context.Background()
  238. }
  239. duration := env.Get("VOX_TIMEOUT", "30s")
  240. if timeout, err = time.ParseDuration(duration); err != nil {
  241. timeout = time.Second * 30
  242. }
  243. return &Client{
  244. ctx: ctx,
  245. address: addr,
  246. name: filepath.Base(os.Args[0]),
  247. Timeout: timeout,
  248. liner: liner.NewLiner(),
  249. readyChan: make(chan struct{}, 1),
  250. exitChan: make(chan struct{}),
  251. commandChan: make(chan *Frame, 5),
  252. completerChan: make(chan *Frame, 5),
  253. }
  254. }