Sfoglia il codice sorgente

修复获取ID错误

lxg 3 anni fa
parent
commit
12f5ddb50a
5 ha cambiato i file con 87 aggiunte e 39 eliminazioni
  1. 1 0
      broker/bus.go
  2. 13 9
      broker/global.go
  3. 13 8
      broker/inprocess.go
  4. 36 22
      helper/docker/id.go
  5. 24 0
      helper/docker/id_test.go

+ 1 - 0
broker/bus.go

@@ -5,6 +5,7 @@ import (
 )
 
 type Broker interface {
+	WithContext(ctx context.Context)
 	Publish(e *Event)
 	Dispatch(e *Event) (err error)
 	DispatchCtx(ctx context.Context, e *Event) (err error)

+ 13 - 9
broker/global.go

@@ -3,37 +3,41 @@ package broker
 import "context"
 
 var (
-	Default Broker
+	std Broker
 )
 
 func init() {
-	Default = NewInPrcBus(context.Background())
+	std = NewInPrcBus(context.Background())
 }
 
 func SetGlobal(bus Broker) {
-	Default = bus
+	std = bus
 }
 
 func GetGlobal() Broker {
-	return Default
+	return std
+}
+
+func WithContext(ctx context.Context) {
+	std.WithContext(ctx)
 }
 
 func Publish(e *Event) {
-	Default.Publish(e)
+	std.Publish(e)
 }
 
 func Dispatch(e *Event) (err error) {
-	return Default.Dispatch(e)
+	return std.Dispatch(e)
 }
 
 func DispatchCtx(ctx context.Context, e *Event) (err error) {
-	return Default.DispatchCtx(ctx, e)
+	return std.DispatchCtx(ctx, e)
 }
 
 func Subscribe(sub Subscriber) (err error) {
-	return Default.Subscribe(sub)
+	return std.Subscribe(sub)
 }
 
 func UnSubscribe(sub Subscriber) (err error) {
-	return Default.UnSubscribe(sub)
+	return std.UnSubscribe(sub)
 }

+ 13 - 8
broker/inprocess.go

@@ -41,11 +41,6 @@ func (bus *InProcBus) Subscribers() []Subscriber {
 
 // Publish 发布一个事件
 func (bus *InProcBus) Publish(e *Event) {
-	bus.once.Do(func() {
-		for i := 0; i < runtime.NumCPU(); i++ {
-			go bus.worker()
-		}
-	})
 	select {
 	case bus.eventChan <- e:
 	default:
@@ -55,7 +50,7 @@ func (bus *InProcBus) Publish(e *Event) {
 
 // Dispatch 分配事件,关心返回值
 func (bus *InProcBus) Dispatch(e *Event) (err error) {
-	return bus.DispatchCtx(context.Background(), e)
+	return bus.DispatchCtx(bus.ctx, e)
 }
 
 //DispatchCtx 分配一个事件
@@ -92,9 +87,19 @@ func (bus *InProcBus) UnSubscribe(sub Subscriber) (err error) {
 	return
 }
 
+func (bus *InProcBus) WithContext(ctx context.Context) {
+	bus.ctx = ctx
+}
+
 func NewInPrcBus(ctx context.Context) *InProcBus {
-	return &InProcBus{
+	bus := &InProcBus{
 		ctx:       ctx,
-		eventChan: make(chan *Event, 100),
+		eventChan: make(chan *Event, 1024),
 	}
+	bus.once.Do(func() {
+		for i := 0; i < runtime.NumCPU(); i++ {
+			go bus.worker()
+		}
+	})
+	return bus
 }

+ 36 - 22
helper/docker/id.go

@@ -4,47 +4,61 @@ import (
 	"bufio"
 	"bytes"
 	"errors"
+	"io"
 	"os"
+	"regexp"
 )
 
 var (
-	_dockerId     string
-	dockerFeature = []byte("docker/")
+	_containerdId string
+
+	nameFeature = []byte("1:name")
 
 	errNotMatch = errors.New("not match")
+
+	containerdIDMatcher = regexp.MustCompile(`^[0-9a-zA-Z]+$`)
 )
 
-func SelfContainerID() (did string, err error) {
-	if _dockerId != "" {
-		return _dockerId, nil
-	}
+const (
+	ContainerdIDLength = 64
+)
+
+func parseContainerID(r io.Reader) (id string, err error) {
 	var (
 		pos int
 		p   []byte
-		fp  *os.File
 	)
-	if fp, err = os.Open("/proc/self/cgroup"); err != nil {
-		return
-	}
-	defer func() {
-		_ = fp.Close()
-	}()
-	br := bufio.NewReader(fp)
+	br := bufio.NewReader(r)
 	for {
 		if p, _, err = br.ReadLine(); err != nil {
 			break
 		}
-		if pos = bytes.Index(p, dockerFeature); pos == -1 {
+		if !bytes.HasPrefix(p, nameFeature) {
 			continue
 		}
-		p = bytes.TrimSpace(p[pos+len(dockerFeature):])
-		if pos = bytes.LastIndexByte(p, '/'); pos == -1 {
-			_dockerId = string(p)
-			return _dockerId, nil
-		} else {
-			_dockerId = string(p[pos+1:])
-			return _dockerId, nil
+		if len(p) > ContainerdIDLength {
+			pos = len(p) - ContainerdIDLength
+			if containerdIDMatcher.Match(p[pos:]) {
+				return string(p[pos:]), nil
+			}
 		}
 	}
 	return "", errNotMatch
 }
+
+func SelfContainerID() (did string, err error) {
+	if _containerdId != "" {
+		return _containerdId, nil
+	}
+	var (
+		fp *os.File
+	)
+	if fp, err = os.Open("/proc/self/cgroup"); err != nil {
+		return
+	}
+	defer func() {
+		_ = fp.Close()
+	}()
+	_containerdId, err = parseContainerID(fp)
+	return _containerdId, err
+}

+ 24 - 0
helper/docker/id_test.go

@@ -0,0 +1,24 @@
+package docker
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestSelfContainerID(t *testing.T) {
+	buf := []byte(`12:memory:/system.slice/cri.service/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+11:pids:/system.slice/cri.service/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+10:devices:/system.slice/cri.service/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+9:cpu,cpuacct:/system.slice/cri.service/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+8:rdma:/
+7:blkio:/system.slice/cri.service/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+6:perf_event:/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+5:cpuset:/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+4:hugetlb:/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+3:net_cls,net_prio:/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+2:freezer:/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+1:name=systemd:/system.slice/cri.service/kubepods-besteffort-pod6740ca1c_67f8_4908_b89c_e9b9463afd80.slice:cri-containerd:af23dc32d8b34716ff65db62ab4ee677584ddda943cea7213bb112c4c2bfb353
+0::/system.slice/cri.service`)
+
+	t.Log(parseContainerID(bytes.NewReader(buf)))
+}