server.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package cli
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.nspix.com/golang/micro/utils/bytepool"
  6. "git.nspix.com/golang/micro/utils/helper"
  7. "net"
  8. "os"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. )
  15. const (
  16. MaxReadBufferLength = 64 * 1024
  17. EOL = "\n"
  18. )
  19. type HandleFunc func(ctx *Context) (err error)
  20. type Server struct {
  21. seq int32
  22. locker sync.RWMutex
  23. executor *Executor
  24. contextMap map[int32]*Context
  25. }
  26. func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) {
  27. if packet.Timestamp == 0 {
  28. packet.Timestamp = time.Now().Unix()
  29. }
  30. err = writeFrame(conn, packet)
  31. return
  32. }
  33. func (svr *Server) process(id int32, conn net.Conn) (err error) {
  34. var (
  35. buf []byte
  36. res *Response
  37. )
  38. buffer := bytepool.Get(MaxReadBufferLength)
  39. defer func() {
  40. bytepool.Put(buffer)
  41. svr.locker.Lock()
  42. delete(svr.contextMap, id)
  43. svr.locker.Unlock()
  44. err = conn.Close()
  45. }()
  46. for {
  47. reqPacket := &Frame{}
  48. if reqPacket, err = readFrame(conn); err != nil {
  49. break
  50. }
  51. switch reqPacket.Type {
  52. case PacketTypeCompleter:
  53. tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data))))
  54. cs := svr.executor.Completer(tokens...)
  55. if buf, err = json.Marshal(cs); err == nil {
  56. err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf})
  57. }
  58. case PacketTypeEcho:
  59. vs := make(map[string]string)
  60. vs["os"] = runtime.GOOS
  61. vs["cid"] = fmt.Sprint(id)
  62. vs["login_at"] = time.Now().Format(time.RFC822)
  63. vs["client_addr"] = conn.RemoteAddr().String()
  64. vs["name"] = os.Getenv("MICRO_SERVICE_NAME")
  65. vs["version"] = os.Getenv("MICRO_SERVICE_VERSION")
  66. if buf, err = json.Marshal(vs); err == nil {
  67. err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf})
  68. }
  69. case PacketTypeData:
  70. tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data)))
  71. var ok bool
  72. var ctx *Context
  73. svr.locker.Lock()
  74. if ctx, ok = svr.contextMap[id]; !ok {
  75. ctx = &Context{
  76. ID: id,
  77. }
  78. ctx.Set("ID", id)
  79. svr.contextMap[id] = ctx
  80. }
  81. svr.locker.Unlock()
  82. ctx.reset(strings.TrimSpace(string(reqPacket.Data)))
  83. if res, err = svr.executor.Do(ctx, tokens...); err == nil {
  84. if res.Code == 0 {
  85. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data})
  86. } else {
  87. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: res.Error})
  88. }
  89. } else {
  90. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: err.Error()})
  91. }
  92. default:
  93. return
  94. }
  95. }
  96. return
  97. }
  98. func (svr *Server) Handle(path string, cb HandleFunc) {
  99. tokens := helper.BreakUp(path)
  100. svr.locker.Lock()
  101. defer svr.locker.Unlock()
  102. var (
  103. err error
  104. length int
  105. p *Executor
  106. q *Executor
  107. )
  108. length = len(tokens)
  109. p = svr.executor
  110. for i, token := range tokens {
  111. token = strings.TrimSpace(strings.ToLower(token))
  112. if q, err = p.Children(token); err == nil {
  113. if i == length-1 {
  114. panic(path + " already exists")
  115. }
  116. p = q
  117. } else {
  118. q = NewExecutor(token, "", strings.Title(strings.Join(tokens, " ")))
  119. if i == length-1 {
  120. q.handleFunc = cb
  121. }
  122. p.Append(q)
  123. p = q
  124. }
  125. }
  126. }
  127. func (svr *Server) Serve(listener net.Listener) (err error) {
  128. var (
  129. conn net.Conn
  130. )
  131. svr.Handle("help", func(ctx *Context) (err error) {
  132. return ctx.Success(svr.executor.String())
  133. })
  134. for {
  135. if conn, err = listener.Accept(); err != nil {
  136. break
  137. }
  138. go func() {
  139. _ = svr.process(atomic.AddInt32(&svr.seq, 1), conn)
  140. }()
  141. }
  142. return
  143. }
  144. func (svr *Server) Append(child *Executor) {
  145. svr.executor.Append(child)
  146. }
  147. func New() *Server {
  148. return &Server{
  149. seq: 0,
  150. executor: NewExecutor("ROOT"),
  151. contextMap: make(map[int32]*Context),
  152. }
  153. }