server.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package cli
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "git.nspix.com/golang/kos/util/env"
  7. "github.com/sourcegraph/conc"
  8. "net"
  9. "path"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. )
  16. var (
  17. ctxPool sync.Pool
  18. )
  19. type Server struct {
  20. ctx context.Context
  21. sequence int64
  22. ctxMap sync.Map
  23. waitGroup conc.WaitGroup
  24. middleware []Middleware
  25. router *Router
  26. l net.Listener
  27. }
  28. func (svr *Server) applyContext() *Context {
  29. if v := ctxPool.Get(); v != nil {
  30. if ctx, ok := v.(*Context); ok {
  31. return ctx
  32. }
  33. }
  34. return &Context{}
  35. }
  36. func (svr *Server) releaseContext(ctx *Context) {
  37. ctxPool.Put(ctx)
  38. }
  39. func (svr *Server) handle(ctx *Context, frame *Frame) {
  40. var (
  41. err error
  42. params map[string]string
  43. tokens []string
  44. args []string
  45. r *Router
  46. )
  47. cmd := string(frame.Data)
  48. tokens = strings.Fields(cmd)
  49. if r, args, err = svr.router.Lookup(tokens); err != nil {
  50. if errors.Is(err, ErrNotFound) {
  51. err = ctx.Error(errNotFound, fmt.Sprintf("Command %s not found", cmd))
  52. } else {
  53. err = ctx.Error(errExecuteFailed, err.Error())
  54. }
  55. } else {
  56. if len(r.params) > len(args) {
  57. err = ctx.Error(errExecuteFailed, r.Usage())
  58. return
  59. }
  60. if len(r.params) > 0 {
  61. params = make(map[string]string)
  62. for i, s := range r.params {
  63. params[s] = args[i]
  64. }
  65. }
  66. ctx.setArgs(args)
  67. ctx.setParam(params)
  68. err = r.command.Handle(ctx)
  69. }
  70. }
  71. func (svr *Server) process(conn net.Conn) {
  72. var (
  73. err error
  74. ctx *Context
  75. frame *Frame
  76. )
  77. ctx = svr.applyContext()
  78. ctx.reset(atomic.AddInt64(&svr.sequence, 1), conn)
  79. svr.ctxMap.Store(ctx.Id, ctx)
  80. defer func() {
  81. _ = conn.Close()
  82. svr.ctxMap.Delete(ctx.Id)
  83. svr.releaseContext(ctx)
  84. }()
  85. for {
  86. if frame, err = readFrame(conn); err != nil {
  87. break
  88. }
  89. //reset frame
  90. ctx.seq = frame.Seq
  91. switch frame.Type {
  92. case PacketTypeHandshake:
  93. if err = ctx.send(responsePayload{
  94. Type: PacketTypeHandshake,
  95. Data: &Info{
  96. ID: ctx.Id,
  97. Name: env.Get("VOX_NAME", ""),
  98. Version: env.Get("VOX_VERSION", ""),
  99. OS: runtime.GOOS,
  100. ServerTime: time.Now(),
  101. RemoteAddr: conn.RemoteAddr().String(),
  102. },
  103. }); err != nil {
  104. break
  105. }
  106. case PacketTypeCompleter:
  107. if err = ctx.send(responsePayload{
  108. Type: PacketTypeCompleter,
  109. Data: svr.router.Completer(strings.Fields(string(frame.Data))...),
  110. }); err != nil {
  111. break
  112. }
  113. case PacketTypeCommand:
  114. svr.handle(ctx, frame)
  115. default:
  116. break
  117. }
  118. }
  119. }
  120. func (svr *Server) serve() {
  121. for {
  122. conn, err := svr.l.Accept()
  123. if err != nil {
  124. break
  125. }
  126. svr.waitGroup.Go(func() {
  127. svr.process(conn)
  128. })
  129. }
  130. }
  131. func (svr *Server) wrapHandle(pathname, desc string, cb HandleFunc, middleware ...Middleware) Command {
  132. h := func(ctx *Context) (err error) {
  133. for i := len(svr.middleware) - 1; i >= 0; i-- {
  134. cb = svr.middleware[i](cb)
  135. }
  136. for i := len(middleware) - 1; i >= 0; i-- {
  137. cb = middleware[i](cb)
  138. }
  139. return cb(ctx)
  140. }
  141. if desc == "" {
  142. desc = strings.Join(strings.Split(strings.TrimPrefix(pathname, "/"), "/"), " ")
  143. }
  144. return Command{
  145. Path: pathname,
  146. Handle: h,
  147. Description: desc,
  148. }
  149. }
  150. func (svr *Server) Use(middleware ...Middleware) {
  151. svr.middleware = append(svr.middleware, middleware...)
  152. }
  153. func (svr *Server) Group(prefix string, commands []Command, middleware ...Middleware) {
  154. for _, cmd := range commands {
  155. svr.Handle(path.Join(prefix, cmd.Path), cmd.Description, cmd.Handle, middleware...)
  156. }
  157. }
  158. func (svr *Server) Handle(pathname string, desc string, cb HandleFunc, middleware ...Middleware) {
  159. svr.router.Handle(pathname, svr.wrapHandle(pathname, desc, cb, middleware...))
  160. }
  161. func (svr *Server) Serve(l net.Listener) (err error) {
  162. svr.l = l
  163. svr.Handle("/help", "Display help information", func(ctx *Context) (err error) {
  164. return ctx.Success(svr.router.String())
  165. })
  166. svr.serve()
  167. return
  168. }
  169. func (svr *Server) Shutdown() (err error) {
  170. err = svr.l.Close()
  171. svr.ctxMap.Range(func(key, value any) bool {
  172. if ctx, ok := value.(*Context); ok {
  173. err = ctx.Close()
  174. }
  175. return true
  176. })
  177. svr.waitGroup.Wait()
  178. return
  179. }
  180. func New(ctx context.Context) *Server {
  181. return &Server{
  182. ctx: ctx,
  183. router: newRouter(""),
  184. middleware: make([]Middleware, 0, 10),
  185. }
  186. }