Browse Source

优化服务接口

fancl 2 years ago
parent
commit
9e30db74c4

+ 2 - 1
broker/inprocess.go

@@ -44,6 +44,7 @@ func (bus *InProcBus) Publish(e *Event) {
 	select {
 	case bus.eventChan <- e:
 	default:
+		log.Warnf("event queue is full, event %s@%s has been discarded", e.Name, e.Namespace)
 	}
 	return
 }
@@ -97,7 +98,7 @@ func NewInPrcBus(ctx context.Context) *InProcBus {
 		eventChan: make(chan *Event, 1024),
 	}
 	bus.once.Do(func() {
-		for i := 0; i < runtime.NumCPU(); i++ {
+		for i := 0; i < runtime.NumCPU()*2; i++ {
 			go bus.worker()
 		}
 	})

+ 3 - 3
gateway/cli/server.go

@@ -4,7 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	byte2 "git.nspix.com/golang/micro/helper/pool/byte"
+	"git.nspix.com/golang/micro/helper/pool/bytepool"
 	"git.nspix.com/golang/micro/helper/utils"
 	"net"
 	"os"
@@ -45,9 +45,9 @@ func (svr *Server) process(id int32, conn net.Conn) (err error) {
 		buf []byte
 		res *Response
 	)
-	buffer := byte2.Get(MaxReadBufferLength)
+	buffer := bytepool.Get(MaxReadBufferLength)
 	defer func() {
-		byte2.Put(buffer)
+		bytepool.Put(buffer)
 		svr.locker.Lock()
 		delete(svr.contextMap, id)
 		svr.locker.Unlock()

+ 21 - 9
gateway/http/context.go

@@ -3,6 +3,8 @@ package http
 import (
 	"encoding/json"
 	"net/http"
+	"os"
+	"path"
 	"strings"
 )
 
@@ -39,25 +41,35 @@ func (c *Context) Bind(i interface{}) (err error) {
 }
 
 func (c *Context) Success(val interface{}) (err error) {
+	return c.JSON(&Response{Result: val})
+}
+
+func (c *Context) Error(code int, message string) (err error) {
+	return c.JSON(&Response{Code: code, Message: message})
+}
+
+func (c *Context) JSON(v interface{}) (err error) {
 	c.response.Header().Set("Content-Type", "application/json")
 	enc := json.NewEncoder(c.response)
 	if strings.Contains(c.request.Header.Get("User-Agent"), "curl") {
 		enc.SetIndent("", "\t")
 	}
-	if err = enc.Encode(&Response{Result: val}); err != nil {
+	if err = enc.Encode(v); err != nil {
 		c.response.WriteHeader(http.StatusBadGateway)
 	}
 	return
 }
 
-func (c *Context) Error(code int, message string) (err error) {
-	c.response.Header().Set("Content-Type", "application/json")
-	enc := json.NewEncoder(c.response)
-	if strings.Contains(c.request.Header.Get("User-Agent"), "curl") {
-		enc.SetIndent("", "\t")
-	}
-	if err = enc.Encode(&Response{Code: code, Message: message}); err != nil {
-		c.response.WriteHeader(http.StatusBadGateway)
+func (c *Context) SendFile(filename string) (err error) {
+	var (
+		fi os.FileInfo
+		fp *os.File
+	)
+	if fi, err = os.Stat(filename); err == nil {
+		if fp, err = os.Open(filename); err == nil {
+			http.ServeContent(c.Response(), c.Request(), path.Base(filename), fi.ModTime(), fp)
+			err = fp.Close()
+		}
 	}
 	return
 }

+ 5 - 0
gateway/http/server.go

@@ -3,10 +3,12 @@ package http
 import (
 	"context"
 	"git.nspix.com/golang/micro/gateway/http/router"
+	"git.nspix.com/golang/micro/gateway/state"
 	"golang.org/x/net/websocket"
 	"net"
 	"net/http"
 	"sync"
+	"sync/atomic"
 )
 
 var (
@@ -31,6 +33,7 @@ type Server struct {
 	middleware     []Middleware
 	keepAlive      bool
 	isSetKeepAlive bool
+	state          *state.State
 	hookHandle     http.Handler
 	router         *router.Router
 }
@@ -115,6 +118,7 @@ func (r *Server) ServeHTTP(res http.ResponseWriter, req *http.Request) {
 		r.hookHandle.ServeHTTP(res, req)
 		return
 	}
+	atomic.AddInt64(&r.state.Counter.Request, 1)
 	if req.Method == http.MethodOptions {
 		res.Header().Add("Vary", "Origin")
 		res.Header().Add("Vary", "Access-Control-Request-Method")
@@ -180,6 +184,7 @@ func (r *Server) Shutdown(ctx context.Context) (err error) {
 func New(ctx context.Context) *Server {
 	return &Server{
 		ctx:    ctx,
+		state:  &state.State{},
 		router: router.New(),
 	}
 }

+ 23 - 9
gateway/rpc/server.go

@@ -2,7 +2,9 @@ package rpc
 
 import (
 	"context"
+	"git.nspix.com/golang/micro/gateway/state"
 	"net"
+	"net/http"
 	"sync"
 	"sync/atomic"
 
@@ -16,6 +18,7 @@ type Server struct {
 	listener   net.Listener
 	ch         chan *Request
 	ctxPool    sync.Pool
+	state      *state.State
 	serviceMap sync.Map // map[string]HandleFunc
 	sessions   sync.Map
 	exitFlag   int32
@@ -55,6 +58,7 @@ func (svr *Server) handleRequest(req *Request) {
 		val interface{}
 		ctx *Context
 	)
+	atomic.AddInt64(&svr.state.Counter.Request, 1)
 	if val, ok = svr.serviceMap.Load(req.Method); ok {
 		cb = val.(HandleFunc)
 		ctx = svr.getContext()
@@ -65,21 +69,34 @@ func (svr *Server) handleRequest(req *Request) {
 				Sequence: req.Sequence,
 				Data:     ctx.Response().Bytes(),
 			}); err != nil {
-				log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
+				log.Warnf("RPC: write request(%s@%d) response error: %s", req.Method, req.Sequence, err.Error())
+			}
+		} else {
+			log.Warnf("RPC: handle request(%s@%d) error: %s", req.Method, req.Sequence, err.Error())
+			resp := NewResponse()
+			resp.code = http.StatusServiceUnavailable
+			resp.message = http.StatusText(http.StatusServiceUnavailable)
+			ctx.Reset(req, resp)
+			if err = writeFrame(req.conn, &Frame{
+				Func:     FuncResponse,
+				Sequence: req.Sequence,
+				Data:     ctx.Response().Bytes(),
+			}); err != nil {
+				log.Warnf("RPC: write request(%s@%d) response error: %s", req.Method, req.Sequence, err.Error())
 			}
 		}
 	} else {
 		ctx = svr.getContext()
 		resp := NewResponse()
-		resp.code = 404
-		resp.message = "not found"
+		resp.code = http.StatusNotFound
+		resp.message = http.StatusText(http.StatusNotFound)
 		ctx.Reset(req, resp)
 		if err = writeFrame(req.conn, &Frame{
 			Func:     FuncResponse,
 			Sequence: req.Sequence,
 			Data:     ctx.Response().Bytes(),
 		}); err != nil {
-			log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
+			log.Warnf("RPC: write request(%s@%d) response failed cause by: %s", req.Method, req.Sequence, err.Error())
 		}
 	}
 }
@@ -101,19 +118,15 @@ func (svr *Server) process(conn net.Conn) {
 		switch frame.Func {
 		case FuncPing:
 			if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
-				log.Warnf("RPC: write ping frame error: %s", err.Error())
 				return
 			}
 		case FuncRequest:
-			//read request
 			if req, err2 := ReadRequest(frame.Data); err2 == nil {
 				req.reset(frame.Sequence, conn)
 				select {
 				case svr.ch <- req:
 				default:
 				}
-			} else {
-				log.Warnf("RPC: read request error: %s", err2.Error())
 			}
 		}
 	}
@@ -145,7 +158,7 @@ func (svr *Server) Close() (err error) {
 		//clear sessions
 		svr.sessions.Range(func(key, value interface{}) bool {
 			c := value.(net.Conn)
-			c.Close()
+			_ = c.Close()
 			return true
 		})
 		err = svr.listener.Close()
@@ -157,6 +170,7 @@ func (svr *Server) Close() (err error) {
 func New(ctx context.Context) *Server {
 	return &Server{
 		ctx:      ctx,
+		state:    &state.State{},
 		ch:       make(chan *Request, 10),
 		exitChan: make(chan struct{}),
 	}

+ 8 - 0
gateway/state/state.go

@@ -0,0 +1,8 @@
+package state
+
+type State struct {
+	Counter struct {
+		Request int64 `json:"request"`
+		Failure int64 `json:"failure"`
+	} `json:"counter"`
+}

+ 1 - 1
helper/pool/bufferpoll/pool.go → helper/pool/bufferpool/pool.go

@@ -1,4 +1,4 @@
-package bufferpoll
+package bufferpool
 
 import (
 	"bytes"

+ 1 - 1
helper/pool/bytepoll/pool.go → helper/pool/bytepool/pool.go

@@ -1,4 +1,4 @@
-package bytepoll
+package bytepool
 
 import "sync"
 

+ 3 - 3
log/loghub.go

@@ -4,7 +4,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"fmt"
-	"git.nspix.com/golang/micro/helper/pool/bufferpoll"
+	"git.nspix.com/golang/micro/helper/pool/bufferpool"
 	"net"
 	"os"
 	"time"
@@ -93,8 +93,8 @@ func (lg *LogHub) write(level int, s string) {
 		ls += " [" + lg.prefix + "] "
 	}
 	ls += s
-	bf = bufferpoll.Get()
-	defer bufferpoll.Put(bf)
+	bf = bufferpool.Get()
+	defer bufferpool.Put(bf)
 	// | 1byte level | 8 bytes timestamp | 1 byte name length | n byte name | 2 byte content length | n byte content |
 	bf.WriteByte(uint8(level))
 	_ = binary.Write(bf, binary.LittleEndian, uint64(time.Now().Unix()))

+ 19 - 27
service.go

@@ -4,8 +4,20 @@ import (
 	"context"
 	"crypto/md5"
 	"encoding/hex"
+	"git.nspix.com/golang/micro/broker"
+	"git.nspix.com/golang/micro/gateway"
+	"git.nspix.com/golang/micro/gateway/cli"
+	"git.nspix.com/golang/micro/gateway/http"
+	"git.nspix.com/golang/micro/gateway/rpc"
+	"git.nspix.com/golang/micro/helper/docker"
+	"git.nspix.com/golang/micro/helper/machineid"
+	"git.nspix.com/golang/micro/helper/net/ip"
 	"git.nspix.com/golang/micro/helper/random"
+	"git.nspix.com/golang/micro/helper/unsafestr"
 	"git.nspix.com/golang/micro/helper/utils"
+	"git.nspix.com/golang/micro/log"
+	"git.nspix.com/golang/micro/registry"
+	"github.com/google/btree"
 	"io/ioutil"
 	"math"
 	"math/rand"
@@ -20,22 +32,6 @@ import (
 	"sync/atomic"
 	"syscall"
 	"time"
-
-	"git.nspix.com/golang/micro/broker"
-	"git.nspix.com/golang/micro/gateway/cli"
-	"git.nspix.com/golang/micro/helper/machineid"
-	"git.nspix.com/golang/micro/stats/prometheusbackend"
-	"github.com/google/btree"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-
-	"git.nspix.com/golang/micro/gateway"
-	"git.nspix.com/golang/micro/gateway/http"
-	"git.nspix.com/golang/micro/gateway/rpc"
-	"git.nspix.com/golang/micro/helper/docker"
-	"git.nspix.com/golang/micro/helper/net/ip"
-	"git.nspix.com/golang/micro/helper/unsafestr"
-	"git.nspix.com/golang/micro/log"
-	"git.nspix.com/golang/micro/registry"
 )
 
 var (
@@ -315,6 +311,7 @@ func (svr *Service) startHTTPServe() (err error) {
 			}
 			log.Infof("http server stopped")
 		})
+
 		svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
 			return ctx.Success(map[string]interface{}{
 				"id":      svr.node.ID,
@@ -323,11 +320,6 @@ func (svr *Service) startHTTPServe() (err error) {
 			})
 		})
 
-		if svr.opts.EnableStats {
-			prometheusbackend.Init(svr.opts.shortName)
-			svr.httpSvr.Handler("GET", "/metrics", promhttp.Handler())
-		}
-
 		if svr.opts.EnableHttpPProf {
 			svr.httpSvr.Handler("GET", "/debug/pprof/", hp.HandlerFunc(pprof.Index))
 			svr.httpSvr.Handler("GET", "/debug/pprof/goroutine", hp.HandlerFunc(pprof.Index))
@@ -339,9 +331,9 @@ func (svr *Service) startHTTPServe() (err error) {
 			svr.httpSvr.Handler("GET", "/debug/pprof/symbol", hp.HandlerFunc(pprof.Symbol))
 			svr.httpSvr.Handler("GET", "/debug/pprof/trace", hp.HandlerFunc(pprof.Trace))
 		}
-		log.Infof("attach http listener success")
+		log.Infof("http server listen on %s", svr.listener.Addr())
 	} else {
-		log.Warnf("attach http listener failed cause by %s", err.Error())
+		log.Warnf("http server listen error: %s", err.Error())
 	}
 	return
 }
@@ -356,9 +348,9 @@ func (svr *Service) startRPCServe() (err error) {
 			}
 			log.Infof("rpc server stopped")
 		})
-		log.Infof("attach rpc listener success")
+		log.Infof("rpc server listen on %s", svr.listener.Addr())
 	} else {
-		log.Warnf("attach rpc listener failed cause by %s", err.Error())
+		log.Warnf("rpc server listen error: %s", err.Error())
 	}
 	return
 }
@@ -373,9 +365,9 @@ func (svr *Service) startCliServe() (err error) {
 			}
 			log.Infof("cli server stopped")
 		})
-		log.Infof("attach cli listener success")
+		log.Infof("cli server listen on %s", svr.listener.Addr())
 	} else {
-		log.Warnf("attach cli listener failed cause by %s", err.Error())
+		log.Warnf("cli server listen error: %s", err.Error())
 	}
 	return
 }