gateway.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package gateway
  2. import (
  3. "bytes"
  4. "context"
  5. "errors"
  6. "io"
  7. "net"
  8. "sync"
  9. "time"
  10. "git.nspix.com/golang/micro/stats"
  11. )
  12. const (
  13. MinFeatureLength = 3
  14. )
  15. var (
  16. ErrShortFeature = errors.New("short feature")
  17. ErrFeatureExists = errors.New("feature already exists")
  18. requestCounter = stats.NewCounter("GatewayRequestCount", "Number of gateway request")
  19. requestDropCounter = stats.NewCountersWithSingleLabel("GatewayRequestDropedCount", "Number of dropped by unusual request", "Reason")
  20. )
  21. type (
  22. listener struct {
  23. feature []byte
  24. l *Listener
  25. }
  26. Gateway struct {
  27. listeners []*listener
  28. l net.Listener
  29. ch chan net.Conn
  30. }
  31. )
  32. func (g *Gateway) Attach(feature []byte, l *Listener) (err error) {
  33. //特征量不够大
  34. if len(feature) < MinFeatureLength {
  35. return ErrShortFeature
  36. }
  37. //判断重复
  38. for _, v := range g.listeners {
  39. if bytes.Equal(v.feature, feature) {
  40. return ErrFeatureExists
  41. }
  42. }
  43. g.listeners = append(g.listeners, &listener{
  44. feature: feature,
  45. l: l,
  46. })
  47. return
  48. }
  49. func (g *Gateway) Attaches(features [][]byte, l *Listener) (err error) {
  50. for _, b := range features {
  51. if err = g.Attach(b, l); err != nil {
  52. break
  53. }
  54. }
  55. return
  56. }
  57. func (g *Gateway) Detach(feature []byte) (err error) {
  58. for i, l := range g.listeners {
  59. if bytes.Equal(l.feature, feature) {
  60. g.listeners = append(g.listeners[:i], g.listeners[i+1:]...)
  61. break
  62. }
  63. }
  64. return
  65. }
  66. func (g *Gateway) process(conn net.Conn) {
  67. var (
  68. n int
  69. err error
  70. missing uint8
  71. feature = make([]byte, MinFeatureLength)
  72. )
  73. requestCounter.Add(1)
  74. //set deadline
  75. if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 800)); err != nil {
  76. requestDropCounter.Add("ERROR", 1)
  77. return
  78. }
  79. if n, err = io.ReadFull(conn, feature); err != nil {
  80. requestDropCounter.Add("EOF", 1)
  81. _ = conn.Close()
  82. return
  83. }
  84. //reset deadline
  85. if err = conn.SetReadDeadline(time.Time{}); err != nil {
  86. requestDropCounter.Add("ERROR", 1)
  87. return
  88. }
  89. missing = 1
  90. for _, l := range g.listeners {
  91. if bytes.Equal(feature[:n], l.feature[:n]) {
  92. l.l.Receive(wrapConn(conn, feature[:n]))
  93. missing = 0
  94. break
  95. }
  96. }
  97. if missing == 1 {
  98. requestDropCounter.Add("MISSING", 1)
  99. }
  100. }
  101. func (g *Gateway) wroker(ctx context.Context) {
  102. var (
  103. ok bool
  104. conn net.Conn
  105. )
  106. for {
  107. select {
  108. case conn, ok = <-g.ch:
  109. if ok {
  110. g.process(conn)
  111. }
  112. case <-ctx.Done():
  113. return
  114. }
  115. }
  116. }
  117. func (g *Gateway) schedule(ctx context.Context) {
  118. for {
  119. conn, err := g.l.Accept()
  120. if err != nil {
  121. return
  122. }
  123. select {
  124. case g.ch <- conn:
  125. case <-ctx.Done():
  126. return
  127. }
  128. }
  129. }
  130. //运行项目
  131. func (g *Gateway) Run(ctx context.Context) {
  132. var wg sync.WaitGroup
  133. wg.Add(2)
  134. go func() {
  135. g.wroker(ctx)
  136. wg.Done()
  137. }()
  138. go func() {
  139. g.schedule(ctx)
  140. wg.Done()
  141. }()
  142. wg.Wait()
  143. }
  144. func New(l net.Listener) *Gateway {
  145. return &Gateway{
  146. l: l,
  147. ch: make(chan net.Conn, 10),
  148. listeners: make([]*listener, 0),
  149. }
  150. }