package cli import ( "context" "encoding/json" "fmt" "git.nspix.com/golang/micro/helper/pool/bytepool" "git.nspix.com/golang/micro/helper/utils" "git.nspix.com/golang/micro/log" "net" "os" "path" "runtime" "strings" "sync" "sync/atomic" "time" ) const ( MaxReadBufferLength = 64 * 1024 EOL = "\n" ) type HandleFunc func(ctx *Context) (err error) type Middleware func(next HandleFunc) HandleFunc type ( Options struct { Usage string Description string Middleware []Middleware } Option func(o *Options) ) type Server struct { seq int32 ctx context.Context locker sync.RWMutex executor *Executor contextMap map[int32]*Context } func WithUsage(usage string) Option { return func(o *Options) { o.Usage = usage } } func WithDescription(desc string) Option { return func(o *Options) { o.Description = desc } } func WithMiddleware(ms ...Middleware) Option { return func(o *Options) { if o.Middleware == nil { o.Middleware = make([]Middleware, 0) } o.Middleware = append(o.Middleware, ms...) } } func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) { if packet.Timestamp == 0 { packet.Timestamp = time.Now().Unix() } err = writeFrame(conn, packet) return } func (svr *Server) process(id int32, conn net.Conn) (err error) { var ( buf []byte res *Response ) buffer := bytepool.Get(MaxReadBufferLength) defer func() { bytepool.Put(buffer) svr.locker.Lock() delete(svr.contextMap, id) svr.locker.Unlock() err = conn.Close() if v := recover(); v != nil { log.Errorf("handle %d command failed cause by %v", id, v) } }() for { reqPacket := &Frame{} if reqPacket, err = readFrame(conn); err != nil { break } switch reqPacket.Type { case PacketTypeCompleter: tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data)))) cs := svr.executor.Completer(tokens...) if buf, err = json.Marshal(cs); err == nil { err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf}) } case PacketTypeEcho: vs := make(map[string]string) vs["os"] = runtime.GOOS vs["cid"] = fmt.Sprint(id) vs["login_at"] = time.Now().Format(time.RFC822) vs["client_addr"] = conn.RemoteAddr().String() vs["name"] = os.Getenv("MICRO_SERVICE_NAME") vs["version"] = os.Getenv("MICRO_SERVICE_VERSION") if buf, err = json.Marshal(vs); err == nil { err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf}) } case PacketTypeData: var ( ok bool ctx *Context ) tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data))) svr.locker.Lock() if ctx, ok = svr.contextMap[id]; !ok { ctx = &Context{ ID: id, } ctx.Set("ID", id) svr.contextMap[id] = ctx } svr.locker.Unlock() ctx.WithContext(svr.ctx) ctx.reset(strings.TrimSpace(string(reqPacket.Data))) if res, err = svr.executor.Do(ctx, tokens...); err == nil { if res.Code == 0 { err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data}) } else { err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR %d: %s", res.Code, res.Error)}) } } else { err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR 5000: %s", err.Error())}) } default: return } } return } func (svr *Server) formatDescription(ss ...string) (usage string, description string) { for _, s := range ss { if strings.HasPrefix(s, ":") { usage += "{" + s[1:] + "} " } else { usage += s + " " description += s + " " } } return } func (svr *Server) NotFoundHandle(cb HandleFunc) { svr.executor.NotFoundHandle = cb } func (svr *Server) Handle(path string, cb HandleFunc, cbs ...Option) { var ( err error length int p *Executor q *Executor pFlag int32 ) var tokens []string if strings.HasPrefix(path, "/") { tokens = strings.Split(path[1:], "/") } else { tokens = utils.BreakUp(path) } opts := &Options{} for _, o := range cbs { o(opts) } if opts.Usage == "" && opts.Description == "" { opts.Usage, opts.Description = svr.formatDescription(tokens...) } svr.locker.Lock() defer svr.locker.Unlock() length = len(tokens) p = svr.executor for i, token := range tokens { token = strings.TrimSpace(strings.ToLower(token)) if token == "" { continue } if atomic.LoadInt32(&pFlag) == 1 { if !strings.HasPrefix(token, ":") { panic(fmt.Sprintf("param %s must begin with :", token)) } } if strings.HasPrefix(token, ":") { if atomic.CompareAndSwapInt32(&pFlag, 0, 1) { p.handleFunc = cb p.usage = opts.Usage p.description = opts.Description p.middleware = opts.Middleware p.params = make([]string, 0, 10) } if p == nil { panic("path can't empty") } p.params = append(p.params, token[1:]) continue } if q, err = p.Children(token); err == nil { if i == length-1 { panic(path + " already exists") } p = q } else { q = NewExecutor(token) if i == length-1 { q.usage = opts.Usage q.description = opts.Description q.middleware = opts.Middleware q.handleFunc = cb } p.Append(q) p = q } } } func (svr *Server) tcpServe(listener net.Listener) { var ( err error conn net.Conn ) for { if conn, err = listener.Accept(); err != nil { break } go func() { _ = svr.process(atomic.AddInt32(&svr.seq, 1), conn) }() } } func (svr *Server) unixServe(filename string) (listener net.Listener) { var ( err error addr *net.UnixAddr ) if addr, err = net.ResolveUnixAddr("unix", filename); err != nil { return } if listener, err = net.ListenUnix("unix", addr); err != nil { return } svr.tcpServe(listener) return } func (svr *Server) Serve(listener net.Listener) (err error) { svr.Handle("help", func(ctx *Context) (err error) { return ctx.Success(svr.executor.String()) }) var ( wg sync.WaitGroup unixListener net.Listener ) if listener != nil { wg.Add(1) go func() { svr.tcpServe(listener) wg.Done() }() } if runtime.GOOS == "linux" { go func() { //listen unix unixListener = svr.unixServe(path.Join(os.TempDir(), os.Getenv("MICRO_SERVICE_NAME")+".sock")) }() } wg.Wait() if unixListener != nil { err = unixListener.Close() } return } func (svr *Server) Append(child *Executor) { svr.executor.Append(child) } func New(ctx context.Context) *Server { return &Server{ seq: 0, ctx: ctx, executor: NewExecutor("ROOT"), contextMap: make(map[int32]*Context), } }