server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package rpc
  2. import (
  3. "net"
  4. "sync"
  5. "sync/atomic"
  6. "git.nspix.com/golang/micro/log"
  7. )
  8. type HandleFunc func(ctx *Context) error
  9. type Server struct {
  10. listener net.Listener
  11. ch chan *Request
  12. ctxPool sync.Pool
  13. serviceMap sync.Map // map[string]HandleFunc
  14. sessions sync.Map
  15. exitFlag int32
  16. exitChan chan struct{}
  17. }
  18. func (svr *Server) getContext() *Context {
  19. if v := svr.ctxPool.Get(); v != nil {
  20. return v.(*Context)
  21. } else {
  22. return &Context{}
  23. }
  24. }
  25. func (svr *Server) putContext(c *Context) {
  26. svr.ctxPool.Put(c)
  27. }
  28. func (svr *Server) wrkLoop() {
  29. for {
  30. select {
  31. case req, ok := <-svr.ch:
  32. if ok {
  33. svr.handleRequest(req)
  34. }
  35. case <-svr.exitChan:
  36. return
  37. }
  38. }
  39. }
  40. func (svr *Server) handleRequest(req *Request) {
  41. var (
  42. ok bool
  43. err error
  44. cb HandleFunc
  45. val interface{}
  46. ctx *Context
  47. )
  48. if val, ok = svr.serviceMap.Load(req.Method); ok {
  49. cb = val.(HandleFunc)
  50. ctx = svr.getContext()
  51. ctx.Reset(req, NewResponse())
  52. if err = cb(ctx); err == nil {
  53. if err = writeFrame(req.conn, &Frame{
  54. Func: FuncResponse,
  55. Sequence: req.Sequence,
  56. Data: ctx.Response().Bytes(),
  57. }); err != nil {
  58. log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
  59. }
  60. }
  61. } else {
  62. ctx = svr.getContext()
  63. resp := NewResponse()
  64. resp.code = 404
  65. resp.message = "not found"
  66. ctx.Reset(req, resp)
  67. if err = writeFrame(req.conn, &Frame{
  68. Func: FuncResponse,
  69. Sequence: req.Sequence,
  70. Data: ctx.Response().Bytes(),
  71. }); err != nil {
  72. log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
  73. }
  74. }
  75. }
  76. func (svr *Server) process(conn net.Conn) {
  77. var (
  78. err error
  79. frame *Frame
  80. )
  81. log.Infof("RPC: connection %s connecting", conn.RemoteAddr())
  82. svr.sessions.Store(conn.LocalAddr().String(), conn)
  83. defer func() {
  84. log.Infof("RPC: connection %s closed", conn.RemoteAddr())
  85. svr.sessions.Delete(conn.LocalAddr().String())
  86. _ = conn.Close()
  87. }()
  88. for {
  89. if frame, err = readFrame(conn); err != nil {
  90. return
  91. }
  92. switch frame.Func {
  93. case FuncPing:
  94. if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
  95. log.Warnf("RPC: write ping frame error: %s", err.Error())
  96. }
  97. case FuncRequest:
  98. //read request
  99. if req, err2 := ReadRequest(frame.Data); err2 == nil {
  100. req.reset(frame.Sequence, conn)
  101. select {
  102. case svr.ch <- req:
  103. default:
  104. }
  105. } else {
  106. log.Warnf("RPC: read request error: %s", err2.Error())
  107. }
  108. }
  109. }
  110. }
  111. func (svr *Server) Handle(method string, f HandleFunc) {
  112. svr.serviceMap.Store(method, f)
  113. return
  114. }
  115. func (svr *Server) Serve(l net.Listener) (err error) {
  116. svr.listener = l
  117. go func() {
  118. svr.wrkLoop()
  119. }()
  120. for {
  121. if conn, err2 := svr.listener.Accept(); err2 == nil {
  122. go svr.process(conn)
  123. } else {
  124. err = err2
  125. break
  126. }
  127. }
  128. return
  129. }
  130. func (svr *Server) Close() (err error) {
  131. if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
  132. //clear sessions
  133. svr.sessions.Range(func(key, value interface{}) bool {
  134. c := value.(net.Conn)
  135. c.Close()
  136. return true
  137. })
  138. err = svr.listener.Close()
  139. close(svr.exitChan)
  140. }
  141. return
  142. }
  143. func NewServer() *Server {
  144. return &Server{
  145. ch: make(chan *Request, 10),
  146. exitChan: make(chan struct{}),
  147. }
  148. }