Procházet zdrojové kódy

Merge pull request #284 from dcbw/network-watch

subnet: add infrastructure and tests for network watches
Eugene Yakubovich před 9 roky
rodič
revize
648a1db983

+ 2 - 2
backend/hostgw/hostgw.go

@@ -136,7 +136,7 @@ func (rb *HostgwBackend) Name() string {
 func (rb *HostgwBackend) handleSubnetEvents(batch []subnet.Event) {
 	for _, evt := range batch {
 		switch evt.Type {
-		case subnet.SubnetAdded:
+		case subnet.EventAdded:
 			log.Infof("Subnet added: %v via %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP)
 
 			if evt.Lease.Attrs.BackendType != "host-gw" {
@@ -155,7 +155,7 @@ func (rb *HostgwBackend) handleSubnetEvents(batch []subnet.Event) {
 			}
 			rb.addToRouteList(route)
 
-		case subnet.SubnetRemoved:
+		case subnet.EventRemoved:
 			log.Info("Subnet removed: ", evt.Lease.Subnet)
 
 			if evt.Lease.Attrs.BackendType != "host-gw" {

+ 2 - 2
backend/udp/udp.go

@@ -243,12 +243,12 @@ func (m *UdpBackend) monitorEvents() {
 func (m *UdpBackend) processSubnetEvents(batch []subnet.Event) {
 	for _, evt := range batch {
 		switch evt.Type {
-		case subnet.SubnetAdded:
+		case subnet.EventAdded:
 			log.Info("Subnet added: ", evt.Lease.Subnet)
 
 			setRoute(m.ctl, evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, m.cfg.Port)
 
-		case subnet.SubnetRemoved:
+		case subnet.EventRemoved:
 			log.Info("Subnet removed: ", evt.Lease.Subnet)
 
 			removeRoute(m.ctl, evt.Lease.Subnet)

+ 2 - 2
backend/vxlan/vxlan.go

@@ -228,7 +228,7 @@ type vxlanLeaseAttrs struct {
 func (vb *VXLANBackend) handleSubnetEvents(batch []subnet.Event) {
 	for _, evt := range batch {
 		switch evt.Type {
-		case subnet.SubnetAdded:
+		case subnet.EventAdded:
 			log.Info("Subnet added: ", evt.Lease.Subnet)
 
 			if evt.Lease.Attrs.BackendType != "vxlan" {
@@ -244,7 +244,7 @@ func (vb *VXLANBackend) handleSubnetEvents(batch []subnet.Event) {
 			vb.rts.set(evt.Lease.Subnet, net.HardwareAddr(attrs.VtepMAC))
 			vb.dev.AddL2(neigh{IP: evt.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(attrs.VtepMAC)})
 
-		case subnet.SubnetRemoved:
+		case subnet.EventRemoved:
 			log.Info("Subnet removed: ", evt.Lease.Subnet)
 
 			if evt.Lease.Attrs.BackendType != "vxlan" {

+ 33 - 10
remote/client.go

@@ -164,13 +164,11 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s
 	return nil
 }
 
-func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.WatchResult, error) {
-	url := m.mkurl(network, "leases")
-
+func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{}, wr interface{}) error {
 	if cursor != nil {
 		c, ok := cursor.(string)
 		if !ok {
-			return subnet.WatchResult{}, fmt.Errorf("internal error: RemoteManager.WatchLeases received non-string cursor")
+			return fmt.Errorf("internal error: RemoteManager.watch received non-string cursor")
 		}
 
 		url = fmt.Sprintf("%v?next=%v", url, c)
@@ -178,19 +176,44 @@ func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor
 
 	resp, err := m.httpGet(ctx, url)
 	if err != nil {
-		return subnet.WatchResult{}, err
+		return err
 	}
 
 	if resp.StatusCode != http.StatusOK {
-		return subnet.WatchResult{}, httpError(resp)
+		return httpError(resp)
+	}
+
+	if err := json.NewDecoder(resp.Body).Decode(wr); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
+	url := m.mkurl(network, "leases")
+
+	wr := subnet.LeaseWatchResult{}
+	err := m.watch(ctx, url, cursor, &wr)
+	if err != nil {
+		return subnet.LeaseWatchResult{}, err
+	}
+	if _, ok := wr.Cursor.(string); !ok {
+		return subnet.LeaseWatchResult{}, fmt.Errorf("watch returned non-string cursor")
 	}
 
-	wr := subnet.WatchResult{}
-	if err := json.NewDecoder(resp.Body).Decode(&wr); err != nil {
-		return subnet.WatchResult{}, err
+	return wr, nil
+}
+
+func (m *RemoteManager) WatchNetworks(ctx context.Context, cursor interface{}) (subnet.NetworkWatchResult, error) {
+	wr := subnet.NetworkWatchResult{}
+	err := m.watch(ctx, m.base+"/", cursor, &wr)
+	if err != nil {
+		return subnet.NetworkWatchResult{}, err
 	}
+
 	if _, ok := wr.Cursor.(string); !ok {
-		return subnet.WatchResult{}, fmt.Errorf("lease watch returned non-string cursor")
+		return subnet.NetworkWatchResult{}, fmt.Errorf("watch returned non-string cursor")
 	}
 
 	return wr, nil

+ 70 - 6
remote/remote_test.go

@@ -33,7 +33,8 @@ const expectedNetwork = "10.1.0.0/16"
 
 func TestRemote(t *testing.T) {
 	config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork)
-	sm := subnet.NewMockManager(1, config)
+	serverRegistry := subnet.NewMockRegistry(1, "", config, nil)
+	sm := subnet.NewMockManager(serverRegistry)
 
 	addr := "127.0.0.1:9999"
 
@@ -45,7 +46,7 @@ func TestRemote(t *testing.T) {
 		wg.Done()
 	}()
 
-	doTestRemote(ctx, t, addr)
+	doTestRemote(ctx, t, addr, serverRegistry)
 
 	cancel()
 	wg.Wait()
@@ -76,7 +77,7 @@ func isConnRefused(err error) bool {
 	return false
 }
 
-func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string) {
+func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string, serverRegistry *subnet.MockSubnetRegistry) {
 	sm, err := NewRemoteManager(remoteAddr, "", "", "")
 	if err != nil {
 		t.Fatalf("Failed to create remote mananager: %v", err)
@@ -120,10 +121,12 @@ func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string) {
 		t.Errorf("RenewLease failed: %v", err)
 	}
 
-	doTestWatch(t, sm)
+	doTestWatchLeases(t, sm)
+
+	doTestWatchNetworks(t, sm, serverRegistry)
 }
 
-func doTestWatch(t *testing.T, sm subnet.Manager) {
+func doTestWatchLeases(t *testing.T, sm subnet.Manager) {
 	ctx, cancel := context.WithCancel(context.Background())
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -160,7 +163,7 @@ func doTestWatch(t *testing.T, sm subnet.Manager) {
 	}
 
 	evt := evtBatch[0]
-	if evt.Type != subnet.SubnetAdded {
+	if evt.Type != subnet.EventAdded {
 		t.Fatalf("WatchSubnets produced wrong event type")
 	}
 
@@ -168,3 +171,64 @@ func doTestWatch(t *testing.T, sm subnet.Manager) {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", l.Key(), evt.Lease.Key())
 	}
 }
+
+// doTestWatchNetworks checks that creating and then deleting a network
+// directly in serverRegistry is reported by subnet.WatchNetworks (driven
+// through sm) as one single-event EventAdded batch followed by one
+// single-event EventRemoved batch, each naming the network.
+func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet.MockSubnetRegistry) {
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	// Stop the watcher goroutine and wait for it before returning.
+	defer func() {
+		cancel()
+		wg.Wait()
+	}()
+
+	events := make(chan []subnet.Event)
+	go func() {
+		subnet.WatchNetworks(ctx, sm, events)
+		wg.Done()
+	}()
+
+	// skip over the initial snapshot
+	<-events
+
+	expectedNetname := "foobar"
+	config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork)
+	_, err := serverRegistry.CreateNetwork(ctx, expectedNetname, config)
+	if err != nil {
+		// Fatal, not Error: without the network no event is produced and
+		// the receive below would block forever.
+		t.Fatalf("create network failed: %v", err)
+	}
+
+	evtBatch := <-events
+
+	if len(evtBatch) != 1 {
+		t.Fatalf("WatchNetworks create produced wrong sized event batch")
+	}
+
+	evt := evtBatch[0]
+	if evt.Type != subnet.EventAdded {
+		t.Fatalf("WatchNetworks create produced wrong event type")
+	}
+
+	if evt.Network != expectedNetname {
+		t.Errorf("WatchNetwork create produced wrong network: expected %s, got %s", expectedNetname, evt.Network)
+	}
+
+	_, err = serverRegistry.DeleteNetwork(ctx, expectedNetname)
+	if err != nil {
+		// Fatal for the same reason as above: a failed delete emits no
+		// event, so the next receive would never return.
+		t.Fatalf("delete network failed: %v", err)
+	}
+
+	evtBatch = <-events
+
+	if len(evtBatch) != 1 {
+		t.Fatalf("WatchNetworks delete produced wrong sized event batch")
+	}
+
+	evt = evtBatch[0]
+	if evt.Type != subnet.EventRemoved {
+		t.Fatalf("WatchNetworks delete produced wrong event type")
+	}
+
+	if evt.Network != expectedNetname {
+		t.Errorf("WatchNetwork delete produced wrong network: expected %s, got %s", expectedNetname, evt.Network)
+	}
+}

+ 27 - 0
remote/server.go

@@ -152,6 +152,32 @@ func handleWatchLeases(ctx context.Context, sm subnet.Manager, w http.ResponseWr
 	jsonResponse(w, http.StatusOK, wr)
 }
 
+// handleNetworks answers the network listing/watch endpoint:
+//
+//	GET /?next=cursor  watches for network changes after the given cursor
+//	GET /              retrieves all networks
+//
+// The cursor taken from the request URL is handed to sm.WatchNetworks as-is.
+// The resulting cursor is converted to a string before the result is
+// written as JSON; any failure is reported as a 500 with the error text.
+func handleNetworks(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	result, err := sm.WatchNetworks(ctx, getCursor(r.URL))
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	// Normalise the cursor to its string form for the JSON response.
+	switch c := result.Cursor.(type) {
+	case string:
+		// already a string, nothing to convert
+	case fmt.Stringer:
+		result.Cursor = c.String()
+	default:
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, fmt.Errorf("internal error: watch cursor is of unknown type"))
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, result)
+}
+
 func bindHandler(h handler, ctx context.Context, sm subnet.Manager) http.HandlerFunc {
 	return func(resp http.ResponseWriter, req *http.Request) {
 		h(ctx, sm, resp, req)
@@ -237,6 +263,7 @@ func RunServer(ctx context.Context, sm subnet.Manager, listenAddr, cafile, certf
 	r.HandleFunc("/v1/{network}/leases", bindHandler(handleAcquireLease, ctx, sm)).Methods("POST")
 	r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleRenewLease, ctx, sm)).Methods("PUT")
 	r.HandleFunc("/v1/{network}/leases", bindHandler(handleWatchLeases, ctx, sm)).Methods("GET")
+	r.HandleFunc("/v1/", bindHandler(handleNetworks, ctx, sm)).Methods("GET")
 
 	l, err := listener(listenAddr, cafile, certfile, keyfile)
 	if err != nil {

+ 155 - 28
subnet/etcd.go

@@ -19,6 +19,7 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"path"
 	"regexp"
 	"strconv"
 	"time"
@@ -35,7 +36,8 @@ const (
 )
 
 type EtcdManager struct {
-	registry Registry
+	registry     Registry
+	networkRegex *regexp.Regexp
 }
 
 var (
@@ -55,15 +57,21 @@ func NewEtcdManager(config *EtcdConfig) (Manager, error) {
 	if err != nil {
 		return nil, err
 	}
-	return &EtcdManager{r}, nil
+	return &EtcdManager{
+		registry:     r,
+		networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)/config`),
+	}, nil
 }
 
 func newEtcdManager(r Registry) Manager {
-	return &EtcdManager{r}
+	return &EtcdManager{
+		registry:     r,
+		networkRegex: regexp.MustCompile(`/coreos.com/network/([^/]*)/config`),
+	}
 }
 
 func (m *EtcdManager) GetNetworkConfig(ctx context.Context, network string) (*Config, error) {
-	cfgResp, err := m.registry.getConfig(ctx, network)
+	cfgResp, err := m.registry.getNetworkConfig(ctx, network)
 	if err != nil {
 		return nil, err
 	}
@@ -280,11 +288,7 @@ func (m *EtcdManager) RenewLease(ctx context.Context, network string, lease *Lea
 	return nil
 }
 
-func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (WatchResult, error) {
-	if cursor == nil {
-		return m.watchReset(ctx, network)
-	}
-
+func getNextIndex(cursor interface{}) (uint64, error) {
 	nextIndex := uint64(0)
 
 	if wc, ok := cursor.(watchCursor); ok {
@@ -293,13 +297,26 @@ func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor in
 		var err error
 		nextIndex, err = strconv.ParseUint(s, 10, 64)
 		if err != nil {
-			return WatchResult{}, fmt.Errorf("failed to parse cursor: %v", err)
+			return 0, fmt.Errorf("failed to parse cursor: %v", err)
 		}
 	} else {
-		return WatchResult{}, fmt.Errorf("internal error: watch cursor is of unknown type")
+		return 0, fmt.Errorf("internal error: watch cursor is of unknown type")
+	}
+
+	return nextIndex, nil
+}
+
+func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) {
+	if cursor == nil {
+		return m.leaseWatchReset(ctx, network)
 	}
 
-	resp, err := m.registry.watchSubnets(ctx, network, nextIndex)
+	nextIndex, err := getNextIndex(cursor)
+	if err != nil {
+		return LeaseWatchResult{}, err
+	}
+
+	resp, err := m.registry.watch(ctx, path.Join(network, "subnets"), nextIndex)
 
 	switch {
 	case err == nil:
@@ -307,10 +324,35 @@ func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor in
 
 	case isIndexTooSmall(err):
 		log.Warning("Watch of subnet leases failed because etcd index outside history window")
-		return m.watchReset(ctx, network)
+		return m.leaseWatchReset(ctx, network)
+
+	default:
+		return LeaseWatchResult{}, err
+	}
+}
+
+// WatchNetworks reports network changes that happened after cursor.
+//
+// A nil cursor requests a fresh snapshot of all networks. Otherwise the
+// cursor is converted to an etcd index with getNextIndex and the registry is
+// watched from the prefix root (empty sub-path). If etcd reports that the
+// index has fallen outside its history window, a fresh snapshot is returned
+// instead; any other watch error is returned unchanged.
+func (m *EtcdManager) WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error) {
+	if cursor == nil {
+		return m.networkWatchReset(ctx)
+	}
+
+	nextIndex, err := getNextIndex(cursor)
+	if err != nil {
+		return NetworkWatchResult{}, err
+	}
+
+	resp, err := m.registry.watch(ctx, "", nextIndex)
+
+	switch {
+	case err == nil:
+		return m.parseNetworkWatchResponse(resp)
+
+	case isIndexTooSmall(err):
+		// This is the network watch, so say so (the lease watch logs its own message).
+		log.Warning("Watch of networks failed because etcd index outside history window")
+		return m.networkWatchReset(ctx)
 
 	default:
-		return WatchResult{}, err
+		return NetworkWatchResult{}, err
 	}
 }
 
@@ -319,10 +361,10 @@ func isIndexTooSmall(err error) bool {
 	return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared
 }
 
-func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
+func parseSubnetWatchResponse(resp *etcd.Response) (LeaseWatchResult, error) {
 	sn, err := parseSubnetKey(resp.Node.Key)
 	if err != nil {
-		return WatchResult{}, fmt.Errorf("error parsing subnet IP: %s", resp.Node.Key)
+		return LeaseWatchResult{}, fmt.Errorf("error parsing subnet IP: %s", resp.Node.Key)
 	}
 
 	evt := Event{}
@@ -330,15 +372,16 @@ func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
 	switch resp.Action {
 	case "delete", "expire":
 		evt = Event{
-			SubnetRemoved,
+			EventRemoved,
 			Lease{Subnet: sn},
+			"",
 		}
 
 	default:
 		attrs := &LeaseAttrs{}
 		err := json.Unmarshal([]byte(resp.Node.Value), attrs)
 		if err != nil {
-			return WatchResult{}, err
+			return LeaseWatchResult{}, err
 		}
 
 		exp := time.Time{}
@@ -347,36 +390,120 @@ func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
 		}
 
 		evt = Event{
-			SubnetAdded,
+			EventAdded,
 			Lease{
 				Subnet:     sn,
 				Attrs:      attrs,
 				Expiration: exp,
 			},
+			"",
 		}
 	}
 
-	cursor := watchCursor{resp.Node.ModifiedIndex}
+	return LeaseWatchResult{
+		Cursor: watchCursor{resp.Node.ModifiedIndex},
+		Events: []Event{evt},
+	}, nil
+}
+
+// parseNetworkKey extracts the network name from a config key
+// (eg, /coreos.com/network/foobar/config) using m.networkRegex.
+// A key without the 'config' component does not match and yields an error,
+// because a network lacking its config key is not considered valid.
+func (m *EtcdManager) parseNetworkKey(s string) (string, error) {
+	match := m.networkRegex.FindStringSubmatch(s)
+	if len(match) != 2 {
+		return "", errors.New("Error parsing Network key")
+	}
+
+	// match[0] is the whole match, match[1] the captured network name.
+	return match[1], nil
+}
+
+// parseNetworkWatchResponse turns a single etcd watch response into a
+// NetworkWatchResult holding one event.
+//
+// Responses for keys that are not .../<netname>/config are ignored: an empty
+// result and a nil error are returned. A "delete" action becomes an
+// EventRemoved; every other action becomes an EventAdded, but only after the
+// node value has been validated with ParseConfig.
+//
+// NOTE(review): the empty result returned for ignored keys carries a nil
+// Cursor; confirm that callers cope with that.
+func (m *EtcdManager) parseNetworkWatchResponse(resp *etcd.Response) (NetworkWatchResult, error) {
+	netname, err := m.parseNetworkKey(resp.Node.Key)
+	if err != nil {
+		// Ignore non .../<netname>/config keys
+		return NetworkWatchResult{}, nil
+	}
+
+	evtType := EventRemoved
+	if resp.Action != "delete" {
+		// Anything but a delete announces a (new or changed) config, which
+		// must parse before the network is reported as added.
+		if _, err := ParseConfig(resp.Node.Value); err != nil {
+			return NetworkWatchResult{}, err
+		}
+		evtType = EventAdded
+	}
+
+	evt := Event{
+		Type:    evtType,
+		Network: netname,
+	}
 
-	return WatchResult{
-		Cursor: cursor,
+	return NetworkWatchResult{
+		Cursor: watchCursor{resp.Node.ModifiedIndex},
 		Events: []Event{evt},
 	}, nil
 }
 
-// watchReset is called when incremental watch failed and we need to grab a snapshot
-func (m *EtcdManager) watchReset(ctx context.Context, network string) (WatchResult, error) {
-	wr := WatchResult{}
+// getNetworks queries etcd to get a list of network names.  It returns the
+// networks along with the 'as-of' etcd-index that can be used as the starting
+// point for etcd watch.
+//
+// A network is reported when a .../<netname>/config key is found either as a
+// top-level node of the listing or as a direct child of one. The second form
+// is what a recursive directory listing produces: each top-level node is the
+// network's directory and the config key sits one level below it.
+// A "key not found" error from the registry is treated as an empty set.
+func (m *EtcdManager) getNetworks(ctx context.Context) ([]string, uint64, error) {
+	resp, err := m.registry.getNetworks(ctx)
+
+	networks := []string{}
 
-	leases, index, err := m.getLeases(ctx, network)
+	if err == nil {
+		for _, node := range resp.Node.Nodes {
+			// The node itself may already be the config key.
+			if netname, err := m.parseNetworkKey(node.Key); err == nil {
+				networks = append(networks, netname)
+				continue
+			}
+
+			// Otherwise treat the node as the network's directory and look
+			// for its config key among the direct children.
+			for _, child := range node.Nodes {
+				if netname, err := m.parseNetworkKey(child.Key); err == nil {
+					networks = append(networks, netname)
+					break
+				}
+			}
+		}
+
+		return networks, resp.Index, nil
+	}
+
+	if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
+		// key not found: treat it as empty set
+		return networks, etcdErr.Index, nil
+	}
+
+	return nil, 0, err
+}
+
+// leaseWatchReset is called when incremental lease watch failed and we need to grab a snapshot
+func (m *EtcdManager) leaseWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) {
+	wr := LeaseWatchResult{}
 
+	leases, index, err := m.getLeases(ctx, network)
 	if err != nil {
 		return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
 	}
 
-	cursor := watchCursor{index}
+	wr.Cursor = watchCursor{index}
 	wr.Snapshot = leases
-	wr.Cursor = cursor
+	return wr, nil
+}
+
+// networkWatchReset is called when incremental network watch failed and we need to grab a snapshot
+func (m *EtcdManager) networkWatchReset(ctx context.Context) (NetworkWatchResult, error) {
+	wr := NetworkWatchResult{}
+
+	networks, index, err := m.getNetworks(ctx)
+	if err != nil {
+		return wr, fmt.Errorf("failed to retrieve networks: %v", err)
+	}
+
+	wr.Cursor = watchCursor{index}
+	wr.Snapshot = networks
 	return wr, nil
 }
 

+ 186 - 50
subnet/mock_registry.go

@@ -16,63 +16,79 @@ package subnet
 
 import (
 	"fmt"
+	"path"
+	"sort"
+	"strings"
 	"time"
 
 	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
-type mockSubnetRegistry struct {
-	config  *etcd.Node
-	subnets *etcd.Node
-	events  chan *etcd.Response
-	index   uint64
-	ttl     time.Duration
+type MockSubnetRegistry struct {
+	networks map[string]*etcd.Node
+	events   chan *etcd.Response
+	index    uint64
+	ttl      time.Duration
 }
 
-func newMockRegistry(ttlOverride time.Duration, config string, initialSubnets []*etcd.Node) *mockSubnetRegistry {
+const networkKeyPrefix = "/coreos.com/network"
+
+func NewMockRegistry(ttlOverride time.Duration, network, config string, initialSubnets []*etcd.Node) *MockSubnetRegistry {
 	index := uint64(0)
+
+	node := &etcd.Node{Key: normalizeNetKey(network), Value: config, ModifiedIndex: 0, Nodes: make([]*etcd.Node, 0, 20)}
 	for _, n := range initialSubnets {
 		if n.ModifiedIndex > index {
 			index = n.ModifiedIndex
 		}
+		node.Nodes = append(node.Nodes, n)
 	}
 
-	return &mockSubnetRegistry{
-		config: &etcd.Node{
-			Value: config,
-		},
-		subnets: &etcd.Node{
-			Nodes: initialSubnets,
-		},
+	msr := &MockSubnetRegistry{
 		events: make(chan *etcd.Response, 1000),
 		index:  index + 1,
 		ttl:    ttlOverride,
 	}
+
+	msr.networks = make(map[string]*etcd.Node)
+	msr.networks[network] = node
+	return msr
 }
 
-func (msr *mockSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
+// getNetworkConfig returns the stored node of the given network together
+// with the registry's current index. An unknown network is an error, as in
+// the other per-network accessors of the mock, so callers never receive a
+// response with a nil Node.
+func (msr *MockSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (*etcd.Response, error) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return nil, fmt.Errorf("Network %s not found", network)
+	}
+
 	return &etcd.Response{
 		Index: msr.index,
-		Node:  msr.config,
+		Node:  n,
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) setConfig(config string) {
-	msr.config = &etcd.Node{
-		Key:   "config",
-		Value: config,
+func (msr *MockSubnetRegistry) setConfig(network, config string) error {
+	n, ok := msr.networks[network]
+	if !ok {
+		return fmt.Errorf("Network %s not found", network)
 	}
+	n.Value = config
+	return nil
 }
 
-func (msr *mockSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
+func (msr *MockSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return nil, fmt.Errorf("Network %s not found", network)
+	}
 	return &etcd.Response{
-		Node:  msr.subnets,
+		Node:  n,
 		Index: msr.index,
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
+func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return nil, fmt.Errorf("Network %s not found", network)
+	}
+
 	msr.index += 1
 
 	if msr.ttl > 0 {
@@ -88,7 +104,7 @@ func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, da
 		Expiration:    &exp,
 	}
 
-	msr.subnets.Nodes = append(msr.subnets.Nodes, node)
+	n.Nodes = append(n.Nodes, node)
 	msr.events <- &etcd.Response{
 		Action: "add",
 		Node:   node,
@@ -100,23 +116,28 @@ func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, da
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
+func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return nil, fmt.Errorf("Network %s not found", network)
+	}
+
 	msr.index += 1
 
 	exp := time.Now().Add(ttl)
 
-	for _, n := range msr.subnets.Nodes {
-		if n.Key == sn {
-			n.Value = data
-			n.ModifiedIndex = msr.index
-			n.Expiration = &exp
+	for _, sub := range n.Nodes {
+		if sub.Key == sn {
+			sub.Value = data
+			sub.ModifiedIndex = msr.index
+			sub.Expiration = &exp
 			msr.events <- &etcd.Response{
 				Action: "add",
-				Node:   n,
+				Node:   sub,
 			}
 
 			return &etcd.Response{
-				Node:  n,
+				Node:  sub,
 				Index: msr.index,
 			}, nil
 		}
@@ -125,20 +146,26 @@ func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, da
 	return nil, fmt.Errorf("Subnet not found")
 }
 
-func (msr *mockSubnetRegistry) deleteSubnet(ctx context.Context, network, sn string) (*etcd.Response, error) {
+func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network, sn string) (*etcd.Response, error) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return nil, fmt.Errorf("Network %s not found", network)
+	}
+
 	msr.index += 1
 
-	for i, n := range msr.subnets.Nodes {
-		if n.Key == sn {
-			msr.subnets.Nodes[i] = msr.subnets.Nodes[len(msr.subnets.Nodes)-1]
-			msr.subnets.Nodes = msr.subnets.Nodes[:len(msr.subnets.Nodes)-1]
+	for i, sub := range n.Nodes {
+		if sub.Key == sn {
+			n.Nodes[i] = n.Nodes[len(n.Nodes)-1]
+			n.Nodes = n.Nodes[:len(n.Nodes)-1]
+			sub.ModifiedIndex = msr.index
 			msr.events <- &etcd.Response{
 				Action: "delete",
-				Node:   n,
+				Node:   sub,
 			}
 
 			return &etcd.Response{
-				Node:  n,
+				Node:  sub,
 				Index: msr.index,
 			}, nil
 		}
@@ -148,8 +175,17 @@ func (msr *mockSubnetRegistry) deleteSubnet(ctx context.Context, network, sn str
 
 }
 
-func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
+func (msr *MockSubnetRegistry) watch(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
 	for {
+		if since < msr.index {
+			return nil, etcd.Error{
+				Code:    etcd.ErrorCodeEventIndexCleared,
+				Cause:   "out of date",
+				Message: "cursor is out of date",
+				Index:   msr.index,
+			}
+		}
+
 		select {
 		case <-ctx.Done():
 			return nil, ctx.Err()
@@ -163,27 +199,127 @@ func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string,
 	}
 }
 
-func (msr *mockSubnetRegistry) hasSubnet(sn string) bool {
-	for _, n := range msr.subnets.Nodes {
-		if n.Key == sn {
+func (msr *MockSubnetRegistry) hasSubnet(network, sn string) bool {
+	n, ok := msr.networks[network]
+	if !ok {
+		return false
+	}
+
+	for _, sub := range n.Nodes {
+		if sub.Key == sn {
 			return true
 		}
 	}
 	return false
 }
 
-func (msr *mockSubnetRegistry) expireSubnet(sn string) {
-	for i, n := range msr.subnets.Nodes {
-		if n.Key == sn {
+// expireSubnet removes subnet sn from the given network and queues an
+// "expire" event for it, stamped with the freshly bumped registry index.
+// It does nothing when the network or the subnet is unknown.
+func (msr *MockSubnetRegistry) expireSubnet(network, sn string) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return
+	}
+
+	for i, sub := range n.Nodes {
+		if sub.Key == sn {
 			msr.index += 1
-			msr.subnets.Nodes[i] = msr.subnets.Nodes[len(msr.subnets.Nodes)-1]
-			msr.subnets.Nodes = msr.subnets.Nodes[:len(msr.subnets.Nodes)-2]
-			n.ModifiedIndex = msr.index
+			// Swap-remove: move the last node into slot i, then drop exactly
+			// one element. Truncating by two would also discard an unrelated
+			// subnet and panic when only one subnet is present.
+			n.Nodes[i] = n.Nodes[len(n.Nodes)-1]
+			n.Nodes = n.Nodes[:len(n.Nodes)-1]
+			sub.ModifiedIndex = msr.index
 			msr.events <- &etcd.Response{
 				Action: "expire",
-				Node:   n,
+				Node:   sub,
 			}
 			return
 		}
 	}
 }
+
+// getNetworks returns a synthetic node keyed networkKeyPrefix whose children
+// are the nodes of all registered networks, ordered by network name so the
+// listing is deterministic. The response carries the current registry index.
+func (msr *MockSubnetRegistry) getNetworks(ctx context.Context) (*etcd.Response, error) {
+	var names []string
+	for name := range msr.networks {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+
+	children := make([]*etcd.Node, 0, len(names))
+	for _, name := range names {
+		children = append(children, msr.networks[name])
+	}
+
+	root := &etcd.Node{
+		Key:           networkKeyPrefix,
+		Value:         "",
+		ModifiedIndex: msr.index,
+		Nodes:         children,
+	}
+
+	return &etcd.Response{Node: root, Index: msr.index}, nil
+}
+
+// getNetwork returns the node stored for network along with the current
+// registry index, or an error when no such network is registered.
+func (msr *MockSubnetRegistry) getNetwork(ctx context.Context, network string) (*etcd.Response, error) {
+	if node, found := msr.networks[network]; found {
+		return &etcd.Response{Node: node, Index: msr.index}, nil
+	}
+
+	return nil, fmt.Errorf("Network %s not found", network)
+}
+
+// CreateNetwork registers a new network holding config under its normalized
+// config key, bumps the registry index and queues an "add" event for the new
+// node. It fails if a network with that name already exists.
+func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, config string) (*etcd.Response, error) {
+	if _, exists := msr.networks[network]; exists {
+		return nil, fmt.Errorf("Network %s already exists", network)
+	}
+
+	msr.index += 1
+
+	created := &etcd.Node{
+		Key:           normalizeNetKey(network),
+		Value:         config,
+		ModifiedIndex: msr.index,
+	}
+	msr.networks[network] = created
+
+	// Publish the change to watchers.
+	msr.events <- &etcd.Response{Action: "add", Node: created}
+
+	return &etcd.Response{Node: created, Index: msr.index}, nil
+}
+
+// DeleteNetwork unregisters network, stamps its node with the freshly bumped
+// registry index and queues a "delete" event carrying that node. It fails if
+// the network is not registered.
+func (msr *MockSubnetRegistry) DeleteNetwork(ctx context.Context, network string) (*etcd.Response, error) {
+	removed, found := msr.networks[network]
+	if !found {
+		return nil, fmt.Errorf("Network %s not found", network)
+	}
+
+	msr.index += 1
+	removed.ModifiedIndex = msr.index
+	delete(msr.networks, network)
+
+	// Publish the change to watchers.
+	msr.events <- &etcd.Response{Action: "delete", Node: removed}
+
+	return &etcd.Response{Node: removed, Index: msr.index}, nil
+}
+
+// normalizeNetKey converts a network name or partial key into a full config
+// key: networkKeyPrefix is prepended unless the key already starts with it,
+// and a trailing "config" element is appended unless already present.
+func normalizeNetKey(key string) string {
+	normalized := key
+	if !strings.HasPrefix(key, networkKeyPrefix+"/") {
+		normalized = path.Join(networkKeyPrefix, key)
+	}
+
+	if strings.HasSuffix(normalized, "/config") {
+		return normalized
+	}
+	return path.Join(normalized, "config")
+}

+ 2 - 5
subnet/mock_subnet.go

@@ -14,9 +14,6 @@
 
 package subnet
 
-import "time"
-
-func NewMockManager(ttlOverride time.Duration, config string) Manager {
-	r := newMockRegistry(ttlOverride, config, nil)
-	return newEtcdManager(r)
+func NewMockManager(registry *MockSubnetRegistry) Manager {
+	return newEtcdManager(registry)
 }

+ 12 - 6
subnet/registry.go

@@ -27,12 +27,13 @@ import (
 )
 
 type Registry interface {
-	getConfig(ctx context.Context, network string) (*etcd.Response, error)
+	getNetworkConfig(ctx context.Context, network string) (*etcd.Response, error)
 	getSubnets(ctx context.Context, network string) (*etcd.Response, error)
 	createSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error)
 	updateSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error)
 	deleteSubnet(ctx context.Context, network, sn string) (*etcd.Response, error)
-	watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error)
+	watch(ctx context.Context, path string, since uint64) (*etcd.Response, error)
+	getNetworks(ctx context.Context) (*etcd.Response, error)
 }
 
 type EtcdConfig struct {
@@ -86,7 +87,7 @@ func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
 	return r, nil
 }
 
-func (esr *etcdSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
+func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (*etcd.Response, error) {
 	key := path.Join(esr.etcdCfg.Prefix, network, "config")
 	resp, err := esr.client().Get(ctx, key, nil)
 	if err != nil {
@@ -131,13 +132,18 @@ func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network, sn str
 	return esr.client().Delete(ctx, key, nil)
 }
 
-func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
+func (esr *etcdSubnetRegistry) watch(ctx context.Context, subpath string, since uint64) (*etcd.Response, error) {
+	key := path.Join(esr.etcdCfg.Prefix, subpath)
 	opts := &etcd.WatcherOptions{
 		AfterIndex: since,
 		Recursive:  true,
 	}
-	return esr.client().Watcher(key, opts).Next(ctx)
+	e, err := esr.client().Watcher(key, opts).Next(ctx)
+	return e, err
+}
+
+func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) (*etcd.Response, error) {
+	return esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true})
 }
 
 func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {

+ 24 - 14
subnet/subnet.go

@@ -44,33 +44,42 @@ type (
 	EventType int
 
 	Event struct {
-		Type  EventType `json:"type"`
-		Lease Lease     `json:"lease"`
+		Type    EventType `json:"type"`
+		Lease   Lease     `json:"lease,omitempty"`
+		Network string    `json:"network,omitempty"`
 	}
 )
 
 const (
-	SubnetAdded EventType = iota
-	SubnetRemoved
+	EventAdded EventType = iota
+	EventRemoved
 )
 
-type WatchResult struct {
-	// Either Events or Leases should be set.
-	// If Leases are not empty, it means the cursor
-	// was out of range and Snapshot contains the current
-	// list of leases
+type LeaseWatchResult struct {
+	// Either Events or Snapshot will be set.  If Events is empty, it means
+	// the cursor was out of range and Snapshot contains the current list
+	// of items, even if empty.
 	Events   []Event     `json:"events"`
 	Snapshot []Lease     `json:"snapshot"`
 	Cursor   interface{} `json:"cursor"`
 }
 
+type NetworkWatchResult struct {
+	// Either Events or Snapshot will be set.  If Events is empty, it means
+	// the cursor was out of range and Snapshot contains the current list
+	// of items, even if empty.
+	Events   []Event     `json:"events"`
+	Snapshot []string    `json:"snapshot"`
+	Cursor   interface{} `json:"cursor,omitempty"`
+}
+
 func (et EventType) MarshalJSON() ([]byte, error) {
 	s := ""
 
 	switch et {
-	case SubnetAdded:
+	case EventAdded:
 		s = "added"
-	case SubnetRemoved:
+	case EventRemoved:
 		s = "removed"
 	default:
 		return nil, errors.New("bad event type")
@@ -81,9 +90,9 @@ func (et EventType) MarshalJSON() ([]byte, error) {
 func (et *EventType) UnmarshalJSON(data []byte) error {
 	switch string(data) {
 	case "\"added\"":
-		*et = SubnetAdded
+		*et = EventAdded
 	case "\"removed\"":
-		*et = SubnetRemoved
+		*et = EventRemoved
 	default:
 		fmt.Println(string(data))
 		return errors.New("bad event type")
@@ -96,5 +105,6 @@ type Manager interface {
 	GetNetworkConfig(ctx context.Context, network string) (*Config, error)
 	AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error)
 	RenewLease(ctx context.Context, network string, lease *Lease) error
-	WatchLeases(ctx context.Context, network string, cursor interface{}) (WatchResult, error)
+	WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error)
+	WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error)
 }

+ 123 - 16
subnet/subnet_test.go

@@ -27,7 +27,7 @@ import (
 	"github.com/coreos/flannel/pkg/ip"
 )
 
-func newDummyRegistry(ttlOverride time.Duration) *mockSubnetRegistry {
+func newDummyRegistry(ttlOverride time.Duration) *MockSubnetRegistry {
 	subnets := []*etcd.Node{
 		&etcd.Node{Key: "10.3.1.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 10},
 		&etcd.Node{Key: "10.3.2.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 11},
@@ -36,7 +36,7 @@ func newDummyRegistry(ttlOverride time.Duration) *mockSubnetRegistry {
 	}
 
 	config := `{ "Network": "10.3.0.0/16", "SubnetMin": "10.3.1.0", "SubnetMax": "10.3.5.0" }`
-	return newMockRegistry(ttlOverride, config, subnets)
+	return NewMockRegistry(ttlOverride, "_", config, subnets)
 }
 
 func TestAcquireLease(t *testing.T) {
@@ -48,7 +48,7 @@ func TestAcquireLease(t *testing.T) {
 		PublicIP: extIaddr,
 	}
 
-	l, err := sm.AcquireLease(context.Background(), "", &attrs)
+	l, err := sm.AcquireLease(context.Background(), "_", &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -58,7 +58,7 @@ func TestAcquireLease(t *testing.T) {
 	}
 
 	// Acquire again, should reuse
-	if l, err = sm.AcquireLease(context.Background(), "", &attrs); err != nil {
+	if l, err = sm.AcquireLease(context.Background(), "_", &attrs); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
@@ -76,7 +76,7 @@ func TestConfigChanged(t *testing.T) {
 		PublicIP: extIaddr,
 	}
 
-	l, err := sm.AcquireLease(context.Background(), "", &attrs)
+	l, err := sm.AcquireLease(context.Background(), "_", &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -87,10 +87,10 @@ func TestConfigChanged(t *testing.T) {
 
 	// Change config
 	config := `{ "Network": "10.4.0.0/16" }`
-	msr.setConfig(config)
+	msr.setConfig("_", config)
 
 	// Acquire again, should not reuse
-	if l, err = sm.AcquireLease(context.Background(), "", &attrs); err != nil {
+	if l, err = sm.AcquireLease(context.Background(), "_", &attrs); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
@@ -117,7 +117,7 @@ func acquireLease(ctx context.Context, t *testing.T, sm Manager) *Lease {
 		PublicIP: extIaddr,
 	}
 
-	l, err := sm.AcquireLease(ctx, "", &attrs)
+	l, err := sm.AcquireLease(ctx, "_", &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -135,7 +135,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 	l := acquireLease(ctx, t, sm)
 
 	events := make(chan []Event)
-	go WatchLeases(ctx, sm, "", l, events)
+	go WatchLeases(ctx, sm, "_", l, events)
 
 	evtBatch := <-events
 	for _, evt := range evtBatch {
@@ -155,7 +155,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 
 	evt := evtBatch[0]
 
-	if evt.Type != SubnetAdded {
+	if evt.Type != EventAdded {
 		t.Fatalf("WatchLeases produced wrong event type")
 	}
 
@@ -175,7 +175,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	l := acquireLease(ctx, t, sm)
 
 	events := make(chan []Event)
-	go WatchLeases(ctx, sm, "", l, events)
+	go WatchLeases(ctx, sm, "_", l, events)
 
 	evtBatch := <-events
 	for _, evt := range evtBatch {
@@ -185,7 +185,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	}
 
 	expected := "10.3.4.0-24"
-	msr.expireSubnet(expected)
+	msr.expireSubnet("_", expected)
 
 	evtBatch = <-events
 	if len(evtBatch) != 1 {
@@ -194,7 +194,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 
 	evt := evtBatch[0]
 
-	if evt.Type != SubnetRemoved {
+	if evt.Type != EventRemoved {
 		t.Fatalf("WatchLeases produced wrong event type")
 	}
 
@@ -229,18 +229,22 @@ func TestRenewLease(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	l, err := sm.AcquireLease(ctx, "", &attrs)
+	l, err := sm.AcquireLease(ctx, "_", &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	go LeaseRenewer(ctx, sm, "", l)
+	go LeaseRenewer(ctx, sm, "_", l)
 
 	fmt.Println("Waiting for lease to pass original expiration")
 	time.Sleep(2 * time.Second)
 
 	// check that it's still good
-	for _, n := range msr.subnets.Nodes {
+	net, err := msr.getNetwork(ctx, "_")
+	if err != nil {
+		t.Error("Failed to renew lease: could not get networks: %v", err)
+	}
+	for _, n := range net.Node.Nodes {
 		if n.Key == l.Subnet.StringSep(".", "-") {
 			if n.Expiration.Before(time.Now()) {
 				t.Error("Failed to renew lease: expiration did not advance")
@@ -259,3 +263,106 @@ func TestRenewLease(t *testing.T) {
 
 	t.Fatalf("Failed to find acquired lease")
 }
+
+// TestWatchGetNetworks checks that WatchNetworks called with a nil cursor
+// returns a snapshot listing exactly the networks present in the registry.
+func TestWatchGetNetworks(t *testing.T) {
+	msr := newDummyRegistry(0)
+	sm := newEtcdManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Kill the previously added "_" network
+	msr.DeleteNetwork(ctx, "_")
+
+	expected := "foobar"
+	msr.CreateNetwork(ctx, expected, `{"Network": "10.1.1.0/16", "Backend": {"Type": "bridge"}}`)
+
+	resp, err := sm.WatchNetworks(ctx, nil)
+	if err != nil {
+		// Nothing below is meaningful without a result, so stop here.
+		t.Fatalf("WatchNetworks(nil) failed: %v", err)
+	}
+
+	// Must be fatal: the index expression below would panic on an empty
+	// snapshot and hide the real failure behind a runtime error.
+	if len(resp.Snapshot) != 1 {
+		t.Fatalf("WatchNetworks(nil) produced wrong number of networks: expected 1, got %d", len(resp.Snapshot))
+	}
+
+	if resp.Snapshot[0] != expected {
+		t.Errorf("WatchNetworks(nil) produced wrong network: expected %s, got %s", expected, resp.Snapshot[0])
+	}
+}
+
+// TestWatchNetworkAdded creates a network while a WatchNetworks goroutine is
+// running and expects the next batch to hold one EventAdded naming it.
+func TestWatchNetworkAdded(t *testing.T) {
+	registry := newDummyRegistry(0)
+	manager := newEtcdManager(registry)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	eventCh := make(chan []Event)
+	go WatchNetworks(ctx, manager, eventCh)
+
+	// The first batch is the initial snapshot; discard it.
+	<-eventCh
+
+	expected := "foobar"
+	registry.CreateNetwork(ctx, expected, `{"Network": "10.1.1.0/16", "Backend": {"Type": "bridge"}}`)
+
+	batch := <-eventCh
+	if count := len(batch); count != 1 {
+		t.Fatalf("WatchNetworks produced wrong sized event batch")
+	}
+
+	first := batch[0]
+	if first.Type != EventAdded {
+		t.Fatalf("WatchNetworks produced wrong event type")
+	}
+
+	if got := first.Network; got != expected {
+		t.Errorf("WatchNetworks produced wrong network: expected %s, got %s", expected, got)
+	}
+}
+
+// TestWatchNetworkRemoved creates and then deletes a network while a
+// WatchNetworks goroutine is running, and expects the batch following the
+// deletion to hold one EventRemoved naming that network.
+func TestWatchNetworkRemoved(t *testing.T) {
+	msr := newDummyRegistry(0)
+	sm := newEtcdManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	events := make(chan []Event)
+	go WatchNetworks(ctx, sm, events)
+
+	// skip over the initial snapshot
+	<-events
+
+	expected := "blah"
+	msr.CreateNetwork(ctx, expected, `{"Network": "10.1.1.0/16", "Backend": {"Type": "bridge"}}`)
+
+	// skip over the create event
+	<-events
+
+	_, err := msr.DeleteNetwork(ctx, expected)
+	if err != nil {
+		// Report the underlying error so a registry failure is diagnosable.
+		t.Fatalf("WatchNetworks failed to delete the network: %v", err)
+	}
+
+	evtBatch := <-events
+
+	if len(evtBatch) != 1 {
+		t.Fatalf("WatchNetworks produced wrong sized event batch")
+	}
+
+	evt := evtBatch[0]
+
+	if evt.Type != EventRemoved {
+		t.Fatalf("WatchNetworks produced wrong event type")
+	}
+
+	actual := evt.Network
+	if actual != expected {
+		t.Errorf("WatchNetworks produced wrong network: expected %s, got %s", expected, actual)
+	}
+}

+ 112 - 11
subnet/watch.go

@@ -46,10 +46,10 @@ func WatchLeases(ctx context.Context, sm Manager, network string, ownLease *Leas
 
 		batch := []Event{}
 
-		if len(res.Snapshot) > 0 {
-			batch = lw.reset(res.Snapshot)
-		} else {
+		if len(res.Events) > 0 {
 			batch = lw.update(res.Events)
+		} else {
+			batch = lw.reset(res.Snapshot)
 		}
 
 		if batch != nil {
@@ -82,13 +82,13 @@ func (lw *leaseWatcher) reset(leases []Lease) []Event {
 
 		if !found {
 			// new lease
-			batch = append(batch, Event{SubnetAdded, nl})
+			batch = append(batch, Event{EventAdded, nl, ""})
 		}
 	}
 
 	// everything left in sm.leases has been deleted
 	for _, l := range lw.leases {
-		batch = append(batch, Event{SubnetRemoved, l})
+		batch = append(batch, Event{EventRemoved, l, ""})
 	}
 
 	lw.leases = leases
@@ -105,10 +105,10 @@ func (lw *leaseWatcher) update(events []Event) []Event {
 		}
 
 		switch e.Type {
-		case SubnetAdded:
+		case EventAdded:
 			batch = append(batch, lw.add(&e.Lease))
 
-		case SubnetRemoved:
+		case EventRemoved:
 			batch = append(batch, lw.remove(&e.Lease))
 		}
 	}
@@ -120,27 +120,128 @@ func (lw *leaseWatcher) add(lease *Lease) Event {
 	for i, l := range lw.leases {
 		if l.Subnet.Equal(lease.Subnet) {
 			lw.leases[i] = *lease
-			return Event{SubnetAdded, lw.leases[i]}
+			return Event{EventAdded, lw.leases[i], ""}
 		}
 	}
 
 	lw.leases = append(lw.leases, *lease)
-	return Event{SubnetAdded, lw.leases[len(lw.leases)-1]}
+	return Event{EventAdded, lw.leases[len(lw.leases)-1], ""}
 }
 
 func (lw *leaseWatcher) remove(lease *Lease) Event {
 	for i, l := range lw.leases {
 		if l.Subnet.Equal(lease.Subnet) {
 			lw.leases = deleteLease(lw.leases, i)
-			return Event{SubnetRemoved, l}
+			return Event{EventRemoved, l, ""}
 		}
 	}
 
 	log.Errorf("Removed subnet (%s) was not found", lease.Subnet)
-	return Event{SubnetRemoved, *lease}
+	return Event{EventRemoved, *lease, ""}
 }
 
 // deleteLease drops element i from l by copying the last element over it and
 // shortening the slice by one. Element order is not preserved and the
 // caller's backing array is modified.
 func deleteLease(l []Lease, i int) []Lease {
 	last := len(l) - 1
 	l[i] = l[last]
 	return l[:last]
 }
+
+// WatchNetworks performs a long term watch of flannel networks and communicates
+// addition/deletion events on receiver channel. It takes care of handling
+// "fall-behind" logic where the history window has advanced too far and it
+// needs to diff the latest snapshot with its saved state and generate events.
+//
+// The loop returns when sm.WatchNetworks fails with context.Canceled or
+// context.DeadlineExceeded, or when ctx is done while the function is waiting
+// to retry or to hand a batch to receiver. Any other error is logged and the
+// watch is retried after one second with the same cursor.
+func WatchNetworks(ctx context.Context, sm Manager, receiver chan []Event) {
+	nw := newNetWatcher()
+	var cursor interface{}
+
+	for {
+		res, err := sm.WatchNetworks(ctx, cursor)
+		if err != nil {
+			if err == context.Canceled || err == context.DeadlineExceeded {
+				return
+			}
+
+			log.Errorf("Watch networks: %v", err)
+
+			// Back off before retrying, but give up right away if the
+			// caller cancels in the meantime.
+			select {
+			case <-time.After(time.Second):
+			case <-ctx.Done():
+				return
+			}
+			continue
+		}
+		cursor = res.Cursor
+
+		var batch []Event
+		if len(res.Events) > 0 {
+			batch = nw.update(res.Events)
+		} else {
+			// No events: treat Snapshot as the full current state.
+			batch = nw.reset(res.Snapshot)
+		}
+
+		if batch != nil {
+			// Do not block forever on a receiver that stopped reading
+			// after cancelling ctx; that would leak this goroutine.
+			select {
+			case receiver <- batch:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}
+}
+
+// netWatcher keeps the set of network names currently known to the watch
+// loop so that snapshots and incremental events can be reconciled against it.
+type netWatcher struct {
+	// networks is used as a set: known names are stored with the value true.
+	networks map[string]bool
+}
+
+// newNetWatcher returns a netWatcher whose network set is allocated and empty.
+func newNetWatcher() *netWatcher {
+	nw := new(netWatcher)
+	nw.networks = map[string]bool{}
+	return nw
+}
+
+// reset replaces the tracked set of networks with the given snapshot and
+// returns the events that describe the difference: one EventAdded for every
+// snapshot name that was not tracked before (in snapshot order), followed by
+// one EventRemoved for every tracked name missing from the snapshot (in map
+// iteration order). A name listed more than once in the snapshot is counted
+// once. The returned slice is never nil, even when nothing changed.
+func (nw *netWatcher) reset(networks []string) []Event {
+	// Deliberately non-nil: WatchNetworks forwards any non-nil batch.
+	batch := []Event{}
+	newNetworks := make(map[string]bool, len(networks))
+
+	for _, netname := range networks {
+		if newNetworks[netname] {
+			// Duplicate entry in the snapshot; already handled.
+			continue
+		}
+		newNetworks[netname] = true
+
+		if !nw.networks[netname] {
+			// new network
+			batch = append(batch, Event{EventAdded, Lease{}, netname})
+		}
+	}
+
+	// everything in nw.networks that the snapshot lacks has been deleted
+	for netname := range nw.networks {
+		if !newNetworks[netname] {
+			batch = append(batch, Event{EventRemoved, Lease{}, netname})
+		}
+	}
+
+	nw.networks = newNetworks
+
+	return batch
+}
+
+// update applies incremental events to the tracked set of networks, in order,
+// and returns the resulting events. Events whose type is neither EventAdded
+// nor EventRemoved are skipped. The returned slice is never nil.
+func (nw *netWatcher) update(events []Event) []Event {
+	out := []Event{}
+
+	for i := range events {
+		ev := &events[i]
+		if ev.Type == EventAdded {
+			out = append(out, nw.add(ev.Network))
+		} else if ev.Type == EventRemoved {
+			out = append(out, nw.remove(ev.Network))
+		}
+	}
+
+	return out
+}
+
+// add records network in the tracked set if it is not there yet and returns
+// an EventAdded for it; the event is returned whether or not the name was new.
+func (nw *netWatcher) add(network string) Event {
+	_, known := nw.networks[network]
+	if !known {
+		nw.networks[network] = true
+	}
+
+	return Event{Type: EventAdded, Network: network}
+}
+
+// remove drops network from the tracked set and returns an EventRemoved for
+// it. A name that was not tracked is logged as an error, and the event is
+// returned all the same.
+func (nw *netWatcher) remove(network string) Event {
+	if _, known := nw.networks[network]; !known {
+		log.Errorf("Removed network (%s) was not found", network)
+	} else {
+		delete(nw.networks, network)
+	}
+
+	return Event{Type: EventRemoved, Network: network}
+}