server.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package cli
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. byte2 "git.nspix.com/golang/micro/helper/pool/byte"
  6. "git.nspix.com/golang/micro/helper/utils"
  7. "net"
  8. "os"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
  // Constants shared by the CLI server.
  const (
  	// MaxReadBufferLength is the value handed to the byte pool when process
  	// acquires its per-connection buffer (64 * 1024).
  	MaxReadBufferLength = 64 << 10

  	// EOL is the newline string. It is not referenced in this file.
  	EOL = "\n"
  )
  19. type HandleFunc func(ctx *Context) (err error)
  20. type Server struct {
  21. seq int32
  22. locker sync.RWMutex
  23. executor *Executor
  24. contextMap map[int32]*Context
  25. }
  26. func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) {
  27. if packet.Timestamp == 0 {
  28. packet.Timestamp = time.Now().Unix()
  29. }
  30. err = writeFrame(conn, packet)
  31. return
  32. }
  33. func (svr *Server) process(id int32, conn net.Conn) (err error) {
  34. var (
  35. buf []byte
  36. res *Response
  37. )
  38. buffer := byte2.Get(MaxReadBufferLength)
  39. defer func() {
  40. byte2.Put(buffer)
  41. svr.locker.Lock()
  42. delete(svr.contextMap, id)
  43. svr.locker.Unlock()
  44. err = conn.Close()
  45. }()
  46. for {
  47. reqPacket := &Frame{}
  48. if reqPacket, err = readFrame(conn); err != nil {
  49. break
  50. }
  51. switch reqPacket.Type {
  52. case PacketTypeCompleter:
  53. tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data))))
  54. cs := svr.executor.Completer(tokens...)
  55. if buf, err = json.Marshal(cs); err == nil {
  56. err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf})
  57. }
  58. case PacketTypeEcho:
  59. vs := make(map[string]string)
  60. vs["os"] = runtime.GOOS
  61. vs["cid"] = fmt.Sprint(id)
  62. vs["login_at"] = time.Now().Format(time.RFC822)
  63. vs["client_addr"] = conn.RemoteAddr().String()
  64. vs["name"] = os.Getenv("MICRO_SERVICE_NAME")
  65. vs["version"] = os.Getenv("MICRO_SERVICE_VERSION")
  66. if buf, err = json.Marshal(vs); err == nil {
  67. err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf})
  68. }
  69. case PacketTypeData:
  70. tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data)))
  71. var ok bool
  72. var ctx *Context
  73. svr.locker.Lock()
  74. if ctx, ok = svr.contextMap[id]; !ok {
  75. ctx = &Context{
  76. ID: id,
  77. }
  78. ctx.Set("ID", id)
  79. svr.contextMap[id] = ctx
  80. }
  81. svr.locker.Unlock()
  82. ctx.reset(strings.TrimSpace(string(reqPacket.Data)))
  83. if res, err = svr.executor.Do(ctx, tokens...); err == nil {
  84. if res.Code == 0 {
  85. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data})
  86. } else {
  87. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: res.Error})
  88. }
  89. } else {
  90. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: err.Error()})
  91. }
  92. default:
  93. return
  94. }
  95. }
  96. return
  97. }
  98. func (svr *Server) Handle(path string, cb HandleFunc) {
  99. var tokens []string
  100. if strings.HasPrefix(path, "/") {
  101. tokens = strings.Split(path[1:], "/")
  102. } else {
  103. tokens = utils.BreakUp(path)
  104. }
  105. svr.locker.Lock()
  106. defer svr.locker.Unlock()
  107. var (
  108. err error
  109. length int
  110. p *Executor
  111. q *Executor
  112. )
  113. length = len(tokens)
  114. p = svr.executor
  115. for i, token := range tokens {
  116. token = strings.TrimSpace(strings.ToLower(token))
  117. if token == ""{
  118. continue
  119. }
  120. if q, err = p.Children(token); err == nil {
  121. if i == length-1 {
  122. panic(path + " already exists")
  123. }
  124. p = q
  125. } else {
  126. q = NewExecutor(token, "", strings.Title(strings.Join(tokens, " ")))
  127. if i == length-1 {
  128. q.handleFunc = cb
  129. }
  130. p.Append(q)
  131. p = q
  132. }
  133. }
  134. }
  135. func (svr *Server) Serve(listener net.Listener) (err error) {
  136. var (
  137. conn net.Conn
  138. )
  139. svr.Handle("help", func(ctx *Context) (err error) {
  140. return ctx.Success(svr.executor.String())
  141. })
  142. for {
  143. if conn, err = listener.Accept(); err != nil {
  144. break
  145. }
  146. go func() {
  147. _ = svr.process(atomic.AddInt32(&svr.seq, 1), conn)
  148. }()
  149. }
  150. return
  151. }
  152. func (svr *Server) Append(child *Executor) {
  153. svr.executor.Append(child)
  154. }
  155. func New() *Server {
  156. return &Server{
  157. seq: 0,
  158. executor: NewExecutor("ROOT"),
  159. contextMap: make(map[int32]*Context),
  160. }
  161. }