123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- package rpc
- import (
- "git.nspix.com/golang/micro/log"
- "net"
- "sync"
- "sync/atomic"
- )
- type HandleFunc func(ctx *Context) error
- type Server struct {
- listener net.Listener
- ch chan *Request
- ctxPool sync.Pool
- serviceMap sync.Map // map[string]HandleFunc
- exitFlag int32
- exitChan chan struct{}
- }
- func (svr *Server) getContext() *Context {
- if v := svr.ctxPool.Get(); v != nil {
- return v.(*Context)
- } else {
- return &Context{}
- }
- }
- func (svr *Server) putContext(c *Context) {
- svr.ctxPool.Put(c)
- }
- func (svr *Server) wrkLoop() {
- for {
- select {
- case req, ok := <-svr.ch:
- if ok {
- svr.handleRequest(req)
- }
- case <-svr.exitChan:
- return
- }
- }
- }
- func (svr *Server) handleRequest(req *Request) {
- var (
- ok bool
- err error
- cb HandleFunc
- val interface{}
- ctx *Context
- )
- log.Debugf("rcp handle request %s@%d", req.Method, req.Sequence)
- if val, ok = svr.serviceMap.Load(req.Method); ok {
- cb = val.(HandleFunc)
- ctx = svr.getContext()
- ctx.Reset(req, NewResponse())
- if err = cb(ctx); err == nil {
- if err = writeFrame(req.conn, &Frame{
- Func: FuncResponse,
- Sequence: req.Sequence,
- Data: ctx.Response().Bytes(),
- }); err != nil {
- log.Warnf("rcp write %d response error: %s", req.Sequence, err.Error())
- } else {
- log.Debugf("rcp write %d response success", req.Sequence)
- }
- }
- } else {
- ctx = svr.getContext()
- resp := NewResponse()
- resp.code = 404
- resp.message = "not found"
- ctx.Reset(req, resp)
- if err = writeFrame(req.conn, &Frame{
- Func: FuncResponse,
- Sequence: req.Sequence,
- Data: ctx.Response().Bytes(),
- }); err != nil {
- log.Warnf("rcp write %d response error: %s", req.Sequence, err.Error())
- } else {
- log.Debugf("rcp write %d response success", req.Sequence)
- }
- }
- }
- func (svr *Server) process(conn net.Conn) {
- var (
- err error
- frame *Frame
- )
- defer func() {
- _ = conn.Close()
- }()
- for {
- if frame, err = readFrame(conn); err != nil {
- return
- }
- switch frame.Func {
- case FuncPing:
- if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
- log.Warnf("rcp write ping frame error: %s", err.Error())
- }
- case FuncRequest:
- //读取一个请求
- if req, err2 := ReadRequest(frame.Data); err2 == nil {
- req.reset(frame.Sequence, conn)
- log.Debugf("rcp read request %s@%d from %s", req.Method, req.Sequence, req.RemoteAddr().String())
- select {
- case svr.ch <- req:
- default:
- }
- } else {
- log.Warnf("read rpc request error: %s", err2.Error())
- }
- }
- }
- }
- func (svr *Server) Handle(method string, f HandleFunc) {
- svr.serviceMap.Store(method, f)
- return
- }
- func (svr *Server) Serve(l net.Listener) (err error) {
- svr.listener = l
- go func() {
- svr.wrkLoop()
- }()
- for {
- if conn, err2 := svr.listener.Accept(); err2 == nil {
- go svr.process(conn)
- } else {
- err = err2
- break
- }
- }
- return
- }
- func (svr *Server) Close() (err error) {
- if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
- err = svr.listener.Close()
- close(svr.exitChan)
- }
- return
- }
- func NewServer() *Server {
- return &Server{
- ch: make(chan *Request, 10),
- exitChan: make(chan struct{}),
- }
- }
|