package gateway import ( "bytes" "context" "errors" "io" "net" "sync" "sync/atomic" "time" ) const ( MinFeatureLength = 3 ) var ( ErrShortFeature = errors.New("short feature") ErrFeatureExists = errors.New("feature already exists") ) type ( listener struct { feature []byte l *Listener } Gateway struct { listeners []*listener l net.Listener ch chan net.Conn state *State } ) func (g *Gateway) Attach(feature []byte, l *Listener) (err error) { //特征量不够大 if len(feature) < MinFeatureLength { return ErrShortFeature } //判断重复 for _, v := range g.listeners { if bytes.Equal(v.feature, feature) { return ErrFeatureExists } } g.listeners = append(g.listeners, &listener{ feature: feature, l: l, }) return } func (g *Gateway) Attaches(features [][]byte, l *Listener) (err error) { for _, b := range features { if err = g.Attach(b, l); err != nil { break } } return } func (g *Gateway) Detach(feature []byte) (err error) { for i, l := range g.listeners { if bytes.Equal(l.feature, feature) { g.listeners = append(g.listeners[:i], g.listeners[i+1:]...) break } } return } func (g *Gateway) process(conn net.Conn) { var ( n int err error feature = make([]byte, MinFeatureLength) ) atomic.AddInt32(&g.state.NumOfRequest, 1) //set deadline if err = conn.SetReadDeadline(time.Now().Add(time.Second)); err != nil { return } if n, err = io.ReadFull(conn, feature); err != nil { _ = conn.Close() return } //reset deadline if err = conn.SetReadDeadline(time.Time{}); err != nil { return } for _, l := range g.listeners { if bytes.Compare(feature[:n], l.feature[:n]) == 0 { l.l.Receive(wrapConn(conn, g.state, feature[:n])) break } } } func (g *Gateway) worker(ctx context.Context) { var ( ok bool conn net.Conn ) for { select { case conn, ok = <-g.ch: if ok { g.process(conn) } case <-ctx.Done(): return } } } func (g *Gateway) schedule(ctx context.Context) { for { conn, err := g.l.Accept() if err != nil { return } select { case g.ch <- conn: case <-ctx.Done(): return } } } func (g *Gateway) State() *State { return g.state } //Run 运行项目 func (g *Gateway) Run(ctx context.Context) { var wg sync.WaitGroup wg.Add(2) go func() { g.worker(ctx) wg.Done() }() go func() { g.schedule(ctx) wg.Done() }() wg.Wait() for _, l := range g.listeners { _ = l.l.Close() } } func New(l net.Listener) *Gateway { return &Gateway{ l: l, state: &State{}, ch: make(chan net.Conn, 10), listeners: make([]*listener, 0), } }