server.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package cli
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "git.nspix.com/golang/micro/utils/bytepool"
  6. "net"
  7. "os"
  8. "runtime"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  // Protocol-level constants shared by the CLI server.
  const (
  	// MaxReadBufferLength is the size, in bytes, of the scratch buffer
  	// requested from the byte pool for each connection (64 KiB).
  	MaxReadBufferLength = 64 << 10

  	// EOL is the newline string "\n".
  	// NOTE(review): not referenced in the visible part of this file;
  	// presumably used as a line terminator elsewhere in the package.
  	EOL = "\n"
  )
  18. type Server struct {
  19. seq int32
  20. locker sync.RWMutex
  21. executor *Executor
  22. contextMap map[int32]*Context
  23. }
  24. func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) {
  25. if packet.Timestamp == 0 {
  26. packet.Timestamp = time.Now().Unix()
  27. }
  28. err = writeFrame(conn, packet)
  29. return
  30. }
  31. func (svr *Server) process(id int32, conn net.Conn) (err error) {
  32. var (
  33. buf []byte
  34. )
  35. buffer := bytepool.Get(MaxReadBufferLength)
  36. defer func() {
  37. bytepool.Put(buffer)
  38. svr.locker.Lock()
  39. delete(svr.contextMap, id)
  40. svr.locker.Unlock()
  41. err = conn.Close()
  42. }()
  43. for {
  44. reqPacket := &Frame{}
  45. if reqPacket, err = readFrame(conn); err != nil {
  46. break
  47. }
  48. switch reqPacket.Type {
  49. case PacketTypeCompleter:
  50. tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data))))
  51. cs := svr.executor.Completer(tokens...)
  52. if buf, err = json.Marshal(cs); err == nil {
  53. err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf})
  54. }
  55. case PacketTypeEcho:
  56. vs := make(map[string]string)
  57. vs["os"] = runtime.GOOS
  58. vs["cid"] = fmt.Sprint(id)
  59. vs["login_at"] = time.Now().Format(time.RFC822)
  60. vs["client_addr"] = conn.RemoteAddr().String()
  61. vs["name"] = os.Getenv("MICRO_SERVICE_NAME")
  62. vs["version"] = os.Getenv("MICRO_SERVICE_VERSION")
  63. if buf, err = json.Marshal(vs); err == nil {
  64. err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf})
  65. }
  66. case PacketTypeData:
  67. tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data)))
  68. var ok bool
  69. var ctx *Context
  70. svr.locker.Lock()
  71. if ctx, ok = svr.contextMap[id]; !ok {
  72. ctx = &Context{
  73. ID: id,
  74. }
  75. ctx.Set("ID", id)
  76. svr.contextMap[id] = ctx
  77. }
  78. svr.locker.Unlock()
  79. ctx.CmdStr = strings.TrimSpace(string(reqPacket.Data))
  80. if buf, err = svr.executor.Do(ctx, tokens...); err == nil {
  81. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: buf})
  82. } else {
  83. err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: err.Error()})
  84. }
  85. default:
  86. return
  87. }
  88. }
  89. return
  90. }
  91. func (svr *Server) Serve(listener net.Listener) (err error) {
  92. var (
  93. conn net.Conn
  94. )
  95. svr.executor.Append(NewExecutor("help", "help", "Display this help").WithHandle(func(ctx *Context) []byte {
  96. return []byte(svr.executor.String())
  97. }))
  98. for {
  99. if conn, err = listener.Accept(); err != nil {
  100. break
  101. }
  102. go func() {
  103. _ = svr.process(atomic.AddInt32(&svr.seq, 1), conn)
  104. }()
  105. }
  106. return
  107. }
  108. func (svr *Server) Append(child *Executor) {
  109. svr.executor.Append(child)
  110. }
  111. func New() *Server {
  112. return &Server{
  113. seq: 0,
  114. executor: NewExecutor("ROOT"),
  115. contextMap: make(map[int32]*Context),
  116. }
  117. }