Browse Source

fix handling of failed watches that result in snapshot

This also fixes the cursor not being used in some cases.
Eugene Yakubovich 9 năm trước cách đây
mục cha
commit
c27f3c7c54
2 tập tin đã thay đổi với 23 bổ sung14 xóa
  1. 16 12
      subnet/etcd.go
  2. 7 2
      subnet/watch.go

+ 16 - 12
subnet/etcd.go

@@ -111,7 +111,7 @@ func findLeaseByIP(leases []Lease, pubIP ip.IP4) *Lease {
 
 func (m *EtcdManager) tryAcquireLease(ctx context.Context, network string, config *Config, extIP ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
 	var err error
-	leases, err := m.getLeases(ctx, network)
+	leases, _, err := m.getLeases(ctx, network)
 	if err != nil {
 		return nil, err
 	}
@@ -215,10 +215,14 @@ OuterLoop:
 	}
 }
 
-func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, error) {
+// getLeases queries etcd to get a list of currently allocated leases for a given network.
+// It returns the leases along with the "as-of" etcd-index that can be used as the starting
+// point for etcd watch.
+func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, uint64, error) {
 	resp, err := m.registry.getSubnets(ctx, network)
 
 	leases := []Lease{}
+	index := uint64(0)
 
 	switch {
 	case err == nil:
@@ -228,8 +232,8 @@ func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, e
 				attrs := &LeaseAttrs{}
 				if err = json.Unmarshal([]byte(node.Value), attrs); err == nil {
 					exp := time.Time{}
-					if resp.Node.Expiration != nil {
-						exp = *resp.Node.Expiration
+					if node.Expiration != nil {
+						exp = *node.Expiration
 					}
 
 					lease := Lease{
@@ -241,15 +245,16 @@ func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, e
 				}
 			}
 		}
+		index = resp.Node.ModifiedIndex
 
 	case err.(*etcd.EtcdError).ErrorCode == etcdKeyNotFound:
 		// key not found: treat it as empty set
 
 	default:
-		return nil, err
+		return nil, 0, err
 	}
 
-	return leases, nil
+	return leases, index, nil
 }
 
 func (m *EtcdManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
@@ -339,19 +344,18 @@ func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
 	}, nil
 }
 
+// watchReset is called when incremental watch failed and we need to grab a snapshot
 func (m *EtcdManager) watchReset(ctx context.Context, network string) (WatchResult, error) {
 	wr := WatchResult{}
 
-	leases, err := m.getLeases(ctx, network)
+	leases, index, err := m.getLeases(ctx, network)
 	if err != nil {
 		return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
 	}
 
-	for _, l := range leases {
-		e := Event{SubnetAdded, l}
-		wr.Events = append(wr.Events, e)
-	}
-
+	cursor := index + 1
+	wr.Snapshot = leases
+	wr.Cursor = cursor
 	return wr, nil
 }
 

+ 7 - 2
subnet/watch.go

@@ -21,11 +21,16 @@ import (
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
+// WatchLeases performs a long term watch of the given network's subnet leases
+// and communicates addition/deletion events on receiver channel. It takes care
+// of handling "fall-behind" logic where the history window has advanced too far
+// and it needs to diff the latest snapshot with its saved state and generate events
 func WatchLeases(ctx context.Context, sm Manager, network string, receiver chan []Event) {
 	lw := &leaseWatcher{}
+	var cursor interface{}
 
 	for {
-		res, err := sm.WatchLeases(ctx, network, lw.cursor)
+		res, err := sm.WatchLeases(ctx, network, cursor)
 		if err != nil {
 			if err == context.Canceled || err == context.DeadlineExceeded {
 				return
@@ -35,6 +40,7 @@ func WatchLeases(ctx context.Context, sm Manager, network string, receiver chan
 			time.Sleep(time.Second)
 			continue
 		}
+		cursor = res.Cursor
 
 		batch := []Event{}
 
@@ -52,7 +58,6 @@ func WatchLeases(ctx context.Context, sm Manager, network string, receiver chan
 
 type leaseWatcher struct {
 	leases []Lease
-	cursor interface{}
 }
 
 func (lw *leaseWatcher) reset(leases []Lease) []Event {