|
@@ -4,6 +4,7 @@ import (
|
|
"context"
|
|
"context"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "math"
|
|
"net"
|
|
"net"
|
|
"os"
|
|
"os"
|
|
"path"
|
|
"path"
|
|
@@ -75,6 +76,23 @@ func (svr *Server) writePack(conn net.Conn, packet *Frame) (err error) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (svr *Server) flushResponse(conn net.Conn, res *Response) (err error) {
|
|
|
|
+ if res.Code != 0 {
|
|
|
|
+ return svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(%d): %s", res.Code, res.Error)})
|
|
|
|
+ }
|
|
|
|
+ offset := 0
|
|
|
|
+ chunkSize := math.MaxInt16 - 1
|
|
|
|
+ n := len(res.Data) / chunkSize
|
|
|
|
+ for i := 0; i < n; i++ {
|
|
|
|
+ if err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data[offset:chunkSize]}); err != nil {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ offset += chunkSize
|
|
|
|
+ }
|
|
|
|
+ err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data[offset:]})
|
|
|
|
+ return
|
|
|
|
+}
|
|
|
|
+
|
|
func (svr *Server) process(id int32, conn net.Conn) (err error) {
|
|
func (svr *Server) process(id int32, conn net.Conn) (err error) {
|
|
var (
|
|
var (
|
|
buf []byte
|
|
buf []byte
|
|
@@ -88,12 +106,13 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
|
|
svr.locker.Unlock()
|
|
svr.locker.Unlock()
|
|
err = conn.Close()
|
|
err = conn.Close()
|
|
if v := recover(); v != nil {
|
|
if v := recover(); v != nil {
|
|
- log.Errorf("handle %d command failed cause by %v", id, v)
|
|
|
|
|
|
+ log.Errorf("[CLI]: handle connection %d command failed cause by %v", id, v)
|
|
}
|
|
}
|
|
}()
|
|
}()
|
|
for {
|
|
for {
|
|
reqPacket := &Frame{}
|
|
reqPacket := &Frame{}
|
|
if reqPacket, err = nextFrame(conn); err != nil {
|
|
if reqPacket, err = nextFrame(conn); err != nil {
|
|
|
|
+ log.Warnf("[CLI]: read connection %d frame error: %s", id, err.Error())
|
|
break
|
|
break
|
|
}
|
|
}
|
|
switch reqPacket.Type {
|
|
switch reqPacket.Type {
|
|
@@ -134,12 +153,11 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
|
|
ctx.WithContext(svr.ctx)
|
|
ctx.WithContext(svr.ctx)
|
|
ctx.reset(cmdStr)
|
|
ctx.reset(cmdStr)
|
|
if res, err = svr.executor.Do(ctx, tokens...); err == nil {
|
|
if res, err = svr.executor.Do(ctx, tokens...); err == nil {
|
|
- if res.Code == 0 {
|
|
|
|
- err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data})
|
|
|
|
- } else {
|
|
|
|
- err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(%d): %s", res.Code, res.Error)})
|
|
|
|
|
|
+ if err = svr.flushResponse(conn, res); err != nil {
|
|
|
|
+ log.Warnf("[CLI]: flush connection %d response error: %s", id, err.Error())
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
|
|
+ log.Debugf("[CLI]: handle connection %d command [%s] error: %s", id, cmdStr, err.Error())
|
|
err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(5000): %s", err.Error())})
|
|
err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(5000): %s", err.Error())})
|
|
}
|
|
}
|
|
default:
|
|
default:
|