server.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package rpc
  2. import (
  3. "context"
  4. "net"
  5. "net/http"
  6. "sync"
  7. "sync/atomic"
  8. "git.nspix.com/golang/micro/log"
  9. )
  10. type HandleFunc func(ctx *Context) error
  11. type Server struct {
  12. ctx context.Context
  13. listener net.Listener
  14. ch chan *Request
  15. ctxPool sync.Pool
  16. serviceMap sync.Map // map[string]HandleFunc
  17. sessions sync.Map
  18. exitFlag int32
  19. exitChan chan struct{}
  20. }
  21. func (svr *Server) getContext() *Context {
  22. if v := svr.ctxPool.Get(); v != nil {
  23. return v.(*Context)
  24. } else {
  25. return &Context{}
  26. }
  27. }
  28. func (svr *Server) putContext(c *Context) {
  29. svr.ctxPool.Put(c)
  30. }
  31. func (svr *Server) wrkLoop() {
  32. for {
  33. select {
  34. case req, ok := <-svr.ch:
  35. if ok {
  36. svr.handleRequest(req)
  37. }
  38. case <-svr.exitChan:
  39. return
  40. }
  41. }
  42. }
  43. func (svr *Server) handleRequest(req *Request) {
  44. var (
  45. ok bool
  46. err error
  47. cb HandleFunc
  48. val interface{}
  49. ctx *Context
  50. )
  51. if val, ok = svr.serviceMap.Load(req.Method); ok {
  52. cb = val.(HandleFunc)
  53. ctx = svr.getContext()
  54. ctx.Reset(req, NewResponse())
  55. if err = cb(ctx); err == nil {
  56. if err = writeFrame(req.conn, &Frame{
  57. Func: FuncResponse,
  58. Sequence: req.Sequence,
  59. Data: ctx.Response().Bytes(),
  60. }); err != nil {
  61. log.Warnf("RPC: write request(%s@%d) response error: %s", req.Method, req.Sequence, err.Error())
  62. }
  63. } else {
  64. log.Warnf("RPC: handle request(%s@%d) error: %s", req.Method, req.Sequence, err.Error())
  65. resp := NewResponse()
  66. resp.code = http.StatusServiceUnavailable
  67. resp.message = http.StatusText(http.StatusServiceUnavailable)
  68. ctx.Reset(req, resp)
  69. if err = writeFrame(req.conn, &Frame{
  70. Func: FuncResponse,
  71. Sequence: req.Sequence,
  72. Data: ctx.Response().Bytes(),
  73. }); err != nil {
  74. log.Warnf("RPC: write request(%s@%d) response error: %s", req.Method, req.Sequence, err.Error())
  75. }
  76. }
  77. } else {
  78. ctx = svr.getContext()
  79. resp := NewResponse()
  80. resp.code = http.StatusNotFound
  81. resp.message = http.StatusText(http.StatusNotFound)
  82. ctx.Reset(req, resp)
  83. if err = writeFrame(req.conn, &Frame{
  84. Func: FuncResponse,
  85. Sequence: req.Sequence,
  86. Data: ctx.Response().Bytes(),
  87. }); err != nil {
  88. log.Warnf("RPC: write request(%s@%d) response failed cause by: %s", req.Method, req.Sequence, err.Error())
  89. }
  90. }
  91. }
  92. func (svr *Server) process(conn net.Conn) {
  93. var (
  94. err error
  95. frame *Frame
  96. )
  97. svr.sessions.Store(conn.LocalAddr().String(), conn)
  98. defer func() {
  99. svr.sessions.Delete(conn.LocalAddr().String())
  100. _ = conn.Close()
  101. }()
  102. for {
  103. if frame, err = readFrame(conn); err != nil {
  104. return
  105. }
  106. switch frame.Func {
  107. case FuncPing:
  108. if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
  109. return
  110. }
  111. case FuncRequest:
  112. if req, err2 := ReadRequest(frame.Data); err2 == nil {
  113. req.reset(frame.Sequence, conn)
  114. select {
  115. case svr.ch <- req:
  116. default:
  117. }
  118. }
  119. }
  120. }
  121. }
  122. func (svr *Server) Handle(method string, f HandleFunc) {
  123. svr.serviceMap.Store(method, f)
  124. return
  125. }
  126. func (svr *Server) Serve(l net.Listener) (err error) {
  127. svr.listener = l
  128. go func() {
  129. svr.wrkLoop()
  130. }()
  131. for {
  132. if conn, err2 := svr.listener.Accept(); err2 == nil {
  133. go svr.process(conn)
  134. } else {
  135. err = err2
  136. break
  137. }
  138. }
  139. return
  140. }
  141. func (svr *Server) Close() (err error) {
  142. if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
  143. //clear sessions
  144. svr.sessions.Range(func(key, value interface{}) bool {
  145. c := value.(net.Conn)
  146. _ = c.Close()
  147. return true
  148. })
  149. err = svr.listener.Close()
  150. close(svr.exitChan)
  151. }
  152. return
  153. }
  154. func New(ctx context.Context) *Server {
  155. return &Server{
  156. ctx: ctx,
  157. ch: make(chan *Request, 10),
  158. exitChan: make(chan struct{}),
  159. }
  160. }