server.go 4.8 KB


  1. package cli
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "net"
  8. "path"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. "time"
  14. "git.nspix.com/golang/kos/util/env"
  15. "github.com/sourcegraph/conc"
  16. )
  17. var (
  18. ctxPool sync.Pool
  19. )
  20. type Server struct {
  21. ctx context.Context
  22. sequenceLocker sync.Mutex
  23. sequence int64
  24. ctxMap sync.Map
  25. waitGroup conc.WaitGroup
  26. middleware []Middleware
  27. router *Router
  28. l net.Listener
  29. exitFlag int32
  30. }
  31. func (svr *Server) applyContext() *Context {
  32. if v := ctxPool.Get(); v != nil {
  33. if ctx, ok := v.(*Context); ok {
  34. return ctx
  35. }
  36. }
  37. return &Context{}
  38. }
  39. func (svr *Server) releaseContext(ctx *Context) {
  40. ctxPool.Put(ctx)
  41. }
  42. func (svr *Server) execute(ctx *Context, frame *Frame) (err error) {
  43. var (
  44. params map[string]string
  45. tokens []string
  46. args []string
  47. r *Router
  48. )
  49. cmd := string(frame.Data)
  50. tokens = strings.Fields(cmd)
  51. if frame.Timeout > 0 {
  52. childCtx, cancelFunc := context.WithTimeout(svr.ctx, time.Duration(frame.Timeout))
  53. ctx.setContext(childCtx)
  54. defer func() {
  55. cancelFunc()
  56. }()
  57. } else {
  58. ctx.setContext(svr.ctx)
  59. }
  60. if r, args, err = svr.router.Lookup(tokens); err != nil {
  61. if errors.Is(err, ErrNotFound) {
  62. err = ctx.Error(errNotFound, fmt.Sprintf("Command %s not found", cmd))
  63. } else {
  64. err = ctx.Error(errExecuteFailed, err.Error())
  65. }
  66. } else {
  67. if len(r.params) > len(args) {
  68. err = ctx.Error(errExecuteFailed, r.Usage())
  69. return
  70. }
  71. if len(r.params) > 0 {
  72. params = make(map[string]string)
  73. for i, s := range r.params {
  74. params[s] = args[i]
  75. }
  76. }
  77. ctx.setArgs(args)
  78. ctx.setParam(params)
  79. err = r.command.Handle(ctx)
  80. }
  81. return
  82. }
  83. func (svr *Server) nextSequence() int64 {
  84. svr.sequenceLocker.Lock()
  85. defer svr.sequenceLocker.Unlock()
  86. if svr.sequence >= math.MaxInt64 {
  87. svr.sequence = 1
  88. }
  89. svr.sequence++
  90. return svr.sequence
  91. }
  92. func (svr *Server) process(conn net.Conn) {
  93. var (
  94. err error
  95. ctx *Context
  96. frame *Frame
  97. )
  98. ctx = svr.applyContext()
  99. ctx.reset(svr.nextSequence(), conn)
  100. svr.ctxMap.Store(ctx.Id, ctx)
  101. defer func() {
  102. _ = conn.Close()
  103. svr.ctxMap.Delete(ctx.Id)
  104. svr.releaseContext(ctx)
  105. }()
  106. for {
  107. if frame, err = readFrame(conn); err != nil {
  108. break
  109. }
  110. //reset frame
  111. ctx.seq = frame.Seq
  112. switch frame.Type {
  113. case PacketTypeHandshake:
  114. if err = ctx.send(responsePayload{
  115. Type: PacketTypeHandshake,
  116. Data: &Info{
  117. ID: ctx.Id,
  118. Name: env.Get("VOX_NAME", ""),
  119. Version: env.Get("VOX_VERSION", ""),
  120. OS: runtime.GOOS,
  121. ServerTime: time.Now(),
  122. RemoteAddr: conn.RemoteAddr().String(),
  123. },
  124. }); err != nil {
  125. break
  126. }
  127. case PacketTypeCompleter:
  128. if err = ctx.send(responsePayload{
  129. Type: PacketTypeCompleter,
  130. Data: svr.router.Completer(strings.Fields(string(frame.Data))...),
  131. }); err != nil {
  132. break
  133. }
  134. case PacketTypeCommand:
  135. if err = svr.execute(ctx, frame); err != nil {
  136. break
  137. }
  138. default:
  139. break
  140. }
  141. }
  142. }
  143. func (svr *Server) serve() {
  144. for {
  145. conn, err := svr.l.Accept()
  146. if err != nil {
  147. break
  148. }
  149. svr.waitGroup.Go(func() {
  150. svr.process(conn)
  151. })
  152. }
  153. }
  154. func (svr *Server) wrapHandle(pathname, desc string, cb HandleFunc, middleware ...Middleware) Command {
  155. h := func(ctx *Context) (err error) {
  156. for i := len(svr.middleware) - 1; i >= 0; i-- {
  157. cb = svr.middleware[i](cb)
  158. }
  159. for i := len(middleware) - 1; i >= 0; i-- {
  160. cb = middleware[i](cb)
  161. }
  162. return cb(ctx)
  163. }
  164. if desc == "" {
  165. desc = strings.Join(strings.Split(strings.TrimPrefix(pathname, "/"), "/"), " ")
  166. }
  167. return Command{
  168. Path: pathname,
  169. Handle: h,
  170. Description: desc,
  171. }
  172. }
  173. func (svr *Server) Use(middleware ...Middleware) {
  174. svr.middleware = append(svr.middleware, middleware...)
  175. }
  176. func (svr *Server) Group(prefix string, commands []Command, middleware ...Middleware) {
  177. for _, cmd := range commands {
  178. svr.Handle(path.Join(prefix, cmd.Path), cmd.Description, cmd.Handle, middleware...)
  179. }
  180. }
  181. func (svr *Server) Handle(pathname string, desc string, cb HandleFunc, middleware ...Middleware) {
  182. svr.router.Handle(pathname, svr.wrapHandle(pathname, desc, cb, middleware...))
  183. }
  184. func (svr *Server) Serve(l net.Listener) (err error) {
  185. svr.l = l
  186. svr.Handle("/help", "Display help information", func(ctx *Context) (err error) {
  187. return ctx.Success(svr.router.String())
  188. })
  189. svr.serve()
  190. atomic.StoreInt32(&svr.exitFlag, 0)
  191. return
  192. }
  193. func (svr *Server) Shutdown() (err error) {
  194. if !atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
  195. return
  196. }
  197. err = svr.l.Close()
  198. svr.ctxMap.Range(func(key, value any) bool {
  199. if ctx, ok := value.(*Context); ok {
  200. err = ctx.Close()
  201. }
  202. return true
  203. })
  204. svr.waitGroup.Wait()
  205. return
  206. }
  207. func New(ctx context.Context) *Server {
  208. return &Server{
  209. ctx: ctx,
  210. router: newRouter(""),
  211. middleware: make([]Middleware, 0, 10),
  212. }
  213. }