123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- package gateway
- import (
- "bytes"
- "context"
- "errors"
- "io"
- "net"
- "sync"
- )
// MinFeatureLength is the minimum number of bytes a feature must have to be
// attached. It is also the number of leading bytes read from each accepted
// connection when looking for a matching feature.
const MinFeatureLength = 3
// Sentinel errors returned when registering a feature.
var (
	// ErrShortFeature is returned for a feature shorter than MinFeatureLength.
	ErrShortFeature = errors.New("short feature")

	// ErrFeatureExists is returned when an identical feature is already
	// registered.
	ErrFeatureExists = errors.New("feature already exists")
)
- type (
- listener struct {
- feature []byte
- l *Listener
- }
- Gateway struct {
- listeners []*listener
- l net.Listener
- ch chan net.Conn
- }
- )
- func (g *Gateway) Attach(feature []byte, l *Listener) (err error) {
- //特征量不够大
- if len(feature) < MinFeatureLength {
- return ErrShortFeature
- }
- //判断重复
- for _, v := range g.listeners {
- if bytes.Equal(v.feature, feature) {
- return ErrFeatureExists
- }
- }
- g.listeners = append(g.listeners, &listener{
- feature: feature,
- l: l,
- })
- return
- }
- func (g *Gateway) Attaches(features [][]byte, l *Listener) (err error) {
- for _, b := range features {
- if err = g.Attach(b, l); err != nil {
- break
- }
- }
- return
- }
- func (g *Gateway) Detach(feature []byte) (err error) {
- for i, l := range g.listeners {
- if bytes.Equal(l.feature, feature) {
- g.listeners = append(g.listeners[:i], g.listeners[i+1:]...)
- break
- }
- }
- return
- }
- func (g *Gateway) loop(ctx context.Context) {
- var (
- n int
- ok bool
- err error
- conn net.Conn
- feature = make([]byte, MinFeatureLength)
- )
- for {
- select {
- case conn, ok = <-g.ch:
- if ok {
- if n, err = io.ReadFull(conn, feature); err != nil {
- continue
- }
- for _, l := range g.listeners {
- if bytes.Equal(feature[:n], l.feature[:n]) {
- l.l.Receive(wrapConn(conn, feature[:n]))
- break
- }
- }
- }
- case <-ctx.Done():
- return
- }
- }
- }
- func (g *Gateway) schedule(ctx context.Context) {
- for {
- conn, err := g.l.Accept()
- if err != nil {
- return
- }
- select {
- case g.ch <- conn:
- case <-ctx.Done():
- return
- }
- }
- }
- //运行项目
- func (g *Gateway) Run(ctx context.Context) {
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- g.loop(ctx)
- wg.Done()
- }()
- go func() {
- g.schedule(ctx)
- wg.Done()
- }()
- wg.Wait()
- }
- func New(l net.Listener) *Gateway {
- return &Gateway{
- l: l,
- ch: make(chan net.Conn, 10),
- listeners: make([]*listener, 0),
- }
- }
|