package cli import ( "context" "encoding/json" "fmt" "git.nspix.com/golang/micro/helper/pool/bytepool" "git.nspix.com/golang/micro/helper/utils" "net" "os" "path" "runtime" "strings" "sync" "sync/atomic" "time" ) const ( MaxReadBufferLength = 64 * 1024 EOL = "\n" ) type HandleFunc func(ctx *Context) (err error) type Server struct { seq int32 ctx context.Context locker sync.RWMutex executor *Executor contextMap map[int32]*Context } func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) { if packet.Timestamp == 0 { packet.Timestamp = time.Now().Unix() } err = writeFrame(conn, packet) return } func (svr *Server) process(id int32, conn net.Conn) (err error) { var ( buf []byte res *Response ) buffer := bytepool.Get(MaxReadBufferLength) defer func() { bytepool.Put(buffer) svr.locker.Lock() delete(svr.contextMap, id) svr.locker.Unlock() err = conn.Close() }() for { reqPacket := &Frame{} if reqPacket, err = readFrame(conn); err != nil { break } switch reqPacket.Type { case PacketTypeCompleter: tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data)))) cs := svr.executor.Completer(tokens...) if buf, err = json.Marshal(cs); err == nil { err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf}) } case PacketTypeEcho: vs := make(map[string]string) vs["os"] = runtime.GOOS vs["cid"] = fmt.Sprint(id) vs["login_at"] = time.Now().Format(time.RFC822) vs["client_addr"] = conn.RemoteAddr().String() vs["name"] = os.Getenv("MICRO_SERVICE_NAME") vs["version"] = os.Getenv("MICRO_SERVICE_VERSION") if buf, err = json.Marshal(vs); err == nil { err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf}) } case PacketTypeData: var ( ok bool ctx *Context ) tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data))) svr.locker.Lock() if ctx, ok = svr.contextMap[id]; !ok { ctx = &Context{ ID: id, } ctx.Set("ID", id) svr.contextMap[id] = ctx } svr.locker.Unlock() ctx.WithContext(svr.ctx) ctx.reset(strings.TrimSpace(string(reqPacket.Data))) if res, err = svr.executor.Do(ctx, tokens...); err == nil { if res.Code == 0 { err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data}) } else { err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: res.Error}) } } else { err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: err.Error()}) } default: return } } return } func (svr *Server) Handle(path string, cb HandleFunc) { var tokens []string if strings.HasPrefix(path, "/") { tokens = strings.Split(path[1:], "/") } else { tokens = utils.BreakUp(path) } svr.locker.Lock() defer svr.locker.Unlock() var ( err error length int p *Executor q *Executor ) length = len(tokens) p = svr.executor for i, token := range tokens { token = strings.TrimSpace(strings.ToLower(token)) if token == "" { continue } //处理参数 if strings.HasPrefix(token, ":") { if p != nil { if p.params == nil { p.params = make([]string, 0, 10) } p.params = append(p.params, token[1:]) p.handleFunc = cb } continue } if q, err = p.Children(token); err == nil { if i == length-1 { panic(path + " already exists") } p = q } else { q = NewExecutor(token, "", strings.Title(strings.Join(tokens, " "))) if i == length-1 { q.handleFunc = cb } p.Append(q) p = q } } } func (svr *Server) tcpServe(listener net.Listener) { var ( err error conn net.Conn ) for { if conn, err = listener.Accept(); err != nil { break } go func() { _ = svr.process(atomic.AddInt32(&svr.seq, 1), conn) }() } } func (svr *Server) unixServe(filename string) { var ( err error addr *net.UnixAddr listener net.Listener ) if addr, err = net.ResolveUnixAddr("unix", filename); err != nil { return } if listener, err = net.ListenUnix("unix", addr); err != nil { return } svr.tcpServe(listener) err = listener.Close() } func (svr *Server) Serve(listener net.Listener) (err error) { svr.Handle("help", func(ctx *Context) (err error) { return ctx.Success(svr.executor.String()) }) var wg sync.WaitGroup if listener != nil { wg.Add(1) go func() { svr.tcpServe(listener) wg.Done() }() } if runtime.GOOS == "linux" { wg.Add(1) go func() { svr.unixServe(path.Join(os.TempDir(), os.Getenv("MICRO_SERVICE_NAME")+".sock")) wg.Done() }() } wg.Wait() return } func (svr *Server) Append(child *Executor) { svr.executor.Append(child) } func New(ctx context.Context) *Server { return &Server{ seq: 0, ctx: ctx, executor: NewExecutor("ROOT"), contextMap: make(map[int32]*Context), } }