package gateway import ( "bytes" "context" "errors" "io" "net" "sync" "time" ) const ( MinFeatureLength = 3 ) var ( ErrShortFeature = errors.New("short feature") ErrFeatureExists = errors.New("feature already exists") ) type ( listener struct { feature []byte l *Listener } Gateway struct { listeners []*listener l net.Listener ch chan net.Conn } ) func (g *Gateway) Attach(feature []byte, l *Listener) (err error) { //特征量不够大 if len(feature) < MinFeatureLength { return ErrShortFeature } //判断重复 for _, v := range g.listeners { if bytes.Equal(v.feature, feature) { return ErrFeatureExists } } g.listeners = append(g.listeners, &listener{ feature: feature, l: l, }) return } func (g *Gateway) Attaches(features [][]byte, l *Listener) (err error) { for _, b := range features { if err = g.Attach(b, l); err != nil { break } } return } func (g *Gateway) Detach(feature []byte) (err error) { for i, l := range g.listeners { if bytes.Equal(l.feature, feature) { g.listeners = append(g.listeners[:i], g.listeners[i+1:]...) break } } return } func (g *Gateway) loop(ctx context.Context) { var ( n int ok bool err error conn net.Conn feature = make([]byte, MinFeatureLength) ) for { select { case conn, ok = <-g.ch: if ok { //set deadline _ = conn.SetReadDeadline(time.Now().Add(time.Millisecond * 800)) if n, err = io.ReadFull(conn, feature); err != nil { _ = conn.Close() break } //reset deadline _ = conn.SetReadDeadline(time.Time{}) for _, l := range g.listeners { if bytes.Equal(feature[:n], l.feature[:n]) { l.l.Receive(wrapConn(conn, feature[:n])) break } } } case <-ctx.Done(): return } } } func (g *Gateway) schedule(ctx context.Context) { for { conn, err := g.l.Accept() if err != nil { return } select { case g.ch <- conn: case <-ctx.Done(): return } } } //运行项目 func (g *Gateway) Run(ctx context.Context) { var wg sync.WaitGroup wg.Add(2) go func() { g.loop(ctx) wg.Done() }() go func() { g.schedule(ctx) wg.Done() }() wg.Wait() } func New(l net.Listener) *Gateway { return &Gateway{ l: l, ch: make(chan net.Conn, 10), listeners: make([]*listener, 0), } }