server.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package rpc
  2. import (
  3. "context"
  4. "git.nspix.com/golang/micro/gateway/state"
  5. "net"
  6. "net/http"
  7. "sync"
  8. "sync/atomic"
  9. "git.nspix.com/golang/micro/log"
  10. )
  11. type HandleFunc func(ctx *Context) error
  12. type Server struct {
  13. ctx context.Context
  14. listener net.Listener
  15. ch chan *Request
  16. ctxPool sync.Pool
  17. state *state.State
  18. serviceMap sync.Map // map[string]HandleFunc
  19. sessions sync.Map
  20. exitFlag int32
  21. exitChan chan struct{}
  22. }
  23. func (svr *Server) getContext() *Context {
  24. if v := svr.ctxPool.Get(); v != nil {
  25. return v.(*Context)
  26. } else {
  27. return &Context{}
  28. }
  29. }
  30. func (svr *Server) putContext(c *Context) {
  31. svr.ctxPool.Put(c)
  32. }
  33. func (svr *Server) wrkLoop() {
  34. for {
  35. select {
  36. case req, ok := <-svr.ch:
  37. if ok {
  38. svr.handleRequest(req)
  39. }
  40. case <-svr.exitChan:
  41. return
  42. }
  43. }
  44. }
  45. func (svr *Server) handleRequest(req *Request) {
  46. var (
  47. ok bool
  48. err error
  49. cb HandleFunc
  50. val interface{}
  51. ctx *Context
  52. )
  53. atomic.AddInt64(&svr.state.Counter.Request, 1)
  54. if val, ok = svr.serviceMap.Load(req.Method); ok {
  55. cb = val.(HandleFunc)
  56. ctx = svr.getContext()
  57. ctx.Reset(req, NewResponse())
  58. if err = cb(ctx); err == nil {
  59. if err = writeFrame(req.conn, &Frame{
  60. Func: FuncResponse,
  61. Sequence: req.Sequence,
  62. Data: ctx.Response().Bytes(),
  63. }); err != nil {
  64. log.Warnf("RPC: write request(%s@%d) response error: %s", req.Method, req.Sequence, err.Error())
  65. }
  66. } else {
  67. log.Warnf("RPC: handle request(%s@%d) error: %s", req.Method, req.Sequence, err.Error())
  68. resp := NewResponse()
  69. resp.code = http.StatusServiceUnavailable
  70. resp.message = http.StatusText(http.StatusServiceUnavailable)
  71. ctx.Reset(req, resp)
  72. if err = writeFrame(req.conn, &Frame{
  73. Func: FuncResponse,
  74. Sequence: req.Sequence,
  75. Data: ctx.Response().Bytes(),
  76. }); err != nil {
  77. log.Warnf("RPC: write request(%s@%d) response error: %s", req.Method, req.Sequence, err.Error())
  78. }
  79. }
  80. } else {
  81. ctx = svr.getContext()
  82. resp := NewResponse()
  83. resp.code = http.StatusNotFound
  84. resp.message = http.StatusText(http.StatusNotFound)
  85. ctx.Reset(req, resp)
  86. if err = writeFrame(req.conn, &Frame{
  87. Func: FuncResponse,
  88. Sequence: req.Sequence,
  89. Data: ctx.Response().Bytes(),
  90. }); err != nil {
  91. log.Warnf("RPC: write request(%s@%d) response failed cause by: %s", req.Method, req.Sequence, err.Error())
  92. }
  93. }
  94. }
  95. func (svr *Server) process(conn net.Conn) {
  96. var (
  97. err error
  98. frame *Frame
  99. )
  100. svr.sessions.Store(conn.LocalAddr().String(), conn)
  101. defer func() {
  102. svr.sessions.Delete(conn.LocalAddr().String())
  103. _ = conn.Close()
  104. }()
  105. for {
  106. if frame, err = readFrame(conn); err != nil {
  107. return
  108. }
  109. switch frame.Func {
  110. case FuncPing:
  111. if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
  112. return
  113. }
  114. case FuncRequest:
  115. if req, err2 := ReadRequest(frame.Data); err2 == nil {
  116. req.reset(frame.Sequence, conn)
  117. select {
  118. case svr.ch <- req:
  119. default:
  120. }
  121. }
  122. }
  123. }
  124. }
  125. func (svr *Server) Handle(method string, f HandleFunc) {
  126. svr.serviceMap.Store(method, f)
  127. return
  128. }
  129. func (svr *Server) Serve(l net.Listener) (err error) {
  130. svr.listener = l
  131. go func() {
  132. svr.wrkLoop()
  133. }()
  134. for {
  135. if conn, err2 := svr.listener.Accept(); err2 == nil {
  136. go svr.process(conn)
  137. } else {
  138. err = err2
  139. break
  140. }
  141. }
  142. return
  143. }
  144. func (svr *Server) Close() (err error) {
  145. if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
  146. //clear sessions
  147. svr.sessions.Range(func(key, value interface{}) bool {
  148. c := value.(net.Conn)
  149. _ = c.Close()
  150. return true
  151. })
  152. err = svr.listener.Close()
  153. close(svr.exitChan)
  154. }
  155. return
  156. }
  157. func New(ctx context.Context) *Server {
  158. return &Server{
  159. ctx: ctx,
  160. state: &state.State{},
  161. ch: make(chan *Request, 10),
  162. exitChan: make(chan struct{}),
  163. }
  164. }