Browse Source

properly handle nil cursor and use Etcd vs Modified index

This fixes two bugs:
1. watch was not getting a snapshot with nil cursor.
2. watch was not advancing properly due to wrong index used.

Also reworks somewhat broken watch related tests.
Eugene Yakubovich 9 years ago
parent
commit
762e65f7a7
4 changed files with 42 additions and 67 deletions
  1. 16 49
      remote/remote_test.go
  2. 5 4
      subnet/etcd.go
  3. 13 6
      subnet/mock_registry.go
  4. 8 8
      subnet/subnet_test.go

+ 16 - 49
remote/remote_test.go

@@ -99,52 +99,11 @@ func doTestWatch(t *testing.T, sm subnet.Manager) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	res := make(chan error)
-	barrier := make(chan struct{})
+	events := make(chan []subnet.Event)
+	go subnet.WatchLeases(ctx, sm, "_", events)
 
-	sm.WatchLeases(ctx, "_", nil)
-
-	var expectedSubnet ip.IP4Net
-
-	go func() {
-		wr, err := sm.WatchLeases(ctx, "_", nil)
-		if err != nil {
-			res <- fmt.Errorf("WatchLeases failed: %v", err)
-			return
-		}
-		if len(wr.Events) > 0 && len(wr.Snapshot) > 0 {
-			res <- fmt.Errorf("WatchLeases returned events and snapshots")
-			return
-		}
-
-		res <- nil
-		<-barrier
-
-		wr, err = sm.WatchLeases(ctx, "_", wr.Cursor)
-		if err != nil {
-			res <- fmt.Errorf("WatchLeases failed: %v", err)
-			return
-		}
-		if len(wr.Events) == 0 {
-			res <- fmt.Errorf("WatchLeases returned empty events")
-			return
-		}
-
-		if wr.Events[0].Type != subnet.SubnetAdded {
-			res <- fmt.Errorf("WatchLeases returned event with wrong EventType: %v vs %v", wr.Events[0].Type, subnet.SubnetAdded)
-			return
-		}
-
-		if !wr.Events[0].Lease.Subnet.Equal(expectedSubnet) {
-			res <- fmt.Errorf("WatchLeases returned unexpected subnet: %v vs %v", wr.Events[0].Lease.Subnet, expectedSubnet)
-		}
-
-		res <- nil
-	}()
-
-	if err := <-res; err != nil {
-		t.Fatal(err.Error())
-	}
+	// skip over the initial snapshot
+	<-events
 
 	attrs := &subnet.LeaseAttrs{
 		PublicIP: mustParseIP4("1.1.1.2"),
@@ -158,10 +117,18 @@ func doTestWatch(t *testing.T, sm subnet.Manager) {
 		t.Errorf("AcquireLease returned subnet not in network: %v (in %v)", l.Subnet, expectedNetwork)
 	}
 
-	expectedSubnet = l.Subnet
+	evtBatch := <-events
+
+	if len(evtBatch) != 1 {
+		t.Fatalf("WatchSubnets produced wrong sized event batch")
+	}
+
+	evt := evtBatch[0]
+	if evt.Type != subnet.SubnetAdded {
+		t.Fatalf("WatchSubnets produced wrong event type")
+	}
 
-	barrier <- struct{}{}
-	if err := <-res; err != nil {
-		t.Fatal(err.Error())
+	if evt.Lease.Key() != l.Key() {
+		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", l.Key(), evt.Lease.Key())
 	}
 }

+ 5 - 4
subnet/etcd.go

@@ -245,10 +245,11 @@ func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, u
 				}
 			}
 		}
-		index = resp.Node.ModifiedIndex
+		index = resp.EtcdIndex
 
 	case err.(*etcd.EtcdError).ErrorCode == etcdKeyNotFound:
 		// key not found: treat it as empty set
+		index = resp.EtcdIndex
 
 	default:
 		return nil, 0, err
@@ -274,11 +275,11 @@ func (m *EtcdManager) RenewLease(ctx context.Context, network string, lease *Lea
 }
 
 func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (WatchResult, error) {
-	nextIndex := uint64(0)
-	if cursor != nil {
-		nextIndex = cursor.(uint64)
+	if cursor == nil {
+		return m.watchReset(ctx, network)
 	}
 
+	nextIndex := cursor.(uint64)
 	resp, err := m.registry.watchSubnets(ctx, network, nextIndex)
 
 	switch {

+ 13 - 6
subnet/mock_registry.go

@@ -121,12 +121,17 @@ func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, da
 }
 
 func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-
-	case r := <-msr.events:
-		return r, nil
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+
+		case r := <-msr.events:
+			if r.Node.ModifiedIndex < since {
+				continue
+			}
+			return r, nil
+		}
 	}
 }
 
@@ -142,8 +147,10 @@ func (msr *mockSubnetRegistry) hasSubnet(sn string) bool {
 func (msr *mockSubnetRegistry) expireSubnet(sn string) {
 	for i, n := range msr.subnets.Nodes {
 		if n.Key == sn {
+			msr.index += 1
 			msr.subnets.Nodes[i] = msr.subnets.Nodes[len(msr.subnets.Nodes)-1]
 			msr.subnets.Nodes = msr.subnets.Nodes[:len(msr.subnets.Nodes)-2]
+			n.ModifiedIndex = msr.index
 			msr.events <- &etcd.Response{
 				Action: "expire",
 				Node:   n,

+ 8 - 8
subnet/subnet_test.go

@@ -77,13 +77,13 @@ func TestWatchLeaseAdded(t *testing.T) {
 	events := make(chan []Event)
 	go WatchLeases(ctx, sm, "", events)
 
+	// skip over the initial snapshot
+	<-events
+
 	expected := "10.3.3.0-24"
 	msr.createSubnet(ctx, "_", expected, `{"PublicIP": "1.1.1.1"}`, 0)
 
-	evtBatch, ok := <-events
-	if !ok {
-		t.Fatalf("WatchSubnets did not publish")
-	}
+	evtBatch := <-events
 
 	if len(evtBatch) != 1 {
 		t.Fatalf("WatchSubnets produced wrong sized event batch")
@@ -111,13 +111,13 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	events := make(chan []Event)
 	go WatchLeases(ctx, sm, "", events)
 
+	// skip over the initial snapshot
+	<-events
+
 	expected := "10.3.4.0-24"
 	msr.expireSubnet(expected)
 
-	evtBatch, ok := <-events
-	if !ok {
-		t.Fatalf("WatchSubnets did not publish")
-	}
+	evtBatch := <-events
 
 	if len(evtBatch) != 1 {
 		t.Fatalf("WatchSubnets produced wrong sized event batch")