Browse Source

fix cli long response

fancl 1 year ago
parent
commit
bbe1184dac
4 changed files with 96 additions and 37 deletions
  1. 2 2
      cmd/main.go
  2. 46 27
      gateway/cli/client.go
  3. 40 0
      gateway/cli/packet.go
  4. 8 8
      gateway/cli/server.go

+ 2 - 2
cmd/main.go

@@ -42,8 +42,8 @@ func main() {
 	micro.Handle("/show/contenxt", func(ctx micro.Context) (err error) {
 		table := console.NewTable().WithBordered()
 		table.AddRow("Name", "Age")
-		for i := 0; i < 10; i++ {
-			table.AddRow(fmt.Sprintf("zhansan%d", i), i+12)
+		for i := 0; i < 1000; i++ {
+			table.AddRow(fmt.Sprintf("zhangsang%d", i), i+12)
 		}
 		return ctx.Success(table)
 	}, func(o *micro.HandleOptions) {

+ 46 - 27
gateway/cli/client.go

@@ -10,27 +10,33 @@ import (
 	"os"
 	"runtime"
 	"strings"
+	"sync"
 	"time"
 
 	"git.nspix.com/golang/micro/helper/net/ip"
 	bytepkg "git.nspix.com/golang/micro/helper/pool/byte"
-	"git.nspix.com/golang/micro/helper/unsafestr"
 	"github.com/peterh/liner"
 )
 
 type Client struct {
+	seq           uint16
 	appName       string
 	conn          net.Conn
 	context       context.Context
 	readyChan     chan struct{}
 	completerChan chan *Frame
 	responseChan  chan *Frame
+	mutex         sync.Mutex
 }
 
 func (client *Client) writePack(packet *Frame) (err error) {
 	if packet.Timestamp == 0 {
 		packet.Timestamp = time.Now().Unix()
 	}
+	client.mutex.Lock()
+	client.seq++
+	packet.Seq = client.seq
+	client.mutex.Unlock()
 	err = writeFrame(client.conn, packet)
 	return
 }
@@ -74,7 +80,10 @@ func (client *Client) rdyLoop(c chan error) {
 				}
 			}
 		case PacketTypeData:
-			client.responseChan <- packet
+			select {
+			case client.responseChan <- packet:
+			case <-time.After(time.Second):
+			}
 		}
 	}
 	select {
@@ -83,6 +92,35 @@ func (client *Client) rdyLoop(c chan error) {
 	}
 }
 
+func (client *Client) waitResponse(ctx context.Context, seq uint16) (err error) {
+	childCtx, cancelFunc := context.WithTimeout(ctx, time.Second*30)
+	defer func() {
+		cancelFunc()
+	}()
+	for {
+		select {
+		case <-childCtx.Done():
+			err = client.context.Err()
+			return
+		case res, ok := <-client.responseChan:
+			if !ok {
+				break
+			}
+			if res.Seq == seq {
+				if res.Error != "" {
+					client.stdout(res.Error, EOL)
+				} else {
+					client.stdout(string(bytes.Trim(res.Data, "\r\n")))
+				}
+			}
+			if res.Flag == FlagComplete {
+				client.stdout(EOL)
+				return
+			}
+		}
+	}
+}
+
 func (client *Client) interactive(c chan error) {
 	var (
 		err   error
@@ -116,35 +154,18 @@ func (client *Client) interactive(c chan error) {
 			client.stdout("\033[2J")
 			continue
 		}
-		if err = client.writePack(&Frame{Type: PacketTypeData, Data: []byte(line)}); err != nil {
+		reqFrame := newFrame(PacketTypeData, FlagComplete, []byte(line))
+		if err = client.writePack(reqFrame); err != nil {
 			break
 		}
 		state.AppendHistory(line)
-		select {
-		case <-client.context.Done():
-			err = client.context.Err()
-			return
-		case <-time.After(time.Second * 30):
-			client.stdout("io timeout", EOL)
-		case res, ok := <-client.responseChan:
-			if !ok {
-				break
-			}
-			if res.Error != "" {
-				client.stdout(res.Error, EOL)
-			} else {
-				client.stdout(string(bytes.Trim(res.Data, "\r\n")), EOL)
-			}
-		}
+		client.waitResponse(client.context, reqFrame.Seq)
 	}
 }
 
 func (client *Client) CompleterHandleFunc(str string) (ss []string) {
 	var err error
-	if err = client.writePack(&Frame{
-		Type: PacketTypeCompleter,
-		Data: unsafestr.StringToBytes(str),
-	}); err != nil {
+	if err = client.writePack(newFrame(PacketTypeCompleter, FlagComplete, []byte(str))); err != nil {
 		return
 	}
 	select {
@@ -169,9 +190,7 @@ func (client *Client) Run() (err error) {
 	defer func() {
 		_ = client.conn.Close()
 	}()
-	if err = client.writePack(&Frame{
-		Type: PacketTypeEcho,
-	}); err != nil {
+	if err = client.writePack(newFrame(PacketTypeEcho, FlagComplete, nil)); err != nil {
 		return
 	}
 	client.stdout("connecting")
@@ -201,7 +220,7 @@ func ExecuteContext(ctx context.Context, conn net.Conn, cmd string) (s string, e
 	if frame, err = nextFrame(conn); err != nil {
 		return
 	}
-	if err = cli.writePack(&Frame{Type: PacketTypeData, Data: []byte(cmd)}); err != nil {
+	if err = cli.writePack(newFrame(PacketTypeData, FlagComplete, []byte(cmd))); err != nil {
 		return
 	}
 	if frame, err = nextFrame(conn); err != nil {

+ 40 - 0
gateway/cli/packet.go

@@ -19,9 +19,16 @@ const (
 	PacketTypeEcho           = 0x03
 )
 
+const (
+	FlagPart     = 0x00
+	FlagComplete = 0x01
+)
+
 type Frame struct {
 	Feature   []byte
 	Type      byte   `json:"type"`
+	Flag      byte   `json:"flag"`
+	Seq       uint16 `json:"seq"`
 	Data      []byte `json:"data"`
 	Error     string `json:"error"`
 	Timestamp int64  `json:"timestamp"`
@@ -45,6 +52,12 @@ func nextFrame(r io.Reader) (frame *Frame, err error) {
 	if err = binary.Read(r, binary.LittleEndian, &frame.Type); err != nil {
 		return
 	}
+	if err = binary.Read(r, binary.LittleEndian, &frame.Flag); err != nil {
+		return
+	}
+	if err = binary.Read(r, binary.LittleEndian, &frame.Seq); err != nil {
+		return
+	}
 	if err = binary.Read(r, binary.LittleEndian, &frame.Timestamp); err != nil {
 		return
 	}
@@ -100,6 +113,12 @@ func writeFrame(w io.Writer, frame *Frame) (err error) {
 	if err = binary.Write(w, binary.LittleEndian, frame.Type); err != nil {
 		return
 	}
+	if err = binary.Write(w, binary.LittleEndian, frame.Flag); err != nil {
+		return
+	}
+	if err = binary.Write(w, binary.LittleEndian, frame.Seq); err != nil {
+		return
+	}
 	if err = binary.Write(w, binary.LittleEndian, frame.Timestamp); err != nil {
 		return
 	}
@@ -125,3 +144,24 @@ func writeFrame(w io.Writer, frame *Frame) (err error) {
 	}
 	return
 }
+
+func (f *Frame) SetSeq(n uint16) *Frame {
+	f.Seq = n
+	return f
+}
+
+func newFrame(t, f byte, data []byte) *Frame {
+	return &Frame{
+		Type: t,
+		Flag: f,
+		Data: data,
+	}
+}
+
+func newErrorFrame(t, f byte, err string) *Frame {
+	return &Frame{
+		Type:  t,
+		Flag:  f,
+		Error: err,
+	}
+}

+ 8 - 8
gateway/cli/server.go

@@ -76,20 +76,20 @@ func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) {
 	return
 }
 
-func (svr *Server) flushResponse(conn net.Conn, res *Response) (err error) {
+func (svr *Server) flushResponse(conn net.Conn, seq uint16, res *Response) (err error) {
 	if res.Code != 0 {
-		return svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(%d): %s", res.Code, res.Error)})
+		return svr.writePack(conn, newErrorFrame(PacketTypeData, FlagComplete, fmt.Sprintf("ERROR(%d): %s", res.Code, res.Error)).SetSeq(seq))
 	}
 	offset := 0
 	chunkSize := math.MaxInt16 - 1
 	n := len(res.Data) / chunkSize
 	for i := 0; i < n; i++ {
-		if err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data[offset:chunkSize]}); err != nil {
+		if err = svr.writePack(conn, newFrame(PacketTypeData, FlagPart, res.Data[offset:chunkSize+offset]).SetSeq(seq)); err != nil {
 			return
 		}
 		offset += chunkSize
 	}
-	err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data[offset:]})
+	err = svr.writePack(conn, newFrame(PacketTypeData, FlagComplete, res.Data[offset:]).SetSeq(seq))
 	return
 }
 
@@ -120,7 +120,7 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 			tokens := strings.Fields(strings.ToLower(strings.TrimSpace(string(reqPacket.Data))))
 			cs := svr.executor.Completer(tokens...)
 			if buf, err = json.Marshal(cs); err == nil {
-				err = svr.writePack(conn, &Frame{Type: PacketTypeCompleter, Data: buf})
+				err = svr.writePack(conn, newFrame(PacketTypeCompleter, FlagComplete, buf).SetSeq(reqPacket.Seq))
 			}
 		case PacketTypeEcho:
 			vs := make(map[string]string)
@@ -131,7 +131,7 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 			vs["name"] = os.Getenv("MICRO_SERVICE_NAME")
 			vs["version"] = os.Getenv("MICRO_SERVICE_VERSION")
 			if buf, err = json.Marshal(vs); err == nil {
-				err = svr.writePack(conn, &Frame{Type: PacketTypeEcho, Data: buf})
+				err = svr.writePack(conn, newFrame(PacketTypeEcho, FlagComplete, buf).SetSeq(reqPacket.Seq))
 			}
 		case PacketTypeData:
 			var (
@@ -153,12 +153,12 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 			ctx.WithContext(svr.ctx)
 			ctx.reset(cmdStr)
 			if res, err = svr.executor.Do(ctx, tokens...); err == nil {
-				if err = svr.flushResponse(conn, res); err != nil {
+				if err = svr.flushResponse(conn, reqPacket.Seq, res); err != nil {
 					log.Warnf("[CLI]: flush connection %d response error: %s", id, err.Error())
 				}
 			} else {
 				log.Debugf("[CLI]: handle connection %d command [%s] error: %s", id, cmdStr, err.Error())
-				err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(5000): %s", err.Error())})
+				err = svr.writePack(conn, newErrorFrame(PacketTypeData, FlagComplete, fmt.Sprintf("ERROR(5000): %s", err.Error())).SetSeq(reqPacket.Seq))
 			}
 		default:
 			return