server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package rpc
  2. import (
  3. "context"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "git.nspix.com/golang/micro/log"
  8. )
  9. type HandleFunc func(ctx *Context) error
  10. type Server struct {
  11. ctx context.Context
  12. listener net.Listener
  13. ch chan *Request
  14. ctxPool sync.Pool
  15. serviceMap sync.Map // map[string]HandleFunc
  16. sessions sync.Map
  17. exitFlag int32
  18. exitChan chan struct{}
  19. }
  20. func (svr *Server) getContext() *Context {
  21. if v := svr.ctxPool.Get(); v != nil {
  22. return v.(*Context)
  23. } else {
  24. return &Context{}
  25. }
  26. }
  27. func (svr *Server) putContext(c *Context) {
  28. svr.ctxPool.Put(c)
  29. }
  30. func (svr *Server) wrkLoop() {
  31. for {
  32. select {
  33. case req, ok := <-svr.ch:
  34. if ok {
  35. svr.handleRequest(req)
  36. }
  37. case <-svr.exitChan:
  38. return
  39. }
  40. }
  41. }
  42. func (svr *Server) handleRequest(req *Request) {
  43. var (
  44. ok bool
  45. err error
  46. cb HandleFunc
  47. val interface{}
  48. ctx *Context
  49. )
  50. if val, ok = svr.serviceMap.Load(req.Method); ok {
  51. cb = val.(HandleFunc)
  52. ctx = svr.getContext()
  53. ctx.Reset(req, NewResponse())
  54. if err = cb(ctx); err == nil {
  55. if err = writeFrame(req.conn, &Frame{
  56. Func: FuncResponse,
  57. Sequence: req.Sequence,
  58. Data: ctx.Response().Bytes(),
  59. }); err != nil {
  60. log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
  61. }
  62. }
  63. } else {
  64. ctx = svr.getContext()
  65. resp := NewResponse()
  66. resp.code = 404
  67. resp.message = "not found"
  68. ctx.Reset(req, resp)
  69. if err = writeFrame(req.conn, &Frame{
  70. Func: FuncResponse,
  71. Sequence: req.Sequence,
  72. Data: ctx.Response().Bytes(),
  73. }); err != nil {
  74. log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
  75. }
  76. }
  77. }
  78. func (svr *Server) process(conn net.Conn) {
  79. var (
  80. err error
  81. frame *Frame
  82. )
  83. svr.sessions.Store(conn.LocalAddr().String(), conn)
  84. defer func() {
  85. svr.sessions.Delete(conn.LocalAddr().String())
  86. _ = conn.Close()
  87. }()
  88. for {
  89. if frame, err = readFrame(conn); err != nil {
  90. return
  91. }
  92. switch frame.Func {
  93. case FuncPing:
  94. if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
  95. log.Warnf("RPC: write ping frame error: %s", err.Error())
  96. return
  97. }
  98. case FuncRequest:
  99. //read request
  100. if req, err2 := ReadRequest(frame.Data); err2 == nil {
  101. req.reset(frame.Sequence, conn)
  102. select {
  103. case svr.ch <- req:
  104. default:
  105. }
  106. } else {
  107. log.Warnf("RPC: read request error: %s", err2.Error())
  108. }
  109. }
  110. }
  111. }
  112. func (svr *Server) Handle(method string, f HandleFunc) {
  113. svr.serviceMap.Store(method, f)
  114. return
  115. }
  116. func (svr *Server) Serve(l net.Listener) (err error) {
  117. svr.listener = l
  118. go func() {
  119. svr.wrkLoop()
  120. }()
  121. for {
  122. if conn, err2 := svr.listener.Accept(); err2 == nil {
  123. go svr.process(conn)
  124. } else {
  125. err = err2
  126. break
  127. }
  128. }
  129. return
  130. }
  131. func (svr *Server) Close() (err error) {
  132. if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
  133. //clear sessions
  134. svr.sessions.Range(func(key, value interface{}) bool {
  135. c := value.(net.Conn)
  136. c.Close()
  137. return true
  138. })
  139. err = svr.listener.Close()
  140. close(svr.exitChan)
  141. }
  142. return
  143. }
  144. func New(ctx context.Context) *Server {
  145. return &Server{
  146. ctx: ctx,
  147. ch: make(chan *Request, 10),
  148. exitChan: make(chan struct{}),
  149. }
  150. }