server.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package cli
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "net"
  8. "path"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "time"
  13. "git.nspix.com/golang/kos/util/env"
  14. "github.com/sourcegraph/conc"
  15. )
  // ctxPool recycles *Context values between connections. Values are taken
  // from it by applyContext and handed back by releaseContext.
  var ctxPool sync.Pool
  19. type Server struct {
  20. ctx context.Context
  21. sequenceLocker sync.Mutex
  22. sequence int64
  23. ctxMap sync.Map
  24. waitGroup conc.WaitGroup
  25. middleware []Middleware
  26. router *Router
  27. l net.Listener
  28. }
  29. func (svr *Server) applyContext() *Context {
  30. if v := ctxPool.Get(); v != nil {
  31. if ctx, ok := v.(*Context); ok {
  32. return ctx
  33. }
  34. }
  35. return &Context{}
  36. }
  37. func (svr *Server) releaseContext(ctx *Context) {
  38. ctxPool.Put(ctx)
  39. }
  40. func (svr *Server) handle(ctx *Context, frame *Frame) {
  41. var (
  42. err error
  43. params map[string]string
  44. tokens []string
  45. args []string
  46. r *Router
  47. )
  48. cmd := string(frame.Data)
  49. tokens = strings.Fields(cmd)
  50. if r, args, err = svr.router.Lookup(tokens); err != nil {
  51. if errors.Is(err, ErrNotFound) {
  52. err = ctx.Error(errNotFound, fmt.Sprintf("Command %s not found", cmd))
  53. } else {
  54. err = ctx.Error(errExecuteFailed, err.Error())
  55. }
  56. } else {
  57. if len(r.params) > len(args) {
  58. err = ctx.Error(errExecuteFailed, r.Usage())
  59. return
  60. }
  61. if len(r.params) > 0 {
  62. params = make(map[string]string)
  63. for i, s := range r.params {
  64. params[s] = args[i]
  65. }
  66. }
  67. ctx.setArgs(args)
  68. ctx.setParam(params)
  69. err = r.command.Handle(ctx)
  70. }
  71. }
  72. func (svr *Server) nextSequence() int64 {
  73. svr.sequenceLocker.Lock()
  74. defer svr.sequenceLocker.Unlock()
  75. if svr.sequence >= math.MaxInt64 {
  76. svr.sequence = 1
  77. }
  78. svr.sequence++
  79. return svr.sequence
  80. }
  81. func (svr *Server) process(conn net.Conn) {
  82. var (
  83. err error
  84. ctx *Context
  85. frame *Frame
  86. )
  87. ctx = svr.applyContext()
  88. ctx.reset(svr.nextSequence(), conn)
  89. svr.ctxMap.Store(ctx.Id, ctx)
  90. defer func() {
  91. _ = conn.Close()
  92. svr.ctxMap.Delete(ctx.Id)
  93. svr.releaseContext(ctx)
  94. }()
  95. for {
  96. if frame, err = readFrame(conn); err != nil {
  97. break
  98. }
  99. //reset frame
  100. ctx.seq = frame.Seq
  101. switch frame.Type {
  102. case PacketTypeHandshake:
  103. if err = ctx.send(responsePayload{
  104. Type: PacketTypeHandshake,
  105. Data: &Info{
  106. ID: ctx.Id,
  107. Name: env.Get("VOX_NAME", ""),
  108. Version: env.Get("VOX_VERSION", ""),
  109. OS: runtime.GOOS,
  110. ServerTime: time.Now(),
  111. RemoteAddr: conn.RemoteAddr().String(),
  112. },
  113. }); err != nil {
  114. break
  115. }
  116. case PacketTypeCompleter:
  117. if err = ctx.send(responsePayload{
  118. Type: PacketTypeCompleter,
  119. Data: svr.router.Completer(strings.Fields(string(frame.Data))...),
  120. }); err != nil {
  121. break
  122. }
  123. case PacketTypeCommand:
  124. svr.handle(ctx, frame)
  125. default:
  126. break
  127. }
  128. }
  129. }
  130. func (svr *Server) serve() {
  131. for {
  132. conn, err := svr.l.Accept()
  133. if err != nil {
  134. break
  135. }
  136. svr.waitGroup.Go(func() {
  137. svr.process(conn)
  138. })
  139. }
  140. }
  141. func (svr *Server) wrapHandle(pathname, desc string, cb HandleFunc, middleware ...Middleware) Command {
  142. h := func(ctx *Context) (err error) {
  143. for i := len(svr.middleware) - 1; i >= 0; i-- {
  144. cb = svr.middleware[i](cb)
  145. }
  146. for i := len(middleware) - 1; i >= 0; i-- {
  147. cb = middleware[i](cb)
  148. }
  149. return cb(ctx)
  150. }
  151. if desc == "" {
  152. desc = strings.Join(strings.Split(strings.TrimPrefix(pathname, "/"), "/"), " ")
  153. }
  154. return Command{
  155. Path: pathname,
  156. Handle: h,
  157. Description: desc,
  158. }
  159. }
  160. func (svr *Server) Use(middleware ...Middleware) {
  161. svr.middleware = append(svr.middleware, middleware...)
  162. }
  163. func (svr *Server) Group(prefix string, commands []Command, middleware ...Middleware) {
  164. for _, cmd := range commands {
  165. svr.Handle(path.Join(prefix, cmd.Path), cmd.Description, cmd.Handle, middleware...)
  166. }
  167. }
  168. func (svr *Server) Handle(pathname string, desc string, cb HandleFunc, middleware ...Middleware) {
  169. svr.router.Handle(pathname, svr.wrapHandle(pathname, desc, cb, middleware...))
  170. }
  171. func (svr *Server) Serve(l net.Listener) (err error) {
  172. svr.l = l
  173. svr.Handle("/help", "Display help information", func(ctx *Context) (err error) {
  174. return ctx.Success(svr.router.String())
  175. })
  176. svr.serve()
  177. return
  178. }
  179. func (svr *Server) Shutdown() (err error) {
  180. err = svr.l.Close()
  181. svr.ctxMap.Range(func(key, value any) bool {
  182. if ctx, ok := value.(*Context); ok {
  183. err = ctx.Close()
  184. }
  185. return true
  186. })
  187. svr.waitGroup.Wait()
  188. return
  189. }
  190. func New(ctx context.Context) *Server {
  191. return &Server{
  192. ctx: ctx,
  193. router: newRouter(""),
  194. middleware: make([]Middleware, 0, 10),
  195. }
  196. }