server.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package rpc
  2. import (
  3. "git.nspix.com/golang/micro/log"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. type HandleFunc func(ctx *Context) error
  9. type Server struct {
  10. listener net.Listener
  11. ch chan *Request
  12. ctxPool sync.Pool
  13. serviceMap sync.Map // map[string]HandleFunc
  14. exitFlag int32
  15. exitChan chan struct{}
  16. }
  17. func (svr *Server) getContext() *Context {
  18. if v := svr.ctxPool.Get(); v != nil {
  19. return v.(*Context)
  20. } else {
  21. return &Context{}
  22. }
  23. }
  24. func (svr *Server) putContext(c *Context) {
  25. svr.ctxPool.Put(c)
  26. }
  27. func (svr *Server) wrkLoop() {
  28. for {
  29. select {
  30. case req, ok := <-svr.ch:
  31. if ok {
  32. svr.handleRequest(req)
  33. }
  34. case <-svr.exitChan:
  35. return
  36. }
  37. }
  38. }
  39. func (svr *Server) handleRequest(req *Request) {
  40. var (
  41. ok bool
  42. err error
  43. cb HandleFunc
  44. val interface{}
  45. ctx *Context
  46. )
  47. if val, ok = svr.serviceMap.Load(req.Method); ok {
  48. cb = val.(HandleFunc)
  49. ctx = svr.getContext()
  50. ctx.Reset(req, NewResponse())
  51. if err = cb(ctx); err == nil {
  52. if err = writeFrame(req.conn, &Frame{
  53. Func: FuncResponse,
  54. Sequence: req.Sequence,
  55. Data: ctx.Response().Bytes(),
  56. }); err != nil {
  57. log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
  58. }
  59. }
  60. } else {
  61. ctx = svr.getContext()
  62. resp := NewResponse()
  63. resp.code = 404
  64. resp.message = "not found"
  65. ctx.Reset(req, resp)
  66. if err = writeFrame(req.conn, &Frame{
  67. Func: FuncResponse,
  68. Sequence: req.Sequence,
  69. Data: ctx.Response().Bytes(),
  70. }); err != nil {
  71. log.Warnf("RPC: write sequence(%d) response failed cause by: %s", req.Sequence, err.Error())
  72. }
  73. }
  74. }
  75. func (svr *Server) process(conn net.Conn) {
  76. var (
  77. err error
  78. frame *Frame
  79. )
  80. defer func() {
  81. _ = conn.Close()
  82. }()
  83. for {
  84. if frame, err = readFrame(conn); err != nil {
  85. return
  86. }
  87. switch frame.Func {
  88. case FuncPing:
  89. if err = writeFrame(conn, &Frame{Func: FuncPing}); err != nil {
  90. log.Warnf("RPC: write ping frame error: %s", err.Error())
  91. }
  92. case FuncRequest:
  93. //读取一个请求
  94. if req, err2 := ReadRequest(frame.Data); err2 == nil {
  95. req.reset(frame.Sequence, conn)
  96. select {
  97. case svr.ch <- req:
  98. default:
  99. }
  100. } else {
  101. log.Warnf("RPC: read request error: %s", err2.Error())
  102. }
  103. }
  104. }
  105. }
  106. func (svr *Server) Handle(method string, f HandleFunc) {
  107. svr.serviceMap.Store(method, f)
  108. return
  109. }
  110. func (svr *Server) Serve(l net.Listener) (err error) {
  111. svr.listener = l
  112. go func() {
  113. svr.wrkLoop()
  114. }()
  115. for {
  116. if conn, err2 := svr.listener.Accept(); err2 == nil {
  117. go svr.process(conn)
  118. } else {
  119. err = err2
  120. break
  121. }
  122. }
  123. return
  124. }
  125. func (svr *Server) Close() (err error) {
  126. if atomic.CompareAndSwapInt32(&svr.exitFlag, 0, 1) {
  127. err = svr.listener.Close()
  128. close(svr.exitChan)
  129. }
  130. return
  131. }
  132. func NewServer() *Server {
  133. return &Server{
  134. ch: make(chan *Request, 10),
  135. exitChan: make(chan struct{}),
  136. }
  137. }