client.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package cli
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "git.nspix.com/golang/micro/utils/bytepool"
  8. "git.nspix.com/golang/micro/utils/unsafestr"
  9. "github.com/peterh/liner"
  10. "net"
  11. "os"
  12. "strings"
  13. "time"
  14. )
  15. type Client struct {
  16. appName string
  17. conn net.Conn
  18. context context.Context
  19. readyChan chan struct{}
  20. completerChan chan *Frame
  21. responseChan chan *Frame
  22. }
  23. func (client *Client) writePack(packet *Frame) (err error) {
  24. if packet.Timestamp == 0 {
  25. packet.Timestamp = time.Now().Unix()
  26. }
  27. err = writeFrame(client.conn, packet)
  28. return
  29. }
  30. func (client *Client) stdout(args ...interface{}) {
  31. fmt.Fprint(os.Stdout, args...)
  32. }
  33. func (client *Client) rdyLoop(c chan error) {
  34. var (
  35. err error
  36. buf []byte
  37. )
  38. buf = bytepool.Get(MaxReadBufferLength)
  39. defer func() {
  40. bytepool.Put(buf)
  41. }()
  42. for {
  43. packet := &Frame{}
  44. if packet, err = readFrame(client.conn); err != nil {
  45. c <- err
  46. break
  47. }
  48. switch packet.Type {
  49. case PacketTypeCompleter:
  50. client.completerChan <- packet
  51. case PacketTypeEcho:
  52. vs := make(map[string]string)
  53. if err = json.Unmarshal(packet.Data, &vs); err == nil {
  54. time.Sleep(time.Millisecond * 200)
  55. client.stdout("\u001B[2K")
  56. client.stdout("\u001B[0G")
  57. client.appName = vs["name"]
  58. client.stdout(fmt.Sprintf("Welcome to the %s monitor.", vs["name"]), EOL)
  59. client.stdout(fmt.Sprintf("Server Version: %s, connection id is %s", vs["version"], vs["cid"]), EOL)
  60. client.stdout(fmt.Sprintf("Last login: %s from %s", vs["login_at"], vs["client_addr"]), EOL)
  61. client.stdout("Type 'help;' for help. Type 'cls' to clear the current input statement.", EOL)
  62. select {
  63. case client.readyChan <- struct{}{}:
  64. default:
  65. }
  66. }
  67. case PacketTypeData:
  68. client.responseChan <- packet
  69. }
  70. }
  71. select {
  72. case c <- err:
  73. case <-client.context.Done():
  74. }
  75. }
  76. func (client *Client) interactive(c chan error) {
  77. var (
  78. err error
  79. line string
  80. state *liner.State
  81. )
  82. state = liner.NewLiner()
  83. state.SetCompleter(client.CompleterHandleFunc)
  84. select {
  85. case <-client.readyChan:
  86. case <-client.context.Done():
  87. return
  88. }
  89. defer func() {
  90. _ = state.Close()
  91. c <- err
  92. }()
  93. for {
  94. if line, err = state.Prompt(client.appName + "> "); err != nil {
  95. break
  96. }
  97. line = strings.TrimSpace(line)
  98. if line == "" {
  99. continue
  100. }
  101. if strings.ToLower(line) == "exit" || strings.ToLower(line) == "quit" {
  102. client.stdout("Bye Bye", EOL)
  103. return
  104. }
  105. if strings.ToLower(line) == "clear" || strings.ToLower(line) == "cls" {
  106. client.stdout("\033[2J")
  107. continue
  108. }
  109. if err = client.writePack(&Frame{Type: PacketTypeData, Data: []byte(line)}); err != nil {
  110. break
  111. }
  112. state.AppendHistory(line)
  113. select {
  114. case <-client.context.Done():
  115. err = client.context.Err()
  116. return
  117. case <-time.After(time.Second * 30):
  118. client.stdout("request timeout", EOL)
  119. case res, ok := <-client.responseChan:
  120. if !ok {
  121. break
  122. }
  123. if res.Error != "" {
  124. client.stdout(res.Error, EOL)
  125. } else {
  126. client.stdout(string(bytes.Trim(res.Data, "\r\n")), EOL)
  127. }
  128. }
  129. }
  130. }
  131. func (client *Client) CompleterHandleFunc(str string) (ss []string) {
  132. var err error
  133. if err = client.writePack(&Frame{
  134. Type: PacketTypeCompleter,
  135. Data: unsafestr.StringToBytes(str),
  136. }); err != nil {
  137. return
  138. }
  139. select {
  140. case <-time.After(time.Second * 5):
  141. return nil
  142. case <-client.context.Done():
  143. return nil
  144. case resp, ok := <-client.completerChan:
  145. if ok {
  146. ss = make([]string, 0)
  147. err = json.Unmarshal(resp.Data, &ss)
  148. }
  149. }
  150. return
  151. }
  152. func (client *Client) Run() (err error) {
  153. var (
  154. errChan chan error
  155. )
  156. errChan = make(chan error)
  157. defer func() {
  158. _ = client.conn.Close()
  159. }()
  160. if err = client.writePack(&Frame{
  161. Type: PacketTypeEcho,
  162. }); err != nil {
  163. return
  164. }
  165. client.stdout("connecting")
  166. go client.rdyLoop(errChan)
  167. go client.interactive(errChan)
  168. err = <-errChan
  169. return
  170. }
  171. func OpenInteractive(ctx context.Context, conn net.Conn) (err error) {
  172. if ctx == nil {
  173. ctx = context.Background()
  174. }
  175. cli := &Client{
  176. conn: conn,
  177. context: ctx,
  178. readyChan: make(chan struct{}),
  179. completerChan: make(chan *Frame),
  180. responseChan: make(chan *Frame, 1),
  181. }
  182. err = cli.Run()
  183. return
  184. }