123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package cli
- import (
- "encoding/json"
- "fmt"
- "git.nspix.com/golang/micro/utils/bytepool"
- "net"
- "os"
- "runtime"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
// Constants shared by the CLI server.
const (
	// MaxReadBufferLength is the size in bytes (64 KiB) of the buffer the
	// server requests from the byte pool for each connection.
	MaxReadBufferLength = 64 << 10

	// EOL is the end-of-line string, a single line feed.
	EOL = "\n"
)
- type Server struct {
- seq int32
- locker sync.RWMutex
- executor *Executor
- contextMap map[int32]*Context
- }
- func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) {
- if packet.Timestamp == 0 {
- packet.Timestamp = time.Now().Unix()
- }
- err = writeFrame(conn, packet)
- return
- }
- func (svr *Server) process(id int32, conn net.Conn) (err error) {
- var (
- buf []byte
- )
- buffer := bytepool.Get(MaxReadBufferLength)
- defer func() {
- bytepool.Put(buffer)
- svr.locker.Lock()
- delete(svr.contextMap, id)
- svr.locker.Unlock()
- err = conn.Close()
- }()
- for {
- reqPacket := &Frame{}
- if reqPacket, err = readFrame(conn); err != nil {
- break
- }
- switch reqPacket.Type {
- case PacketTypeCompleter:
- tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data))))
- cs := svr.executor.Completer(tokens...)
- if buf, err = json.Marshal(cs); err == nil {
- err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf})
- }
- case PacketTypeEcho:
- vs := make(map[string]string)
- vs["os"] = runtime.GOOS
- vs["cid"] = fmt.Sprint(id)
- vs["login_at"] = time.Now().Format(time.RFC822)
- vs["client_addr"] = conn.RemoteAddr().String()
- vs["name"] = os.Getenv("MICRO_SERVICE_NAME")
- vs["version"] = os.Getenv("MICRO_SERVICE_VERSION")
- if buf, err = json.Marshal(vs); err == nil {
- err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf})
- }
- case PacketTypeData:
- tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data)))
- var ok bool
- var ctx *Context
- svr.locker.Lock()
- if ctx, ok = svr.contextMap[id]; !ok {
- ctx = &Context{
- ID: id,
- }
- ctx.Set("ID", id)
- svr.contextMap[id] = ctx
- }
- svr.locker.Unlock()
- ctx.CmdStr = strings.TrimSpace(string(reqPacket.Data))
- if buf, err = svr.executor.Do(ctx, tokens...); err == nil {
- err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: buf})
- } else {
- err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: err.Error()})
- }
- default:
- return
- }
- }
- return
- }
- func (svr *Server) Serve(listener net.Listener) (err error) {
- var (
- conn net.Conn
- )
- svr.executor.Append(NewExecutor("help", "help", "Display this help").WithHandle(func(ctx *Context) []byte {
- return []byte(svr.executor.String())
- }))
- for {
- if conn, err = listener.Accept(); err != nil {
- break
- }
- go func() {
- _ = svr.process(atomic.AddInt32(&svr.seq, 1), conn)
- }()
- }
- return
- }
- func (svr *Server) Append(child *Executor) {
- svr.executor.Append(child)
- }
- func New() *Server {
- return &Server{
- seq: 0,
- executor: NewExecutor("ROOT"),
- contextMap: make(map[int32]*Context),
- }
- }
|