Browse Source

add rpc support

lxg 4 years ago
parent
commit
78e8c43c72

+ 36 - 15
.idea/workspace.xml

@@ -2,10 +2,17 @@
 <project version="4">
   <component name="ChangeListManager">
     <list default="true" id="cd58867b-089e-4508-9033-393b8939261c" name="Default Changelist" comment="">
+      <change afterPath="$PROJECT_DIR$/cmd/mock/rpc/main.go" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/gateway/rpc/codec/codec.go" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/gateway/rpc/codec/gob_codec.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/cmd/main.go" beforeDir="false" afterPath="$PROJECT_DIR$/cmd/main.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/conn.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/conn.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/client.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/client.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/gateway/rpc/context.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/context.go" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/micro.go" beforeDir="false" afterPath="$PROJECT_DIR$/micro.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/message.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/message.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/request.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/request.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/response.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/response.go" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/gateway/rpc/server.go" beforeDir="false" afterPath="$PROJECT_DIR$/gateway/rpc/server.go" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/service.go" beforeDir="false" afterPath="$PROJECT_DIR$/service.go" afterDir="false" />
     </list>
     <option name="SHOW_DIALOG" value="false" />
@@ -63,7 +70,7 @@
       <recent name="D:\workspace\golang\micro\internal\micro\transport\http" />
     </key>
   </component>
-  <component name="RunManager" selected="Go Build.go build git.nspix.com/golang/micro/cmd">
+  <component name="RunManager" selected="Go Build.go build git.nspix.com/golang/micro/cmd/mock/rpc">
     <configuration name="go build git.nspix.com/golang/micro/cmd" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
       <module name="micro" />
       <working_directory value="$PROJECT_DIR$" />
@@ -73,6 +80,15 @@
       <directory value="$PROJECT_DIR$" />
       <method v="2" />
     </configuration>
+    <configuration name="go build git.nspix.com/golang/micro/cmd/mock/rpc" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
+      <module name="micro" />
+      <working_directory value="$PROJECT_DIR$" />
+      <kind value="PACKAGE" />
+      <filePath value="$PROJECT_DIR$/cmd/mock/rpc/main.go" />
+      <package value="git.nspix.com/golang/micro/cmd/mock/rpc" />
+      <directory value="$PROJECT_DIR$" />
+      <method v="2" />
+    </configuration>
     <configuration name="go build git.nspix.com/micro/cmd" type="GoApplicationRunConfiguration" factoryName="Go Application" temporary="true" nameIsGenerated="true">
       <module name="micro" />
       <working_directory value="$PROJECT_DIR$" />
@@ -100,24 +116,13 @@
       <directory value="$PROJECT_DIR$" />
       <method v="2" />
     </configuration>
-    <configuration name="TestService in git.nspix.com/micro/internal/micro/registry" type="GoTestRunConfiguration" factoryName="Go Test" temporary="true" nameIsGenerated="true">
-      <module name="micro" />
-      <working_directory value="$PROJECT_DIR$/internal/micro/registry" />
-      <framework value="gotest" />
-      <kind value="PACKAGE" />
-      <package value="git.nspix.com/micro/internal/micro/registry" />
-      <directory value="$PROJECT_DIR$" />
-      <filePath value="$PROJECT_DIR$" />
-      <pattern value="^\QTestService\E$" />
-      <method v="2" />
-    </configuration>
     <recent_temporary>
       <list>
+        <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd/mock/rpc" />
         <item itemvalue="Go Build.go build git.nspix.com/golang/micro/cmd" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/client" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd/mock/server" />
         <item itemvalue="Go Build.go build git.nspix.com/micro/cmd" />
-        <item itemvalue="Go Test.TestService in git.nspix.com/micro/internal/micro/registry" />
       </list>
     </recent_temporary>
   </component>
@@ -140,4 +145,20 @@
   <component name="VgoProject">
     <integration-enabled>true</integration-enabled>
   </component>
+  <component name="XDebuggerManager">
+    <breakpoint-manager>
+      <breakpoints>
+        <line-breakpoint enabled="true" type="DlvLineBreakpoint">
+          <url>file://$PROJECT_DIR$/gateway/rpc/client.go</url>
+          <line>112</line>
+          <option name="timeStamp" value="4" />
+        </line-breakpoint>
+        <line-breakpoint enabled="true" type="DlvLineBreakpoint">
+          <url>file://$PROJECT_DIR$/gateway/rpc/client.go</url>
+          <line>62</line>
+          <option name="timeStamp" value="8" />
+        </line-breakpoint>
+      </breakpoints>
+    </breakpoint-manager>
+  </component>
 </project>

+ 35 - 0
cmd/mock/rpc/main.go

@@ -0,0 +1,35 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"git.nspix.com/golang/micro/gateway/rpc"
+	"time"
+)
+
+type (
+	mathRequest struct {
+		NumA int `json:"num_a"`
+		NumB int `json:"num_b"`
+	}
+
+	mathResponse struct {
+		Value int `json:"value"`
+	}
+)
+
+func main() {
+	c := rpc.NewClient()
+	if err := c.Dialer("tcp", "127.0.0.1:52743"); err != nil {
+		panic(err)
+	}
+	defer c.Close()
+	ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
+	if res, err := c.Do(ctx, rpc.NewRequest("math.add", &mathRequest{NumA: 100, NumB: 12})); err == nil {
+		var i mathResponse
+		fmt.Println(res.Decode(&i))
+		fmt.Println(i)
+	} else {
+		fmt.Println(err)
+	}
+}

+ 1 - 1
gateway/conn.go

@@ -13,7 +13,7 @@ type Conn struct {
 func (c *Conn) Read(b []byte) (n int, err error) {
 	var m int
 	if len(c.buf) > 0 {
-		if len(b) > len(c.buf) {
+		if len(b) >= len(c.buf) {
 			m = copy(b[:], c.buf[:])
 			c.buf = c.buf[m:]
 		}

+ 47 - 25
gateway/rpc/client.go

@@ -16,7 +16,12 @@ type (
 		isConnected       int32
 		transactionLocker sync.RWMutex
 		transaction       map[uint16]*transaction
+		exitFlag          int32
+		exitChan          chan struct{}
+		network           string
+		address           string
 	}
+
 	transaction struct {
 		sequence   uint16
 		response   *Response
@@ -40,15 +45,33 @@ func (t *transaction) Done(r *Response) {
 	}
 }
 
+func (c *Client) commit(seq uint16) *transaction {
+	c.transactionLocker.Lock()
+	trans := &transaction{
+		sequence:   seq,
+		isCanceled: false,
+		ch:         make(chan *transaction),
+	}
+	c.transaction[seq] = trans
+	c.transactionLocker.Unlock()
+	return trans
+}
+
 func (c *Client) rdyLoop() {
 	defer atomic.StoreInt32(&c.isConnected, 0)
 	for {
 		if frame, err := readFrame(c.conn); err == nil {
-			c.transactionLocker.RLock()
-			ch, ok := c.transaction[frame.Sequence]
-			c.transactionLocker.RUnlock()
-			if ok {
-				ch.Done(ReadResponse(frame.Data))
+			if frame.Func == FuncResponse {
+				c.transactionLocker.RLock()
+				ch, ok := c.transaction[frame.Sequence]
+				c.transactionLocker.RUnlock()
+				if ok {
+					if res, err := ReadResponse(frame.Data); err == nil {
+						ch.Done(res)
+					} else {
+						ch.Cancel()
+					}
+				}
 			}
 		} else {
 			break
@@ -56,15 +79,15 @@ func (c *Client) rdyLoop() {
 	}
 }
 
-func (c *Client) Close() (err error) {
-	if c.conn != nil {
-		err = c.conn.Close()
-	}
-	return
+func (c *Client) Dialer(network string, addr string) (err error) {
+	c.network = network
+	c.address = addr
+	return c.dialer()
+
 }
 
-func (c *Client) Dialer(network string, addr string) (err error) {
-	if c.conn, err = net.DialTimeout(network, addr, time.Second*10); err != nil {
+func (c *Client) dialer() (err error) {
+	if c.conn, err = net.DialTimeout(c.network, c.address, time.Second*10); err != nil {
 		return
 	} else {
 		atomic.StoreInt32(&c.isConnected, 1)
@@ -73,18 +96,6 @@ func (c *Client) Dialer(network string, addr string) (err error) {
 	return
 }
 
-func (c *Client) commit(seq uint16) *transaction {
-	c.transactionLocker.Lock()
-	trans := &transaction{
-		sequence:   seq,
-		isCanceled: false,
-		ch:         make(chan *transaction),
-	}
-	c.transaction[seq] = trans
-	c.transactionLocker.Unlock()
-	return trans
-}
-
 func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error) {
 	if atomic.LoadInt32(&c.isConnected) == 0 {
 		err = io.ErrClosedPipe
@@ -95,7 +106,7 @@ func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error
 	if err = writeFrame(c.conn, &Frame{
 		Func:     FuncRequest,
 		Sequence: seq,
-		Data:     req.Encode(),
+		Data:     req.Bytes(),
 	}); err != nil {
 		return
 	}
@@ -114,8 +125,19 @@ func (c *Client) Do(ctx context.Context, req *Request) (res *Response, err error
 	return
 }
 
+func (c *Client) Close() (err error) {
+	if atomic.CompareAndSwapInt32(&c.exitFlag, 0, 1) {
+		if c.conn != nil {
+			err = c.conn.Close()
+		}
+		close(c.exitChan)
+	}
+	return
+}
+
 func NewClient() *Client {
 	return &Client{
+		exitChan:    make(chan struct{}),
 		transaction: make(map[uint16]*transaction),
 	}
 }

+ 15 - 0
gateway/rpc/codec/codec.go

@@ -0,0 +1,15 @@
+package codec
+
+import "io"
+
+
+var (
+	DefaultCodec = &GobCodec{}
+)
+
+type Codec interface {
+	Encode(i interface{}) (b []byte, err error)
+	EncodeTo(w io.Writer, i interface{}) (err error)
+	Decode(b []byte, i interface{}) (err error)
+	DecodeFrom(r io.Reader, i interface{}) (err error)
+}

+ 30 - 0
gateway/rpc/codec/gob_codec.go

@@ -0,0 +1,30 @@
+package codec
+
+import (
+	"bytes"
+	"encoding/gob"
+	"io"
+)
+
+type GobCodec struct {
+}
+
+func (codec *GobCodec) EncodeTo(w io.Writer, i interface{}) (err error) {
+	return gob.NewEncoder(w).Encode(i)
+}
+
+func (codec *GobCodec) DecodeFrom(r io.Reader, i interface{}) (err error) {
+	return gob.NewDecoder(r).Decode(i)
+}
+
+func (codec *GobCodec) Encode(i interface{}) (b []byte, err error) {
+	writer := &bytes.Buffer{}
+	if err = gob.NewEncoder(writer).Encode(i); err == nil {
+		b = writer.Bytes()
+	}
+	return
+}
+
+func (codec *GobCodec) Decode(b []byte, i interface{}) (err error) {
+	return gob.NewDecoder(bytes.NewReader(b)).Decode(i)
+}

+ 13 - 3
gateway/rpc/context.go

@@ -10,14 +10,24 @@ func (c *Context) Reset(req *Request, res *Response) {
 	c.res = res
 }
 
+func (c *Context) Request() *Request {
+	return c.req
+}
+
+func (c *Context) Response() *Response {
+	return c.res
+}
+
 func (c *Context) Bind(i interface{}) (err error) {
-	return
+	return c.Request().Decode(i)
 }
 
 func (c *Context) Success(i interface{}) (err error) {
-	return
+	return c.Response().SetBody(i)
 }
 
 func (c *Context) Error(code int, message string) (err error) {
+	c.Response().code = uint16(code)
+	c.Response().message = message
 	return
-}
+}

+ 35 - 3
gateway/rpc/message.go

@@ -1,7 +1,9 @@
 package rpc
 
 import (
+	"bytes"
 	"encoding/binary"
+	"errors"
 	"io"
 )
 
@@ -11,7 +13,13 @@ const (
 	FuncResponse       = 0x03
 )
 
+var (
+	feature           = []byte{82, 80, 67}				//RPC数据包
+	errInvalidPackage = errors.New("invalid package")
+)
+
 type Frame struct {
+	Feature  []byte //特征码 RPC
 	Func     uint8
 	Sequence uint16
 	Length   uint16
@@ -19,7 +27,17 @@ type Frame struct {
 }
 
 func readFrame(r io.Reader) (frame *Frame, err error) {
-	frame = &Frame{}
+	var (
+		n int
+	)
+	frame = &Frame{Feature: make([]byte, 3)}
+	if _, err = io.ReadFull(r, frame.Feature); err != nil {
+		return
+	}
+	if !bytes.Equal(frame.Feature, feature) {
+		err = errInvalidPackage
+		return
+	}
 	if err = binary.Read(r, binary.LittleEndian, &frame.Func); err != nil {
 		return
 	}
@@ -30,14 +48,24 @@ func readFrame(r io.Reader) (frame *Frame, err error) {
 		return
 	}
 	frame.Data = make([]byte, frame.Length)
-	_, err = io.ReadFull(r, frame.Data)
+	if n, err = io.ReadFull(r, frame.Data); err == nil {
+		if n < int(frame.Length) {
+			err = io.ErrShortBuffer
+		}
+	}
 	return
 }
 
 func writeFrame(w io.Writer, frame *Frame) (err error) {
+	var (
+		n int
+	)
 	if frame.Length == 0 {
 		frame.Length = uint16(len(frame.Data))
 	}
+	if _, err = w.Write(feature); err != nil {
+		return
+	}
 	if err = binary.Write(w, binary.LittleEndian, frame.Func); err != nil {
 		return
 	}
@@ -47,6 +75,10 @@ func writeFrame(w io.Writer, frame *Frame) (err error) {
 	if err = binary.Write(w, binary.LittleEndian, frame.Length); err != nil {
 		return
 	}
-	_, err = w.Write(frame.Data)
+	if n, err = w.Write(frame.Data); err == nil {
+		if n < int(frame.Length) {
+			err = io.ErrShortWrite
+		}
+	}
 	return
 }

+ 113 - 16
gateway/rpc/request.go

@@ -1,38 +1,135 @@
 package rpc
 
 import (
-	"encoding/json"
+	"bytes"
+	"encoding/binary"
+	"git.nspix.com/golang/micro/gateway/rpc/codec"
+	"io"
 	"net"
 )
 
 type Request struct {
-	body   []byte
-	conn   net.Conn
-	Method string
-	Params map[string]interface{}
+	codec         codec.Codec
+	conn          net.Conn
+	Method        string
+	Header        map[string]string
+	ContentLength int
+	Sequence      uint16
+	Body          interface{}
+	body          []byte
 }
 
-func (r *Request) Bind(v interface{}) (err error) {
-	err = json.Unmarshal(r.body, v)
-	return
+func (r *Request) reset(seq uint16, conn net.Conn) {
+	r.Sequence = seq
+	r.conn = conn
+}
+
+//解码内容
+func (r *Request) Decode(i interface{}) (err error) {
+	return r.codec.Decode(r.body, i)
 }
 
-func (r *Request) Encode() []byte  {
+func (r *Request) RemoteAddr() net.Addr {
+	if r.conn != nil {
+		return r.conn.RemoteAddr()
+	}
 	return nil
 }
 
-func (r *Request) ParamValue(name string) interface{} {
+func (r *Request) Bytes() []byte {
+	buf := &bytes.Buffer{}
+	if _, err := r.WriteTo(buf); err == nil {
+		return buf.Bytes()
+	}
 	return nil
 }
 
-func (r *Request) Reset(conn net.Conn)  {
-	r.conn = conn
+func (r *Request) WriteTo(w io.Writer) (n int64, err error) {
+	var (
+		headerBuffer []byte
+		bodyBuffer   []byte
+		m            int
+		headerLen    int
+		bodyLen      int
+	)
+	if r.Header == nil {
+		r.Header = make(map[string]string)
+	}
+	r.Header["_METHOD"] = r.Method
+	if headerBuffer, err = r.codec.Encode(r.Header); err != nil {
+		return
+	}
+	if bodyBuffer, err = r.codec.Encode(r.Body); err != nil {
+		return
+	}
+	headerLen = len(headerBuffer)
+	bodyLen = len(bodyBuffer)
+	if err = binary.Write(w, binary.LittleEndian, uint16(headerLen)); err != nil {
+		return
+	}
+	n += 2
+	if err = binary.Write(w, binary.LittleEndian, uint16(bodyLen)); err != nil {
+		return
+	}
+	n += 2
+	if m, err = w.Write(headerBuffer); err == nil {
+		n += int64(m)
+	}
+	if m, err = w.Write(bodyBuffer); err == nil {
+		n += int64(m)
+	}
+	return
 }
 
-func ReadRequest(b []byte) *Request {
-	return &Request{}
+func ReadRequest(b []byte) (req *Request, err error) {
+	var (
+		p            []byte
+		headerBuffer []byte
+		bodyBuffer   []byte
+		headerLen    uint16
+		bodyLen      uint16
+	)
+	req = &Request{
+		codec:  codec.DefaultCodec,
+		Header: make(map[string]string),
+	}
+	if b == nil || len(b) <= 4 {
+		err = io.ErrShortBuffer
+		return
+	}
+	headerLen = binary.LittleEndian.Uint16(b[0:2])
+	bodyLen = binary.LittleEndian.Uint16(b[2:4])
+	p = b[4:]
+	if headerLen > 0 {
+		if len(p) < int(headerLen) {
+			err = io.ErrShortBuffer
+			return
+		}
+		headerBuffer = p[:headerLen]
+		if err = req.codec.Decode(headerBuffer, &req.Header); err != nil {
+			return
+		}
+		req.Method = req.Header["_METHOD"]
+		p = p[headerLen:]
+	}
+	if bodyLen > 0 {
+		if len(p) < int(bodyLen) {
+			err = io.ErrShortBuffer
+			return
+		}
+		bodyBuffer = p[:bodyLen]
+		req.body = make([]byte, bodyLen)
+		copy(req.body[:], bodyBuffer[:])
+		req.ContentLength = int(bodyLen)
+	}
+	return
 }
 
-func NewRequest() *Request {
-	return &Request{}
+func NewRequest(method string, i interface{}) *Request {
+	return &Request{
+		Method: method,
+		Body:   i,
+		codec:  codec.DefaultCodec,
+		Header: make(map[string]string),
+	}
 }

+ 112 - 11
gateway/rpc/response.go

@@ -1,26 +1,127 @@
 package rpc
 
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"git.nspix.com/golang/micro/gateway/rpc/codec"
+	"git.nspix.com/golang/micro/utils/unsafestr"
+	"io"
+)
+
 const (
-	StatusOk    = 0x01
-	StatusError = 0x05
+	SuccessCode = 0
 )
 
 type Response struct {
-	Code  uint8
-	Error string
-	body  []byte
+	codec   codec.Codec
+	code    uint16 //状态码
+	message string //错误消息
+	error   error  //错误实体
+	Body    []byte //内容
+}
+
+//获取状态码
+func (r *Response) StatusCode() int {
+	return int(r.code)
+}
+
+//获取错误
+func (r *Response) Error() (err error) {
+	if r.error != nil {
+		return r.error
+	}
+	if r.code != SuccessCode {
+		r.error = errors.New(r.message)
+		err = r.error
+	}
+	return
 }
 
-func (r *Response) Encode()[]byte  {
+func (r *Response) SetBody(i interface{}) (err error) {
+	r.Body, err = r.codec.Encode(i)
+	return
+}
+
+//解码内容
+func (r *Response) Decode(i interface{}) (err error) {
+	return r.codec.Decode(r.Body, i)
+}
+
+func (r *Response) Bytes() []byte {
+	buf := &bytes.Buffer{}
+	if _, err := r.WriteTo(buf); err == nil {
+		return buf.Bytes()
+	}
 	return nil
 }
 
-func (r *Response) Send(v interface{}) (err error) {
-	
+//  |--code--|--msg length--|--body length--|--msg--|--body--|
+//  |-- 2 --|-- 2 ----------| --2-----------|
+func (r *Response) WriteTo(w io.Writer) (n int64, err error) {
+	var (
+		m int
+	)
+	if err = binary.Write(w, binary.LittleEndian, r.code); err != nil {
+		return
+	}
+	n += 2
+	msgLen := len(r.message)
+	bodyLen := len(r.Body)
+	if err = binary.Write(w, binary.LittleEndian, uint16(msgLen)); err != nil {
+		return
+	}
+	n += 2
+	if err = binary.Write(w, binary.LittleEndian, uint16(bodyLen)); err != nil {
+		return
+	}
+	n += 2
+	if m, err = w.Write(unsafestr.StringToBytes(r.message)); err == nil {
+		n += int64(m)
+	}
+	if m, err = w.Write(r.Body); err == nil {
+		n += int64(m)
+	}
 	return
 }
 
-func ReadResponse(b []byte) *Response  {
-	r := &Response{}
-	return r
+func ReadResponse(b []byte) (resp *Response, err error) {
+	resp = &Response{
+		codec:  codec.DefaultCodec,
+	}
+	if b == nil || len(b) <= 6 {
+		err = io.ErrShortBuffer
+		return
+	}
+	var (
+		p       []byte
+		msgLen  uint16
+		bodyLen uint16
+	)
+	resp.code = binary.LittleEndian.Uint16(b[:2])
+	msgLen = binary.LittleEndian.Uint16(b[2:4])
+	bodyLen = binary.LittleEndian.Uint16(b[4:6])
+	p = b[6:]
+	if msgLen > 0 {
+		if len(p) < int(msgLen) {
+			err = io.ErrShortBuffer
+			return
+		}
+		resp.message = unsafestr.BytesToString(p[:msgLen])
+		p = p[msgLen:]
+	}
+	if bodyLen > 0 {
+		if len(p) < int(bodyLen) {
+			err = io.ErrShortBuffer
+			return
+		}
+		resp.Body = p[:bodyLen]
+	}
+	return
 }
+
+func NewResponse() *Response  {
+	return &Response{
+		codec:   codec.DefaultCodec,
+	}
+}

+ 61 - 29
gateway/rpc/server.go

@@ -1,13 +1,10 @@
 package rpc
 
 import (
-	"errors"
+	"git.nspix.com/golang/micro/log"
 	"net"
 	"sync"
-)
-
-var (
-	ErrHandleExists = errors.New("handle already exists")
+	"sync/atomic"
 )
 
 type HandleFunc func(ctx *Context) error
@@ -17,6 +14,7 @@ type Server struct {
 	ch         chan *Request
 	ctxPool    sync.Pool
 	serviceMap sync.Map // map[string]HandleFunc
+	exitFlag   int32
 	exitChan   chan struct{}
 }
 
@@ -53,16 +51,36 @@ func (svr *Server) handleRequest(req *Request) {
 		val interface{}
 		ctx *Context
 	)
-	if val, ok = svr.serviceMap.Load(req); ok {
+	log.Debugf("rcp handle request %s@%d", req.Method, req.Sequence)
+	if val, ok = svr.serviceMap.Load(req.Method); ok {
 		cb = val.(HandleFunc)
 		ctx = svr.getContext()
-		resp := &Response{}
+		ctx.Reset(req, NewResponse())
+		if err = cb(ctx); err == nil {
+			if err = writeFrame(req.conn, &Frame{
+				Func:     FuncResponse,
+				Sequence: req.Sequence,
+				Data:     ctx.Response().Bytes(),
+			}); err != nil {
+				log.Warnf("rcp write %d response error: %s", req.Sequence, err.Error())
+			} else {
+				log.Debugf("rcp write %d response success", req.Sequence)
+			}
+		}
+	} else {
+		ctx = svr.getContext()
+		resp := NewResponse()
+		resp.code = 404
+		resp.message = "not found"
 		ctx.Reset(req, resp)
-		if err = cb(ctx); err != nil {
-			_ = writeFrame(req.conn, &Frame{
-				Func: FuncResponse,
-				Data: resp.Encode(),
-			})
+		if err = writeFrame(req.conn, &Frame{
+			Func:     FuncResponse,
+			Sequence: req.Sequence,
+			Data:     ctx.Response().Bytes(),
+		}); err != nil {
+			log.Warnf("rcp write %d response error: %s", req.Sequence, err.Error())
+		} else {
+			log.Debugf("rcp write %d response success", req.Sequence)
 		}
 	}
 }
@@ -81,43 +99,57 @@ func (svr *Server) process(conn net.Conn) {
 		}
 		switch frame.Func {
 		case FuncPing:
-			_ = writeFrame(conn, &Frame{Func: FuncPing})
+			if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
+				log.Warnf("rcp write ping frame error: %s", err.Error())
+			}
 		case FuncRequest:
-			req := ReadRequest(frame.Data)
-			req.Reset(conn)
-			select {
-			case svr.ch <- req:
-			default:
+			//读取一个请求
+			if req, err2 := ReadRequest(frame.Data); err2 == nil {
+				req.reset(frame.Sequence, conn)
+				log.Debugf("rcp read request %s@%d", req.Method, req.Sequence)
+				select {
+				case svr.ch <- req:
+				default:
+				}
+			} else {
+				log.Warnf("read rpc request error: %s", err2.Error())
 			}
 		}
 	}
 }
 
-func (svr *Server) HandleFunc(method string, f HandleFunc) (err error) {
-	_, ok := svr.serviceMap.Load(method)
-	if ok {
-		err = ErrHandleExists
-	} else {
-		svr.serviceMap.Store(method, f)
-	}
+func (svr *Server) Handle(method string, f HandleFunc) {
+	svr.serviceMap.Store(method, f)
 	return
 }
 
-func (svr *Server) Serve(l net.Listener) {
+func (svr *Server) Serve(l net.Listener) (err error) {
+	svr.listener = l
+	go func() {
+		svr.wrkLoop()
+	}()
 	for {
-		if conn, err := l.Accept(); err == nil {
+		if conn, err2 := svr.listener.Accept(); err2 == nil {
 			go svr.process(conn)
 		} else {
+			err = err2
 			break
 		}
 	}
+	return
 }
 
 func (svr *Server) Close() (err error) {
-	err = svr.listener.Close()
+	if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
+		err = svr.listener.Close()
+		close(svr.exitChan)
+	}
 	return
 }
 
 func NewServer() *Server {
-	return &Server{}
+	return &Server{
+		ch:       make(chan *Request, 10),
+		exitChan: make(chan struct{}),
+	}
 }

+ 15 - 4
service.go

@@ -6,6 +6,7 @@ import (
 	"encoding/hex"
 	"git.nspix.com/golang/micro/gateway"
 	"git.nspix.com/golang/micro/gateway/http"
+	"git.nspix.com/golang/micro/gateway/rpc"
 	"git.nspix.com/golang/micro/log"
 	"git.nspix.com/golang/micro/registry"
 	"git.nspix.com/golang/micro/utils/docker"
@@ -30,6 +31,7 @@ type Service struct {
 	gateway    *gateway.Gateway
 	wg         sync.WaitGroup
 	httpSvr    *http.Server
+	rpcSvr     *rpc.Server
 	upTime     time.Time
 }
 
@@ -77,7 +79,9 @@ func (svr *Service) Handle(method string, cb HandleFunc, opts ...HandleOption) {
 
 	//启动RPC功能
 	if svr.opts.EnableRPC {
-
+		svr.rpcSvr.Handle(method, func(ctx *rpc.Context) error {
+			return cb(ctx)
+		})
 	}
 	return
 }
@@ -138,7 +142,9 @@ func (svr *Service) prepare() (err error) {
 		l := gateway.NewListener(svr.listener.Addr())
 		if err = svr.gateway.Attaches([][]byte{[]byte("GET"), []byte("POST"), []byte("PUT"), []byte("DELETE"), []byte("OPTIONS")}, l); err == nil {
 			svr.wrapSync(func() {
-				_ = svr.httpSvr.Serve(l)
+				if err = svr.httpSvr.Serve(l); err != nil {
+					log.Warnf("http serve error: %s", err.Error())
+				}
 			})
 			svr.httpSvr.Handle("GET", "/healthy", func(ctx *http.Context) (err error) {
 				return ctx.Success(map[string]interface{}{
@@ -154,10 +160,14 @@ func (svr *Service) prepare() (err error) {
 	if svr.opts.EnableHttp {
 		l := gateway.NewListener(svr.listener.Addr())
 		if err = svr.gateway.Attach([]byte("RPC"), l); err == nil {
-			serve := http.New()
 			svr.wrapSync(func() {
-				_ = serve.Serve(l)
+				if err = svr.rpcSvr.Serve(l); err != nil {
+					log.Warnf("rpc serve error: %s", err.Error())
+				}
 			})
+			log.Infof("attach rpc server success")
+		} else {
+			log.Warnf("attach rpc listener error: %s", err.Error())
 		}
 	}
 	if err = svr.registry.Register(svr.node); err == nil {
@@ -216,6 +226,7 @@ func New(opts ...OptionMiddleware) *Service {
 		opts:     o,
 		upTime:   time.Now(),
 		httpSvr:  http.New(),
+		rpcSvr:   rpc.NewServer(),
 		registry: registry.DefaultRegistry,
 	}
 	svr.ctx, svr.cancelFunc = context.WithCancel(o.Context)