Explorar el Código

Fixes a number of races

A couple of race-related bugs found by strenuous
unit testing. Most of the problems are in the
mocks/tests themselves.
Eugene Yakubovich hace 9 años
padre
commit
36d0ce1b3a
Se han modificado 6 ficheros con 131 adiciones y 52 borrados
  1. 0 3
      remote/remote_test.go
  2. 34 4
      subnet/mock_etcd.go
  3. 5 15
      subnet/mock_etcd_test.go
  4. 78 23
      subnet/mock_registry.go
  5. 3 2
      subnet/subnet_test.go
  6. 11 5
      subnet/watch.go

+ 0 - 3
remote/remote_test.go

@@ -165,9 +165,6 @@ func TestWatchLeases(t *testing.T) {
 		f.wg.Done()
 	}()
 
-	// skip over the initial snapshot
-	<-events
-
 	attrs := &subnet.LeaseAttrs{
 		PublicIP: mustParseIP4("1.1.1.2"),
 	}

+ 34 - 4
subnet/mock_etcd.go

@@ -17,6 +17,7 @@ package subnet
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
@@ -26,8 +27,9 @@ import (
 const DEFAULT_TTL time.Duration = 8760 * time.Hour // one year
 
 type mockEtcd struct {
+	mux      sync.Mutex
 	nodes    map[string]*etcd.Node
-	watchers map[*watcher]*watcher
+	watchers map[*watcher]struct{}
 	// A given number of past events must be available for watchers, because
 	// flannel always uses a new watcher instead of re-using old ones, and
 	// the new watcher's index may be slightly in the past
@@ -39,7 +41,7 @@ func newMockEtcd() *mockEtcd {
 	me := &mockEtcd{
 		index:    1000,
 		nodes:    make(map[string]*etcd.Node),
-		watchers: make(map[*watcher]*watcher),
+		watchers: make(map[*watcher]struct{}),
 		events:   make([]*etcd.Response, 0, 50),
 	}
 	me.nodes["/"] = me.newNode("/", "", true)
@@ -146,6 +148,9 @@ func (me *mockEtcd) copyNode(node *etcd.Node, recursive bool) *etcd.Node {
 }
 
 func (me *mockEtcd) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
+	me.mux.Lock()
+	defer me.mux.Unlock()
+
 	node, _, err := me.findNode(key)
 	if err != nil {
 		return nil, err
@@ -173,7 +178,7 @@ func (me *mockEtcd) sendEvent(resp *etcd.Response) {
 	me.events = append(me.events, resp)
 
 	// and notify watchers
-	for w := range me.watchers {
+	for w, _ := range me.watchers {
 		w.notifyEvent(resp)
 	}
 }
@@ -285,6 +290,9 @@ func (me *mockEtcd) set(ctx context.Context, key, value string, opts *etcd.SetOp
 }
 
 func (me *mockEtcd) Set(ctx context.Context, key, value string, opts *etcd.SetOptions) (*etcd.Response, error) {
+	me.mux.Lock()
+	defer me.mux.Unlock()
+
 	return me.set(ctx, key, value, opts, "set")
 }
 
@@ -318,6 +326,9 @@ func (me *mockEtcd) deleteNode(node *etcd.Node, parent *etcd.Node, recursive boo
 }
 
 func (me *mockEtcd) Delete(ctx context.Context, key string, opts *etcd.DeleteOptions) (*etcd.Response, error) {
+	me.mux.Lock()
+	defer me.mux.Unlock()
+
 	node, parent, err := me.findNode(key)
 	if err != nil {
 		return nil, err
@@ -350,6 +361,9 @@ func (me *mockEtcd) Delete(ctx context.Context, key string, opts *etcd.DeleteOpt
 }
 
 func (me *mockEtcd) Create(ctx context.Context, key, value string) (*etcd.Response, error) {
+	me.mux.Lock()
+	defer me.mux.Unlock()
+
 	return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist}, "create")
 }
 
@@ -358,6 +372,9 @@ func (me *mockEtcd) CreateInOrder(ctx context.Context, dir, value string, opts *
 }
 
 func (me *mockEtcd) Update(ctx context.Context, key, value string) (*etcd.Response, error) {
+	me.mux.Lock()
+	defer me.mux.Unlock()
+
 	return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevExist}, "update")
 }
 
@@ -395,17 +412,23 @@ func (w *watcher) notifyEvent(resp *etcd.Response) {
 }
 
 func (w *watcher) Next(ctx context.Context) (*etcd.Response, error) {
+	w.parent.mux.Lock()
+
 	// If the event is already in the history log return it from there
+
 	for _, e := range w.parent.events {
 		if e.Index > w.after && w.shouldGrabEvent(e) {
 			w.after = e.Index
+			w.parent.mux.Unlock()
 			return e, nil
 		}
 	}
 
 	// Watch must handle adding and removing itself from the parent when
 	// it's done to ensure it can be garbage collected correctly
-	w.parent.watchers[w] = w
+	w.parent.watchers[w] = struct{}{}
+
+	w.parent.mux.Unlock()
 
 	// Otherwise wait for new events
 	for {
@@ -416,10 +439,17 @@ func (w *watcher) Next(ctx context.Context) (*etcd.Response, error) {
 				continue
 			}
 			w.after = e.Index
+
+			w.parent.mux.Lock()
 			delete(w.parent.watchers, w)
+			w.parent.mux.Unlock()
+
 			return e, nil
 		case <-ctx.Done():
+			w.parent.mux.Lock()
 			delete(w.parent.watchers, w)
+			w.parent.mux.Unlock()
+
 			return nil, context.Canceled
 		}
 	}

+ 5 - 15
subnet/mock_etcd_test.go

@@ -16,7 +16,6 @@ package subnet
 
 import (
 	"fmt"
-	"sync"
 	"testing"
 
 	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
@@ -85,6 +84,7 @@ func watchMockEtcd(ctx context.Context, watcher etcd.Watcher, result chan error)
 	numEvents := 0
 	for {
 		resp, err := watcher.Next(ctx)
+
 		if err != nil {
 			if err == context.Canceled {
 				break
@@ -138,19 +138,11 @@ func TestMockEtcd(t *testing.T) {
 	e = &etcd.Response{Action: "create", Index: 1002}
 	expectSuccess(t, r, err, e, "")
 
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	startWg := sync.WaitGroup{}
-	startWg.Add(1)
+	wopts := &etcd.WatcherOptions{AfterIndex: m.index, Recursive: true}
+	watcher := m.Watcher("/coreos.com/network", wopts)
+
 	result := make(chan error, 1)
-	go func() {
-		wopts := &etcd.WatcherOptions{AfterIndex: m.index, Recursive: true}
-		watcher := m.Watcher("/coreos.com/network", wopts)
-		startWg.Done()
-		watchMockEtcd(ctx, watcher, result)
-		wg.Done()
-	}()
-	startWg.Wait()
+	go watchMockEtcd(ctx, watcher, result)
 
 	// Populate etcd with some keys
 	netKey1 := "/coreos.com/network/foobar/config"
@@ -237,8 +229,6 @@ func TestMockEtcd(t *testing.T) {
 		t.Fatalf("Unexpected non-nil response to get after delete %v", r)
 	}
 
-	wg.Wait()
-
 	// Check errors from watch goroutine
 	watchResult := <-result
 	if watchResult != nil {

+ 78 - 23
subnet/mock_registry.go

@@ -65,6 +65,7 @@ type event struct {
 }
 
 type MockSubnetRegistry struct {
+	mux           sync.Mutex
 	networks      map[string]*netwk
 	networkEvents chan event
 	index         uint64
@@ -87,6 +88,9 @@ func NewMockRegistry(network, config string, initialSubnets []Lease) *MockSubnet
 }
 
 func (msr *MockSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return "", fmt.Errorf("Network %s not found", network)
@@ -95,6 +99,9 @@ func (msr *MockSubnetRegistry) getNetworkConfig(ctx context.Context, network str
 }
 
 func (msr *MockSubnetRegistry) setConfig(network, config string) error {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return fmt.Errorf("Network %s not found", network)
@@ -104,14 +111,23 @@ func (msr *MockSubnetRegistry) setConfig(network, config string) error {
 }
 
 func (msr *MockSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return nil, 0, fmt.Errorf("Network %s not found", network)
 	}
-	return n.subnets, msr.index, nil
+
+	subs := make([]Lease, len(n.subnets))
+	copy(subs, n.subnets)
+	return subs, msr.index, nil
 }
 
 func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return nil, 0, fmt.Errorf("Network %s not found", network)
@@ -125,6 +141,9 @@ func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn
 }
 
 func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return time.Time{}, fmt.Errorf("Network %s not found", network)
@@ -165,6 +184,9 @@ func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string,
 }
 
 func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return time.Time{}, fmt.Errorf("Network %s not found", network)
@@ -198,6 +220,9 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string,
 }
 
 func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return fmt.Errorf("Network %s not found", network)
@@ -225,64 +250,79 @@ func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network string,
 }
 
 func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
+	msr.mux.Lock()
 	n, ok := msr.networks[network]
+	msr.mux.Unlock()
+
 	if !ok {
-		return Event{}, msr.index, fmt.Errorf("Network %s not found", network)
+		return Event{}, 0, fmt.Errorf("Network %s not found", network)
 	}
 
 	for {
-		if since < msr.index {
-			return Event{}, msr.index, etcd.Error{
+		msr.mux.Lock()
+		index := msr.index
+		msr.mux.Unlock()
+
+		if since < index {
+			return Event{}, 0, etcd.Error{
 				Code:    etcd.ErrorCodeEventIndexCleared,
 				Cause:   "out of date",
 				Message: "cursor is out of date",
-				Index:   msr.index,
+				Index:   index,
 			}
 		}
 
 		select {
 		case <-ctx.Done():
-			return Event{}, msr.index, ctx.Err()
+			return Event{}, 0, ctx.Err()
 
 		case e := <-n.subnetsEvents:
-			if e.index <= since {
-				continue
+			if e.index > since {
+				return e.evt, e.index, nil
 			}
-			return e.evt, msr.index, nil
 		}
 	}
 }
 
 func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
+	msr.mux.Lock()
 	n, ok := msr.networks[network]
+	msr.mux.Unlock()
+
 	if !ok {
-		return Event{}, msr.index, fmt.Errorf("Network %s not found", network)
+		return Event{}, 0, fmt.Errorf("Network %s not found", network)
 	}
 
 	for {
-		if since < msr.index {
+		msr.mux.Lock()
+		index := msr.index
+		msr.mux.Unlock()
+
+		if since < index {
 			return Event{}, msr.index, etcd.Error{
 				Code:    etcd.ErrorCodeEventIndexCleared,
 				Cause:   "out of date",
 				Message: "cursor is out of date",
-				Index:   msr.index,
+				Index:   index,
 			}
 		}
 
 		select {
 		case <-ctx.Done():
-			return Event{}, msr.index, ctx.Err()
+			return Event{}, index, ctx.Err()
 
 		case e := <-n.subnetEventsChan(sn):
-			if e.index <= since {
-				continue
+			if e.index > since {
+				return e.evt, index, nil
 			}
-			return e.evt, msr.index, nil
 		}
 	}
 }
 
 func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return
@@ -310,6 +350,9 @@ func configKeyToNetworkKey(configKey string) string {
 }
 
 func (msr *MockSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	ns := []string{}
 
 	for n, _ := range msr.networks {
@@ -320,30 +363,36 @@ func (msr *MockSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint6
 }
 
 func (msr *MockSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
+	msr.mux.Lock()
+	index := msr.index
+	msr.mux.Unlock()
+
 	for {
-		if since < msr.index {
-			return Event{}, msr.index, etcd.Error{
+		if since < index {
+			return Event{}, 0, etcd.Error{
 				Code:    etcd.ErrorCodeEventIndexCleared,
 				Cause:   "out of date",
 				Message: "cursor is out of date",
-				Index:   msr.index,
+				Index:   index,
 			}
 		}
 
 		select {
 		case <-ctx.Done():
-			return Event{}, msr.index, ctx.Err()
+			return Event{}, 0, ctx.Err()
 
 		case e := <-msr.networkEvents:
-			if e.index <= since {
-				continue
+			if e.index > since {
+				return e.evt, e.index, nil
 			}
-			return e.evt, msr.index, nil
 		}
 	}
 }
 
 func (msr *MockSubnetRegistry) getNetwork(ctx context.Context, network string) (*netwk, error) {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	n, ok := msr.networks[network]
 	if !ok {
 		return nil, fmt.Errorf("Network %s not found", network)
@@ -353,6 +402,9 @@ func (msr *MockSubnetRegistry) getNetwork(ctx context.Context, network string) (
 }
 
 func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, config string) error {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	_, ok := msr.networks[network]
 	if ok {
 		return fmt.Errorf("Network %s already exists", network)
@@ -378,6 +430,9 @@ func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, confi
 }
 
 func (msr *MockSubnetRegistry) DeleteNetwork(ctx context.Context, network string) error {
+	msr.mux.Lock()
+	defer msr.mux.Unlock()
+
 	_, ok := msr.networks[network]
 	if !ok {
 		return fmt.Errorf("Network %s not found", network)

+ 3 - 2
subnet/subnet_test.go

@@ -175,7 +175,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 	evtBatch = <-events
 
 	if len(evtBatch) != 1 {
-		t.Fatalf("WatchLeases produced wrong sized event batch")
+		t.Fatalf("WatchLeases produced wrong sized event batch: got %v, expected 1", len(evtBatch))
 	}
 
 	evt := evtBatch[0]
@@ -203,6 +203,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	go WatchLeases(ctx, sm, "_", l, events)
 
 	evtBatch := <-events
+
 	for _, evt := range evtBatch {
 		if evt.Lease.Key() == l.Key() {
 			t.Errorf("WatchLeases returned our own lease")
@@ -220,7 +221,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 
 	evtBatch = <-events
 	if len(evtBatch) != 1 {
-		t.Fatalf("WatchLeases produced wrong sized event batch")
+		t.Fatalf("WatchLeases produced wrong sized event batch: %#v", evtBatch)
 	}
 
 	evt := evtBatch[0]

+ 11 - 5
subnet/watch.go

@@ -55,7 +55,7 @@ func WatchLeases(ctx context.Context, sm Manager, network string, ownLease *Leas
 			batch = lw.reset(res.Snapshot)
 		}
 
-		if batch != nil {
+		if len(batch) > 0 {
 			receiver <- batch
 		}
 	}
@@ -91,10 +91,15 @@ func (lw *leaseWatcher) reset(leases []Lease) []Event {
 
 	// everything left in sm.leases has been deleted
 	for _, l := range lw.leases {
+		if lw.ownLease != nil && l.Subnet.Equal(lw.ownLease.Subnet) {
+			continue
+		}
 		batch = append(batch, Event{EventRemoved, l, ""})
 	}
 
-	lw.leases = leases
+	// copy the leases over (caution: don't just assign a slice)
+	lw.leases = make([]Lease, len(leases))
+	copy(lw.leases, leases)
 
 	return batch
 }
@@ -128,6 +133,7 @@ func (lw *leaseWatcher) add(lease *Lease) Event {
 	}
 
 	lw.leases = append(lw.leases, *lease)
+
 	return Event{EventAdded, lw.leases[len(lw.leases)-1], ""}
 }
 
@@ -144,8 +150,8 @@ func (lw *leaseWatcher) remove(lease *Lease) Event {
 }
 
 func deleteLease(l []Lease, i int) []Lease {
-	l[i], l = l[len(l)-1], l[:len(l)-1]
-	return l
+	l[i] = l[len(l)-1]
+	return l[:len(l)-1]
 }
 
 // WatchNetworks performs a long term watch of flannel networks and communicates
@@ -177,7 +183,7 @@ func WatchNetworks(ctx context.Context, sm Manager, receiver chan []Event) {
 			batch = nw.reset(res.Snapshot)
 		}
 
-		if batch != nil {
+		if len(batch) > 0 {
 			receiver <- batch
 		}
 	}