Browse Source

update package name

fancl 1 year ago
parent
commit
e94c344b81
5 changed files with 77 additions and 64 deletions
  1. 10 9
      gateway/cli/client.go
  2. 3 2
      gateway/cli/executor.go
  3. 15 8
      gateway/cli/packet.go
  4. 13 10
      gateway/cli/server.go
  5. 36 35
      service.go

+ 10 - 9
gateway/cli/client.go

@@ -6,15 +6,16 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"git.nspix.com/golang/micro/helper/net/ip"
-	byte2 "git.nspix.com/golang/micro/helper/pool/byte"
-	"git.nspix.com/golang/micro/helper/unsafestr"
-	"github.com/peterh/liner"
 	"net"
 	"os"
 	"runtime"
 	"strings"
 	"time"
+
+	"git.nspix.com/golang/micro/helper/net/ip"
+	bytepkg "git.nspix.com/golang/micro/helper/pool/byte"
+	"git.nspix.com/golang/micro/helper/unsafestr"
+	"github.com/peterh/liner"
 )
 
 type Client struct {
@@ -43,13 +44,13 @@ func (client *Client) rdyLoop(c chan error) {
 		err error
 		buf []byte
 	)
-	buf = byte2.Get(MaxReadBufferLength)
+	buf = bytepkg.Get(MaxReadBufferLength)
 	defer func() {
-		byte2.Put(buf)
+		bytepkg.Put(buf)
 	}()
 	for {
 		packet := &Frame{}
-		if packet, err = readFrame(client.conn); err != nil {
+		if packet, err = nextFrame(client.conn); err != nil {
 			c <- err
 			break
 		}
@@ -197,13 +198,13 @@ func ExecuteContext(ctx context.Context, conn net.Conn, cmd string) (s string, e
 		return
 	}
 	frame := &Frame{}
-	if frame, err = readFrame(conn); err != nil {
+	if frame, err = nextFrame(conn); err != nil {
 		return
 	}
 	if err = cli.writePack(&Frame{Type: PacketTypeData, Data: []byte(cmd)}); err != nil {
 		return
 	}
-	if frame, err = readFrame(conn); err != nil {
+	if frame, err = nextFrame(conn); err != nil {
 		return
 	}
 	if frame.Error != "" {

+ 3 - 2
gateway/cli/executor.go

@@ -3,13 +3,14 @@ package cli
 import (
 	"errors"
 	"fmt"
-	"git.nspix.com/golang/micro/helper/console"
-	"git.nspix.com/golang/micro/helper/unsafestr"
 	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"git.nspix.com/golang/micro/helper/console"
+	"git.nspix.com/golang/micro/helper/unsafestr"
 )
 
 var (

+ 15 - 8
gateway/cli/packet.go

@@ -3,8 +3,10 @@ package cli
 import (
 	"bytes"
 	"encoding/binary"
-	"git.nspix.com/golang/micro/helper/unsafestr"
 	"io"
+	"math"
+
+	"git.nspix.com/golang/micro/helper/unsafestr"
 )
 
 var (
@@ -25,11 +27,11 @@ type Frame struct {
 	Timestamp int64  `json:"timestamp"`
 }
 
-func readFrame(r io.Reader) (frame *Frame, err error) {
+func nextFrame(r io.Reader) (frame *Frame, err error) {
 	var (
 		n           int
-		dataLength  int16
-		errorLength int16
+		dataLength  uint16
+		errorLength uint16
 		errBuf      []byte
 	)
 	frame = &Frame{Feature: make([]byte, 3)}
@@ -76,19 +78,24 @@ func readFrame(r io.Reader) (frame *Frame, err error) {
 func writeFrame(w io.Writer, frame *Frame) (err error) {
 	var (
 		n           int
-		dataLength  int16
-		errorLength int16
+		dl          int
+		dataLength  uint16
+		errorLength uint16
 		errBuf      []byte
 	)
 	if _, err = w.Write(feature); err != nil {
 		return
 	}
 	if frame.Data != nil {
-		dataLength = int16(len(frame.Data))
+		dl = len(frame.Data)
+		if dl > math.MaxUint16 {
+			return io.ErrNoProgress
+		}
+		dataLength = uint16(dl)
 	}
 	if frame.Error != "" {
 		errBuf = unsafestr.StringToBytes(frame.Error)
-		errorLength = int16(len(errBuf))
+		errorLength = uint16(len(errBuf))
 	}
 	if err = binary.Write(w, binary.LittleEndian, frame.Type); err != nil {
 		return

+ 13 - 10
gateway/cli/server.go

@@ -4,9 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"git.nspix.com/golang/micro/helper/pool/bytepool"
-	"git.nspix.com/golang/micro/helper/utils"
-	"git.nspix.com/golang/micro/log"
 	"net"
 	"os"
 	"path"
@@ -15,6 +12,10 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"git.nspix.com/golang/micro/helper/pool/bytepool"
+	"git.nspix.com/golang/micro/helper/utils"
+	"git.nspix.com/golang/micro/log"
 )
 
 const (
@@ -92,7 +93,7 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 	}()
 	for {
 		reqPacket := &Frame{}
-		if reqPacket, err = readFrame(conn); err != nil {
+		if reqPacket, err = nextFrame(conn); err != nil {
 			break
 		}
 		switch reqPacket.Type {
@@ -115,10 +116,12 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 			}
 		case PacketTypeData:
 			var (
-				ok  bool
-				ctx *Context
+				ok     bool
+				cmdStr string
+				ctx    *Context
 			)
-			tokens := strings.Fields(strings.TrimSpace(string(reqPacket.Data)))
+			cmdStr = strings.TrimSpace(string(reqPacket.Data))
+			tokens := strings.Fields(cmdStr)
 			svr.locker.Lock()
 			if ctx, ok = svr.contextMap[id]; !ok {
 				ctx = &Context{
@@ -129,15 +132,15 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 			}
 			svr.locker.Unlock()
 			ctx.WithContext(svr.ctx)
-			ctx.reset(strings.TrimSpace(string(reqPacket.Data)))
+			ctx.reset(cmdStr)
 			if res, err = svr.executor.Do(ctx, tokens...); err == nil {
 				if res.Code == 0 {
 					err = svr.writePack(conn, &Frame{Type: PacketTypeData, Data: res.Data})
 				} else {
-					err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR %d: %s", res.Code, res.Error)})
+					err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(%d): %s", res.Code, res.Error)})
 				}
 			} else {
-				err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR 5000: %s", err.Error())})
+				err = svr.writePack(conn, &Frame{Type: PacketTypeData, Error: fmt.Sprintf("ERROR(5000): %s", err.Error())})
 			}
 		default:
 			return

+ 36 - 35
service.go

@@ -4,6 +4,21 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/hex"
+	"io/ioutil"
+	"math"
+	"math/rand"
+	"net"
+	httppkg "net/http"
+	"net/http/pprof"
+	"os"
+	"os/signal"
+	"path"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"syscall"
+	"time"
+
 	"git.nspix.com/golang/micro/broker"
 	"git.nspix.com/golang/micro/gateway"
 	"git.nspix.com/golang/micro/gateway/cli"
@@ -18,20 +33,6 @@ import (
 	"git.nspix.com/golang/micro/log"
 	"git.nspix.com/golang/micro/registry"
 	"github.com/google/btree"
-	"io/ioutil"
-	"math"
-	"math/rand"
-	"net"
-	hp "net/http"
-	"net/http/pprof"
-	"os"
-	"os/signal"
-	"path"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"syscall"
-	"time"
 )
 
 var (
@@ -129,7 +130,7 @@ func (svr *Service) worker() {
 	}
 }
 
-//DeferTick 定时执行一个任务
+// DeferTick 定时执行一个任务
 func (svr *Service) DeferTick(duration time.Duration, callback HandleTickerFunc, opts ...TickOption) int64 {
 	tick := &tickPtr{
 		sequence: atomic.AddInt64(&tickSequence, 1),
@@ -150,7 +151,7 @@ func (svr *Service) DeferTick(duration time.Duration, callback HandleTickerFunc,
 	return tick.sequence
 }
 
-//Handle 处理函数
+// Handle 处理函数
 func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	if atomic.LoadInt32(&svr.readyFlag) != 1 {
 		svr.deferHandles = append(svr.deferHandles, handleEntry{
@@ -161,7 +162,7 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 		return
 	}
 	//disable cli default
-	opt := &HandleOptions{HttpMethod: hp.MethodPost, DisableCli: true}
+	opt := &HandleOptions{HttpMethod: httppkg.MethodPost, DisableCli: true}
 	for _, f := range opts {
 		f(opt)
 	}
@@ -192,7 +193,7 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 	return
 }
 
-//NewRequest 创建一个请求
+// NewRequest 创建一个请求
 func (svr *Service) NewRequest(name, method string, body interface{}) (req *Request, err error) {
 	return &Request{
 		ServiceName: name,
@@ -202,7 +203,7 @@ func (svr *Service) NewRequest(name, method string, body interface{}) (req *Requ
 	}, nil
 }
 
-//PeekService 选取一个可靠的服务
+// PeekService 选取一个可靠的服务
 func (svr *Service) PeekService(name string) ([]*registry.ServiceNode, error) {
 	return svr.registry.Get(svr.ctx, name)
 }
@@ -313,7 +314,7 @@ func (svr *Service) instance() *registry.ServiceNode {
 func (svr *Service) startHTTPServe() (err error) {
 	svr.httpSvr = http.New(svr.ctx)
 	l := gateway.NewListener(svr.listener.Addr())
-	if err = svr.gateway.Attaches([][]byte{[]byte(hp.MethodGet), []byte(hp.MethodPost), []byte(hp.MethodPut), []byte(hp.MethodDelete), []byte(hp.MethodOptions)}, l); err == nil {
+	if err = svr.gateway.Attaches([][]byte{[]byte(httppkg.MethodGet), []byte(httppkg.MethodPost), []byte(httppkg.MethodPut), []byte(httppkg.MethodDelete), []byte(httppkg.MethodOptions)}, l); err == nil {
 		svr.async(func() {
 			if err = svr.httpSvr.Serve(l); err != nil && atomic.LoadInt32(&svr.exitFlag) == 0 {
 				log.Warnf("http serve error: %s", err.Error())
@@ -321,7 +322,7 @@ func (svr *Service) startHTTPServe() (err error) {
 			log.Infof("http server stopped")
 		})
 
-		svr.httpSvr.Handle(hp.MethodGet, "/healthy", func(ctx *http.Context) (err error) {
+		svr.httpSvr.Handle(httppkg.MethodGet, "/healthy", func(ctx *http.Context) (err error) {
 			return ctx.Success(map[string]interface{}{
 				"id":      svr.node.ID,
 				"healthy": "Healthy",
@@ -331,15 +332,15 @@ func (svr *Service) startHTTPServe() (err error) {
 		})
 
 		if svr.opts.EnableHttpPProf {
-			svr.httpSvr.Handler("GET", "/debug/pprof/", hp.HandlerFunc(pprof.Index))
-			svr.httpSvr.Handler("GET", "/debug/pprof/goroutine", hp.HandlerFunc(pprof.Index))
-			svr.httpSvr.Handler("GET", "/debug/pprof/heap", hp.HandlerFunc(pprof.Index))
-			svr.httpSvr.Handler("GET", "/debug/pprof/mutex", hp.HandlerFunc(pprof.Index))
-			svr.httpSvr.Handler("GET", "/debug/pprof/threadcreate", hp.HandlerFunc(pprof.Index))
-			svr.httpSvr.Handler("GET", "/debug/pprof/cmdline", hp.HandlerFunc(pprof.Cmdline))
-			svr.httpSvr.Handler("GET", "/debug/pprof/profile", hp.HandlerFunc(pprof.Profile))
-			svr.httpSvr.Handler("GET", "/debug/pprof/symbol", hp.HandlerFunc(pprof.Symbol))
-			svr.httpSvr.Handler("GET", "/debug/pprof/trace", hp.HandlerFunc(pprof.Trace))
+			svr.httpSvr.Handler("GET", "/debug/pprof/", httppkg.HandlerFunc(pprof.Index))
+			svr.httpSvr.Handler("GET", "/debug/pprof/goroutine", httppkg.HandlerFunc(pprof.Index))
+			svr.httpSvr.Handler("GET", "/debug/pprof/heap", httppkg.HandlerFunc(pprof.Index))
+			svr.httpSvr.Handler("GET", "/debug/pprof/mutex", httppkg.HandlerFunc(pprof.Index))
+			svr.httpSvr.Handler("GET", "/debug/pprof/threadcreate", httppkg.HandlerFunc(pprof.Index))
+			svr.httpSvr.Handler("GET", "/debug/pprof/cmdline", httppkg.HandlerFunc(pprof.Cmdline))
+			svr.httpSvr.Handler("GET", "/debug/pprof/profile", httppkg.HandlerFunc(pprof.Profile))
+			svr.httpSvr.Handler("GET", "/debug/pprof/symbol", httppkg.HandlerFunc(pprof.Symbol))
+			svr.httpSvr.Handler("GET", "/debug/pprof/trace", httppkg.HandlerFunc(pprof.Trace))
 		}
 		log.Infof("http server listen on %s", svr.listener.Addr())
 	} else {
@@ -451,7 +452,7 @@ func (svr *Service) prepare() (err error) {
 	return
 }
 
-//destroy stop and destroy service
+// destroy stop and destroy service
 func (svr *Service) destroy() (err error) {
 	if !atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
 		return
@@ -499,7 +500,7 @@ func (svr *Service) destroy() (err error) {
 	return
 }
 
-//Reload reload server
+// Reload reload server
 func (svr *Service) Reload() (err error) {
 	if svr.opts.Server == nil {
 		return
@@ -511,7 +512,7 @@ func (svr *Service) Reload() (err error) {
 	return svr.opts.Server.Start(svr.ctx)
 }
 
-//Run setup service
+// Run setup service
 func (svr *Service) Run() (err error) {
 	if svr.opts.EnableLogPrefix {
 		log.Prefix(svr.opts.Name)
@@ -546,14 +547,14 @@ func (svr *Service) Run() (err error) {
 	return svr.destroy()
 }
 
-//Shutdown close service
+// Shutdown close service
 func (svr *Service) Shutdown() {
 	if atomic.LoadInt32(&svr.exitFlag) == 0 {
 		svr.cancelFunc()
 	}
 }
 
-//Context return the service context
+// Context return the service context
 func (svr *Service) Context() context.Context {
 	return svr.ctx
 }