package gateway import ( "bytes" "context" "errors" "io" "net" "sync" "time" "git.nspix.com/golang/micro/stats" ) const ( MinFeatureLength = 3 ) var ( ErrShortFeature = errors.New("short feature") ErrFeatureExists = errors.New("feature already exists") requestCounter = stats.NewCounter("GatewayRequestCount", "Number of gateway request") requestDropCounter = stats.NewCountersWithSingleLabel("GatewayRequestDropedCount", "Number of dropped by unusual request", "Reason") ) type ( listener struct { feature []byte l *Listener } Gateway struct { listeners []*listener l net.Listener ch chan net.Conn } ) func (g *Gateway) Attach(feature []byte, l *Listener) (err error) { //特征量不够大 if len(feature) < MinFeatureLength { return ErrShortFeature } //判断重复 for _, v := range g.listeners { if bytes.Equal(v.feature, feature) { return ErrFeatureExists } } g.listeners = append(g.listeners, &listener{ feature: feature, l: l, }) return } func (g *Gateway) Attaches(features [][]byte, l *Listener) (err error) { for _, b := range features { if err = g.Attach(b, l); err != nil { break } } return } func (g *Gateway) Detach(feature []byte) (err error) { for i, l := range g.listeners { if bytes.Equal(l.feature, feature) { g.listeners = append(g.listeners[:i], g.listeners[i+1:]...) break } } return } func (g *Gateway) process(conn net.Conn) { var ( n int err error missing uint8 feature = make([]byte, MinFeatureLength) ) requestCounter.Add(1) //set deadline if err = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 800)); err != nil { requestDropCounter.Add("ERROR", 1) return } if n, err = io.ReadFull(conn, feature); err != nil { requestDropCounter.Add("EOF", 1) _ = conn.Close() return } //reset deadline if err = conn.SetReadDeadline(time.Time{}); err != nil { requestDropCounter.Add("ERROR", 1) return } missing = 1 for _, l := range g.listeners { if bytes.Equal(feature[:n], l.feature[:n]) { l.l.Receive(wrapConn(conn, feature[:n])) missing = 0 break } } if missing == 1 { requestDropCounter.Add("MISSING", 1) } } func (g *Gateway) wroker(ctx context.Context) { var ( ok bool conn net.Conn ) for { select { case conn, ok = <-g.ch: if ok { g.process(conn) } case <-ctx.Done(): return } } } func (g *Gateway) schedule(ctx context.Context) { for { conn, err := g.l.Accept() if err != nil { return } select { case g.ch <- conn: case <-ctx.Done(): return } } } //运行项目 func (g *Gateway) Run(ctx context.Context) { var wg sync.WaitGroup wg.Add(2) go func() { g.wroker(ctx) wg.Done() }() go func() { g.schedule(ctx) wg.Done() }() wg.Wait() } func New(l net.Listener) *Gateway { return &Gateway{ l: l, ch: make(chan net.Conn, 10), listeners: make([]*listener, 0), } }