client.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package cli
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "os"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "time"
  14. "git.nspix.com/golang/micro/helper/net/ip"
  15. bytepkg "git.nspix.com/golang/micro/helper/pool/byte"
  16. "github.com/peterh/liner"
  17. )
  18. type Client struct {
  19. seq uint16
  20. appName string
  21. conn net.Conn
  22. context context.Context
  23. readyChan chan struct{}
  24. completerChan chan *Frame
  25. responseChan chan *Frame
  26. mutex sync.Mutex
  27. }
  28. func (client *Client) writePack(packet *Frame) (err error) {
  29. if packet.Timestamp == 0 {
  30. packet.Timestamp = time.Now().Unix()
  31. }
  32. client.mutex.Lock()
  33. client.seq++
  34. packet.Seq = client.seq
  35. client.mutex.Unlock()
  36. err = writeFrame(client.conn, packet)
  37. return
  38. }
  39. func (client *Client) stdout(args ...interface{}) {
  40. fmt.Fprint(os.Stdout, args...)
  41. }
  42. func (client *Client) rdyLoop(c chan error) {
  43. var (
  44. err error
  45. buf []byte
  46. )
  47. buf = bytepkg.Get(MaxReadBufferLength)
  48. defer func() {
  49. bytepkg.Put(buf)
  50. }()
  51. for {
  52. packet := &Frame{}
  53. if packet, err = nextFrame(client.conn); err != nil {
  54. c <- err
  55. break
  56. }
  57. switch packet.Type {
  58. case PacketTypeCompleter:
  59. client.completerChan <- packet
  60. case PacketTypeEcho:
  61. vs := make(map[string]string)
  62. if err = json.Unmarshal(packet.Data, &vs); err == nil {
  63. time.Sleep(time.Millisecond * 200)
  64. client.stdout("\u001B[2K")
  65. client.stdout("\u001B[0G")
  66. client.appName = vs["name"]
  67. client.stdout(fmt.Sprintf("Welcome to the %s monitor.", vs["name"]), EOL)
  68. client.stdout(fmt.Sprintf("Server Version: %s, os: %s, ip: %s", vs["version"], runtime.GOOS, ip.InternalIP()), EOL)
  69. client.stdout(fmt.Sprintf("Last login: %s from %s@%s", vs["login_at"], vs["client_addr"], vs["cid"]), EOL)
  70. client.stdout("Type 'help;' for help. Type 'cls' to clear the current input statement.", EOL)
  71. select {
  72. case client.readyChan <- struct{}{}:
  73. default:
  74. }
  75. }
  76. case PacketTypeData:
  77. select {
  78. case client.responseChan <- packet:
  79. case <-time.After(time.Second):
  80. }
  81. }
  82. }
  83. select {
  84. case c <- err:
  85. case <-client.context.Done():
  86. }
  87. }
  88. func (client *Client) waitResponse(ctx context.Context, seq uint16) (err error) {
  89. childCtx, cancelFunc := context.WithTimeout(ctx, time.Second*30)
  90. defer func() {
  91. cancelFunc()
  92. }()
  93. for {
  94. select {
  95. case <-childCtx.Done():
  96. err = client.context.Err()
  97. return
  98. case res, ok := <-client.responseChan:
  99. if !ok {
  100. break
  101. }
  102. if res.Seq == seq {
  103. if res.Error != "" {
  104. client.stdout(res.Error, EOL)
  105. } else {
  106. client.stdout(string(bytes.Trim(res.Data, "\r\n")))
  107. }
  108. }
  109. if res.Flag == FlagComplete {
  110. client.stdout(EOL)
  111. return
  112. }
  113. }
  114. }
  115. }
  116. func (client *Client) interactive(c chan error) {
  117. var (
  118. err error
  119. line string
  120. state *liner.State
  121. )
  122. state = liner.NewLiner()
  123. state.SetCompleter(client.CompleterHandleFunc)
  124. select {
  125. case <-client.readyChan:
  126. case <-client.context.Done():
  127. return
  128. }
  129. defer func() {
  130. _ = state.Close()
  131. c <- err
  132. }()
  133. for {
  134. if line, err = state.Prompt(client.appName + "> "); err != nil {
  135. break
  136. }
  137. line = strings.TrimSpace(line)
  138. if line == "" {
  139. continue
  140. }
  141. if strings.ToLower(line) == "exit" || strings.ToLower(line) == "quit" {
  142. client.stdout("Bye Bye", EOL)
  143. return
  144. }
  145. if strings.ToLower(line) == "clear" || strings.ToLower(line) == "cls" {
  146. client.stdout("\033[2J")
  147. continue
  148. }
  149. reqFrame := newFrame(PacketTypeData, FlagComplete, []byte(line))
  150. if err = client.writePack(reqFrame); err != nil {
  151. break
  152. }
  153. state.AppendHistory(line)
  154. client.waitResponse(client.context, reqFrame.Seq)
  155. }
  156. }
  157. func (client *Client) CompleterHandleFunc(str string) (ss []string) {
  158. var err error
  159. if err = client.writePack(newFrame(PacketTypeCompleter, FlagComplete, []byte(str))); err != nil {
  160. return
  161. }
  162. select {
  163. case <-time.After(time.Second * 5):
  164. return nil
  165. case <-client.context.Done():
  166. return nil
  167. case resp, ok := <-client.completerChan:
  168. if ok {
  169. ss = make([]string, 0)
  170. err = json.Unmarshal(resp.Data, &ss)
  171. }
  172. }
  173. return
  174. }
  175. func (client *Client) Run() (err error) {
  176. var (
  177. errChan chan error
  178. )
  179. errChan = make(chan error)
  180. defer func() {
  181. _ = client.conn.Close()
  182. }()
  183. if err = client.writePack(newFrame(PacketTypeEcho, FlagComplete, nil)); err != nil {
  184. return
  185. }
  186. client.stdout("connecting")
  187. go client.rdyLoop(errChan)
  188. go client.interactive(errChan)
  189. err = <-errChan
  190. return
  191. }
  192. func ExecuteContext(ctx context.Context, conn net.Conn, cmd string) (s string, err error) {
  193. if ctx == nil {
  194. ctx = context.Background()
  195. }
  196. cli := &Client{
  197. conn: conn,
  198. context: ctx,
  199. readyChan: make(chan struct{}),
  200. completerChan: make(chan *Frame),
  201. responseChan: make(chan *Frame, 1),
  202. }
  203. if err = cli.writePack(&Frame{
  204. Type: PacketTypeEcho,
  205. }); err != nil {
  206. return
  207. }
  208. frame := &Frame{}
  209. if frame, err = nextFrame(conn); err != nil {
  210. return
  211. }
  212. if err = cli.writePack(newFrame(PacketTypeData, FlagComplete, []byte(cmd))); err != nil {
  213. return
  214. }
  215. if frame, err = nextFrame(conn); err != nil {
  216. return
  217. }
  218. if frame.Error != "" {
  219. err = errors.New(frame.Error)
  220. } else {
  221. s = string(frame.Data)
  222. }
  223. return
  224. }
  225. func OpenInteractive(ctx context.Context, conn net.Conn) (err error) {
  226. if ctx == nil {
  227. ctx = context.Background()
  228. }
  229. cli := &Client{
  230. conn: conn,
  231. context: ctx,
  232. readyChan: make(chan struct{}),
  233. completerChan: make(chan *Frame),
  234. responseChan: make(chan *Frame, 1),
  235. }
  236. err = cli.Run()
  237. return
  238. }