gateway.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package gateway
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "io"
  7. "net"
  8. "sync"
  9. "time"
  10. "git.nspix.com/golang/micro/stats"
  11. )
  12. const (
  13. MinFeatureLength = 3
  14. )
  15. var (
  16. ErrShortFeature = errors.New("short feature")
  17. ErrFeatureExists = errors.New("feature already exists")
  18. requestCounter = stats.NewCounter("GatewayRequestCount", "Number of gateway request")
  19. requestDropCounter = stats.NewCountersWithSingleLabel("GatewayRequestDropedCount", "Number of dropped by unusual request", "Reason")
  20. )
  21. type (
  22. listener struct {
  23. feature []byte
  24. l *Listener
  25. }
  26. Gateway struct {
  27. listeners []*listener
  28. l net.Listener
  29. ch chan net.Conn
  30. enableStats bool
  31. }
  32. )
  33. func (g *Gateway) Attach(feature []byte, l *Listener) (err error) {
  34. //特征量不够大
  35. if len(feature) < MinFeatureLength {
  36. return ErrShortFeature
  37. }
  38. //判断重复
  39. for _, v := range g.listeners {
  40. if bytes.Equal(v.feature, feature) {
  41. return ErrFeatureExists
  42. }
  43. }
  44. g.listeners = append(g.listeners, &listener{
  45. feature: feature,
  46. l: l,
  47. })
  48. return
  49. }
  50. func (g *Gateway) Attaches(features [][]byte, l *Listener) (err error) {
  51. for _, b := range features {
  52. if err = g.Attach(b, l); err != nil {
  53. break
  54. }
  55. }
  56. return
  57. }
  58. func (g *Gateway) Detach(feature []byte) (err error) {
  59. for i, l := range g.listeners {
  60. if bytes.Equal(l.feature, feature) {
  61. g.listeners = append(g.listeners[:i], g.listeners[i+1:]...)
  62. break
  63. }
  64. }
  65. return
  66. }
  67. func (g *Gateway) process(conn net.Conn) {
  68. var (
  69. n int
  70. err error
  71. missing uint8
  72. feature = make([]byte, MinFeatureLength)
  73. )
  74. if g.enableStats {
  75. requestCounter.Add(1)
  76. }
  77. //set deadline
  78. if err = conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil {
  79. if g.enableStats {
  80. requestDropCounter.Add("ERROR", 1)
  81. }
  82. return
  83. }
  84. if n, err = io.ReadFull(conn, feature); err != nil {
  85. if g.enableStats {
  86. requestDropCounter.Add("EOF", 1)
  87. }
  88. _ = conn.Close()
  89. return
  90. }
  91. //reset deadline
  92. if err = conn.SetReadDeadline(time.Time{}); err != nil {
  93. if g.enableStats {
  94. requestDropCounter.Add("ERROR", 1)
  95. }
  96. return
  97. }
  98. missing = 1
  99. for _, l := range g.listeners {
  100. if bytes.Equal(feature[:n], l.feature[:n]) {
  101. l.l.Receive(wrapConn(conn, feature[:n]))
  102. missing = 0
  103. break
  104. }
  105. }
  106. if missing == 1 {
  107. if g.enableStats {
  108. requestDropCounter.Add("MISSING", 1)
  109. }
  110. }
  111. }
  112. func (g *Gateway) worker(ctx context.Context) {
  113. var (
  114. ok bool
  115. conn net.Conn
  116. )
  117. for {
  118. select {
  119. case conn, ok = <-g.ch:
  120. if ok {
  121. g.process(conn)
  122. }
  123. case <-ctx.Done():
  124. return
  125. }
  126. }
  127. }
  128. func (g *Gateway) schedule(ctx context.Context) {
  129. for {
  130. conn, err := g.l.Accept()
  131. if err != nil {
  132. return
  133. }
  134. select {
  135. case g.ch <- conn:
  136. case <-ctx.Done():
  137. return
  138. }
  139. }
  140. }
  141. //Run 运行项目
  142. func (g *Gateway) Run(ctx context.Context) {
  143. var wg sync.WaitGroup
  144. wg.Add(2)
  145. go func() {
  146. g.worker(ctx)
  147. wg.Done()
  148. }()
  149. go func() {
  150. g.schedule(ctx)
  151. wg.Done()
  152. }()
  153. wg.Wait()
  154. for _, l := range g.listeners {
  155. _ = l.l.Close()
  156. }
  157. }
  158. func New(l net.Listener, stats bool) *Gateway {
  159. return &Gateway{
  160. l: l,
  161. enableStats: stats,
  162. ch: make(chan net.Conn, 10),
  163. listeners: make([]*listener, 0),
  164. }
  165. }