123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- package rpc
- import (
- "errors"
- "net"
- "sync"
- )
// ErrHandleExists is returned by Server.HandleFunc when a handler has already
// been registered under the requested method name.
var ErrHandleExists = errors.New("handle already exists")
- type HandleFunc func(ctx *Context) error
- type Server struct {
- listener net.Listener
- ch chan *Request
- ctxPool sync.Pool
- serviceMap sync.Map // map[string]HandleFunc
- exitChan chan struct{}
- }
- func (svr *Server) getContext() *Context {
- if v := svr.ctxPool.Get(); v != nil {
- return v.(*Context)
- } else {
- return &Context{}
- }
- }
- func (svr *Server) putContext(c *Context) {
- svr.ctxPool.Put(c)
- }
- func (svr *Server) wrkLoop() {
- for {
- select {
- case req, ok := <-svr.ch:
- if ok {
- svr.handleRequest(req)
- }
- case <-svr.exitChan:
- return
- }
- }
- }
- func (svr *Server) handleRequest(req *Request) {
- var (
- ok bool
- err error
- cb HandleFunc
- val interface{}
- ctx *Context
- )
- if val, ok = svr.serviceMap.Load(req); ok {
- cb = val.(HandleFunc)
- ctx = svr.getContext()
- resp := &Response{}
- ctx.Reset(req, resp)
- if err = cb(ctx); err != nil {
- _ = writeFrame(req.conn, &Frame{
- Func: FuncResponse,
- Data: resp.Encode(),
- })
- }
- }
- }
- func (svr *Server) process(conn net.Conn) {
- var (
- err error
- frame *Frame
- )
- defer func() {
- _ = conn.Close()
- }()
- for {
- if frame, err = readFrame(conn); err != nil {
- return
- }
- switch frame.Func {
- case FuncPing:
- _ = writeFrame(conn, &Frame{Func: FuncPing})
- case FuncRequest:
- req := ReadRequest(frame.Data)
- req.Reset(conn)
- select {
- case svr.ch <- req:
- default:
- }
- }
- }
- }
- func (svr *Server) HandleFunc(method string, f HandleFunc) (err error) {
- _, ok := svr.serviceMap.Load(method)
- if ok {
- err = ErrHandleExists
- } else {
- svr.serviceMap.Store(method, f)
- }
- return
- }
- func (svr *Server) Serve(l net.Listener) {
- for {
- if conn, err := l.Accept(); err == nil {
- go svr.process(conn)
- } else {
- break
- }
- }
- }
- func (svr *Server) Close() (err error) {
- err = svr.listener.Close()
- return
- }
- func NewServer() *Server {
- return &Server{}
- }
|