server.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package rpc
  2. import (
  3. "errors"
  4. "net"
  5. "sync"
  6. )
  7. var (
  8. ErrHandleExists = errors.New("handle already exists")
  9. )
  10. type HandleFunc func(ctx *Context) error
  11. type Server struct {
  12. listener net.Listener
  13. ch chan *Request
  14. ctxPool sync.Pool
  15. serviceMap sync.Map // map[string]HandleFunc
  16. exitChan chan struct{}
  17. }
  18. func (svr *Server) getContext() *Context {
  19. if v := svr.ctxPool.Get(); v != nil {
  20. return v.(*Context)
  21. } else {
  22. return &Context{}
  23. }
  24. }
  25. func (svr *Server) putContext(c *Context) {
  26. svr.ctxPool.Put(c)
  27. }
  28. func (svr *Server) wrkLoop() {
  29. for {
  30. select {
  31. case req, ok := <-svr.ch:
  32. if ok {
  33. svr.handleRequest(req)
  34. }
  35. case <-svr.exitChan:
  36. return
  37. }
  38. }
  39. }
  40. func (svr *Server) handleRequest(req *Request) {
  41. var (
  42. ok bool
  43. err error
  44. cb HandleFunc
  45. val interface{}
  46. ctx *Context
  47. )
  48. if val, ok = svr.serviceMap.Load(req); ok {
  49. cb = val.(HandleFunc)
  50. ctx = svr.getContext()
  51. resp := &Response{}
  52. ctx.Reset(req, resp)
  53. if err = cb(ctx); err != nil {
  54. _ = writeFrame(req.conn, &Frame{
  55. Func: FuncResponse,
  56. Data: resp.Encode(),
  57. })
  58. }
  59. }
  60. }
  61. func (svr *Server) process(conn net.Conn) {
  62. var (
  63. err error
  64. frame *Frame
  65. )
  66. defer func() {
  67. _ = conn.Close()
  68. }()
  69. for {
  70. if frame, err = readFrame(conn); err != nil {
  71. return
  72. }
  73. switch frame.Func {
  74. case FuncPing:
  75. _ = writeFrame(conn, &Frame{Func: FuncPing})
  76. case FuncRequest:
  77. req := ReadRequest(frame.Data)
  78. req.Reset(conn)
  79. select {
  80. case svr.ch <- req:
  81. default:
  82. }
  83. }
  84. }
  85. }
  86. func (svr *Server) HandleFunc(method string, f HandleFunc) (err error) {
  87. _, ok := svr.serviceMap.Load(method)
  88. if ok {
  89. err = ErrHandleExists
  90. } else {
  91. svr.serviceMap.Store(method, f)
  92. }
  93. return
  94. }
  95. func (svr *Server) Serve(l net.Listener) {
  96. for {
  97. if conn, err := l.Accept(); err == nil {
  98. go svr.process(conn)
  99. } else {
  100. break
  101. }
  102. }
  103. }
  104. func (svr *Server) Close() (err error) {
  105. err = svr.listener.Close()
  106. return
  107. }
  108. func NewServer() *Server {
  109. return &Server{}
  110. }