Browse Source

Merge branch 'master' into fix

Tom Denham 8 years ago
parent
commit
c55b98bd6a

+ 0 - 1
.gitignore

@@ -1,6 +1,5 @@
 dist/*.tar.gz
 dist/flanneld*
-dist/iptables-*
 dist/libpthread*
 dist/ld64*
 dist/libc*

+ 1 - 1
Dockerfile.amd64

@@ -8,5 +8,5 @@ COPY dist/flanneld-$FLANNEL_ARCH /opt/bin/flanneld
 COPY dist/iptables-$FLANNEL_ARCH /usr/local/bin/iptables
 COPY dist/mk-docker-opts.sh /opt/bin/
 
-CMD ["/opt/bin/flanneld"]
+ENTRYPOINT ["/opt/bin/flanneld"]
 

+ 1 - 1
Dockerfile.arm

@@ -12,4 +12,4 @@ COPY dist/ld64.so.1-$FLANNEL_ARCH /lib/ld64.so.1
 COPY dist/libc.so.6-$FLANNEL_ARCH /lib/libc.so.6
 
 
-CMD ["/opt/bin/flanneld"]
+ENTRYPOINT ["/opt/bin/flanneld"]

+ 2 - 0
Dockerfile.arm64

@@ -10,3 +10,5 @@ COPY dist/mk-docker-opts.sh /opt/bin/
 COPY dist/libpthread.so.0-$FLANNEL_ARCH /lib/libpthread.so.0
 COPY dist/ld64.so.1-$FLANNEL_ARCH /lib/ld64.so.1
 COPY dist/libc.so.6-$FLANNEL_ARCH /lib/libc.so.6
+
+ENTRYPOINT ["/opt/bin/flanneld"]

+ 2 - 0
Dockerfile.ppc64le

@@ -10,3 +10,5 @@ COPY dist/mk-docker-opts.sh /opt/bin/
 COPY dist/libpthread.so.0-$FLANNEL_ARCH /lib/libpthread.so.0
 COPY dist/ld64.so.1-$FLANNEL_ARCH /lib/ld64.so.1
 COPY dist/libc.so.6-$FLANNEL_ARCH /lib/libc.so.6
+
+ENTRYPOINT ["/opt/bin/flanneld"]

+ 1 - 1
Dockerfile.s390x

@@ -11,4 +11,4 @@ COPY dist/libpthread.so.0-$FLANNEL_ARCH /lib/libpthread.so.0
 COPY dist/ld64.so.1-$FLANNEL_ARCH /lib/ld64.so.1
 COPY dist/libc.so.6-$FLANNEL_ARCH /lib/libc.so.6
 
-CMD ["/opt/bin/flanneld"]
+ENTRYPOINT ["/opt/bin/flanneld"]

+ 2 - 2
Documentation/kube-flannel.yml

@@ -55,7 +55,7 @@ spec:
       serviceAccountName: flannel
       containers:
       - name: kube-flannel
-        image: quay.io/coreos/flannel:v0.7.0-amd64
+        image: quay.io/coreos/flannel:v0.7.1-amd64
         command: [ "/opt/bin/flanneld", "--ip-masq", "--kube-subnet-mgr" ]
         securityContext:
           privileged: true
@@ -74,7 +74,7 @@ spec:
         - name: flannel-cfg
           mountPath: /etc/kube-flannel/
       - name: install-cni
-        image: quay.io/coreos/flannel:v0.7.0-amd64
+        image: quay.io/coreos/flannel:v0.7.1-amd64
         command: [ "/bin/sh", "-c", "set -e -x; cp -f /etc/kube-flannel/cni-conf.json /etc/cni/net.d/10-flannel.conf; while true; do sleep 3600; done" ]
         volumeMounts:
         - name: cni

+ 91 - 0
Documentation/minikube.yml

@@ -0,0 +1,91 @@
+# This manifest is intended for dev work, so there are some differences from the "normal" manifest
+#   - no namespace (make kubectl simpler)
+#   - special image name (flannel-minikube)
+#   - never pull the image
+#   - host-gw backend (since vxlan doesn't work in minikube)
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: flannel
+---
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: kube-flannel-cfg
+  labels:
+    tier: node
+    app: flannel
+data:
+  cni-conf.json: |
+    {
+      "name": "cbr0",
+      "type": "flannel",
+      "delegate": {
+        "isDefaultGateway": true
+      }
+    }
+  net-conf.json: |
+    {
+      "Network": "10.33.0.0/16",
+      "Backend": {
+        "Type": "host-gw"
+      }
+    }
+---
+apiVersion: extensions/v1beta1
+kind: DaemonSet
+metadata:
+  name: kube-flannel-ds
+  labels:
+    tier: node
+    app: flannel
+spec:
+  template:
+    metadata:
+      labels:
+        tier: node
+        app: flannel
+    spec:
+      hostNetwork: true
+      serviceAccountName: flannel
+      containers:
+      - name: kube-flannel
+        image: flannel/minikube
+        imagePullPolicy: Never
+        command: [ "/opt/bin/flanneld", "--ip-masq", "--kube-subnet-mgr" ]
+        securityContext:
+          privileged: true
+        env:
+        - name: POD_NAME
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.name
+        - name: POD_NAMESPACE
+          valueFrom:
+            fieldRef:
+              fieldPath: metadata.namespace
+        volumeMounts:
+        - name: run
+          mountPath: /run
+        - name: flannel-cfg
+          mountPath: /etc/kube-flannel/
+      - name: install-cni
+        image: flannel/minikube
+        imagePullPolicy: Never
+        command: [ "/bin/sh", "-c", "set -e -x; cat /etc/kube-flannel/cni-conf.json; cp -f /etc/kube-flannel/cni-conf.json /etc/cni/net.d/10-flannel.conf; while true; do sleep 3600; done" ]
+        volumeMounts:
+        - name: cni
+          mountPath: /etc/cni/net.d
+        - name: flannel-cfg
+          mountPath: /etc/kube-flannel/
+      volumes:
+        - name: run
+          hostPath:
+            path: /run
+        - name: cni
+          hostPath:
+            path: /etc/cni/net.d
+        - name: flannel-cfg
+          configMap:
+            name: kube-flannel-cfg

+ 9 - 0
Documentation/running.md

@@ -7,6 +7,15 @@ Additionally it will monitor etcd for new members of the network and adjust the
 
 After flannel has acquired the subnet and configured backend, it will write out an environment variable file (`/run/flannel/subnet.env` by default) with subnet address and MTU that it supports.
 
+## Multiple networks
+
+Flanneld does not support multiple from a single daemon (it did previously as an experimental feature).
+However, it does support running multiple daemons on the same host with different configuration. The `-subnet-file` and `-etcd-prefix` options should be used to "namespace" the different daemons.
+For example
+```
+flanneld -subnet-file /vxlan.env -etcd-prefix=/vxlan/network
+```
+
 ## Zero-downtime restarts
 
 When running with a backend other than `udp`, the kernel is providing the data path with flanneld acting as the control plane.

+ 38 - 0
Makefile

@@ -194,3 +194,41 @@ flannel-git:
 	ARCH=arm64 REGISTRY=quay.io/coreos/flannel-git make clean dist/flanneld-$(TAG)-arm64.docker docker-push
 	ARCH=ppc64le REGISTRY=quay.io/coreos/flannel-git make clean dist/flanneld-$(TAG)-ppc64le.docker docker-push
 	ARCH=s390x REGISTRY=quay.io/coreos/flannel-git make clean dist/flanneld-$(TAG)-s390x.docker docker-push
+
+install:
+	# This is intended as just a developer convenience to help speed up non-containerized builds
+	# It is NOT how you install flannel
+	CGO_ENABLED=1 go install -v github.com/coreos/flannel
+
+minikube-start:
+	minikube start --network-plugin cni
+
+minikube-build-image: dist/iptables-amd64 dist/libpthread.so.0-amd64
+	CGO_ENABLED=1 go build -v -o dist/flanneld-amd64
+	# Make sure the minikube docker is being used "eval $(minikube docker-env)"
+	sh -c 'eval $$(minikube docker-env) && docker build -f Dockerfile.amd64 -t flannel/minikube .'
+
+minikube-deploy-flannel:
+	kubectl apply -f Documentation/minikube.yml
+
+minikube-remove-flannel:
+	kubectl delete -f Documentation/minikube.yml
+
+minikube-restart-pod:
+	# Use this to pick up a new image
+	kubectl delete pods -l app=flannel --grace-period=0
+
+kubernetes-logs:
+	kubectl logs `kubectl get po -l app=flannel -o=custom-columns=NAME:metadata.name --no-headers=true` -c kube-flannel -f
+
+LOCAL_IP_ENV?=$(shell ip route get 8.8.8.8 | head -1 | awk '{print $$7}')
+run-etcd: stop-etcd
+	docker run --detach \
+	-p 2379:2379 \
+	--name flannel-etcd quay.io/coreos/etcd \
+	etcd \
+	--advertise-client-urls "http://$(LOCAL_IP_ENV):2379,http://127.0.0.1:2379,http://$(LOCAL_IP_ENV):4001,http://127.0.0.1:4001" \
+	--listen-client-urls "http://0.0.0.0:2379,http://0.0.0.0:4001"
+
+stop-etcd:
+	@-docker rm -f flannel-etcd

+ 2 - 2
backend/alivpc/alivpc.go

@@ -51,7 +51,7 @@ func (be *AliVpcBackend) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (be *AliVpcBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
+func (be *AliVpcBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	// 1. Parse our configuration
 	cfg := struct {
 		AccessKeyID     string
@@ -70,7 +70,7 @@ func (be *AliVpcBackend) RegisterNetwork(ctx context.Context, network string, co
 		PublicIP: ip.FromIP(be.extIface.ExtAddr),
 	}
 
-	l, err := be.sm.AcquireLease(ctx, network, &attrs)
+	l, err := be.sm.AcquireLease(ctx, &attrs)
 	switch err {
 	case nil:
 

+ 2 - 2
backend/alloc/alloc.go

@@ -44,12 +44,12 @@ func (_ *AllocBackend) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (be *AllocBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
+func (be *AllocBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	attrs := subnet.LeaseAttrs{
 		PublicIP: ip.FromIP(be.extIface.ExtAddr),
 	}
 
-	l, err := be.sm.AcquireLease(ctx, network, &attrs)
+	l, err := be.sm.AcquireLease(ctx, &attrs)
 	switch err {
 	case nil:
 		return &backend.SimpleNetwork{

+ 3 - 3
backend/awsvpc/awsvpc.go

@@ -53,7 +53,7 @@ func (be *AwsVpcBackend) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (be *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
+func (be *AwsVpcBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	// Parse our configuration
 	cfg := struct {
 		RouteTableID string
@@ -70,7 +70,7 @@ func (be *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, co
 		PublicIP: ip.FromIP(be.extIface.ExtAddr),
 	}
 
-	l, err := be.sm.AcquireLease(ctx, network, &attrs)
+	l, err := be.sm.AcquireLease(ctx, &attrs)
 	switch err {
 	case nil:
 
@@ -115,7 +115,7 @@ func (be *AwsVpcBackend) RegisterNetwork(ctx context.Context, network string, co
 		log.Infof("Found route table %s.\n", cfg.RouteTableID)
 	}
 
-	networkConfig, err := be.sm.GetNetworkConfig(ctx, network)
+	networkConfig, err := be.sm.GetNetworkConfig(ctx)
 	if err != nil {
 		log.Errorf("Error fetching network config: %v", err)
 	}

+ 1 - 1
backend/common.go

@@ -43,7 +43,7 @@ type Backend interface {
 	// Called first to start the necessary event loops and such
 	Run(ctx context.Context)
 	// Called when the backend should create or begin managing a new network
-	RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (Network, error)
+	RegisterNetwork(ctx context.Context, config *subnet.Config) (Network, error)
 }
 
 type Network interface {

+ 2 - 2
backend/gce/gce.go

@@ -86,12 +86,12 @@ func (g *GCEBackend) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (g *GCEBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
+func (g *GCEBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	attrs := subnet.LeaseAttrs{
 		PublicIP: ip.FromIP(g.extIface.ExtAddr),
 	}
 
-	l, err := g.sm.AcquireLease(ctx, network, &attrs)
+	l, err := g.sm.AcquireLease(ctx, &attrs)
 	switch err {
 	case nil:
 

+ 2 - 5
backend/hostgw/hostgw.go

@@ -55,9 +55,8 @@ func (_ *HostgwBackend) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (be *HostgwBackend) RegisterNetwork(ctx context.Context, netname string, config *subnet.Config) (backend.Network, error) {
+func (be *HostgwBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	n := &network{
-		name:     netname,
 		extIface: be.extIface,
 		sm:       be.sm,
 	}
@@ -67,7 +66,7 @@ func (be *HostgwBackend) RegisterNetwork(ctx context.Context, netname string, co
 		BackendType: "host-gw",
 	}
 
-	l, err := be.sm.AcquireLease(ctx, netname, &attrs)
+	l, err := be.sm.AcquireLease(ctx, &attrs)
 	switch err {
 	case nil:
 		n.lease = l
@@ -81,7 +80,5 @@ func (be *HostgwBackend) RegisterNetwork(ctx context.Context, netname string, co
 
 	/* NB: docker will create the local route to `sn` */
 
-	be.networks[netname] = n
-
 	return n, nil
 }

+ 1 - 1
backend/hostgw/network.go → backend/hostgw/hostgw_network.go

@@ -52,7 +52,7 @@ func (n *network) Run(ctx context.Context) {
 	evts := make(chan []subnet.Event)
 	wg.Add(1)
 	go func() {
-		subnet.WatchLeases(ctx, n.sm, n.name, n.lease, evts)
+		subnet.WatchLeases(ctx, n.sm, n.lease, evts)
 		wg.Done()
 	}()
 

+ 3 - 3
backend/udp/udp.go

@@ -46,7 +46,7 @@ func New(sm subnet.Manager, extIface *backend.ExternalInterface) (backend.Backen
 	return &be, nil
 }
 
-func (be *UdpBackend) RegisterNetwork(ctx context.Context, netname string, config *subnet.Config) (backend.Network, error) {
+func (be *UdpBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	cfg := struct {
 		Port int
 	}{
@@ -65,7 +65,7 @@ func (be *UdpBackend) RegisterNetwork(ctx context.Context, netname string, confi
 		PublicIP: ip.FromIP(be.extIface.ExtAddr),
 	}
 
-	l, err := be.sm.AcquireLease(ctx, netname, &attrs)
+	l, err := be.sm.AcquireLease(ctx, &attrs)
 	switch err {
 	case nil:
 
@@ -83,7 +83,7 @@ func (be *UdpBackend) RegisterNetwork(ctx context.Context, netname string, confi
 		PrefixLen: config.Network.PrefixLen,
 	}
 
-	return newNetwork(netname, be.sm, be.extIface, cfg.Port, tunNet, l)
+	return newNetwork(be.sm, be.extIface, cfg.Port, tunNet, l)
 }
 
 func (_ *UdpBackend) Run(ctx context.Context) {

+ 2 - 3
backend/udp/network.go → backend/udp/udp_network.go

@@ -46,13 +46,12 @@ type network struct {
 	sm     subnet.Manager
 }
 
-func newNetwork(name string, sm subnet.Manager, extIface *backend.ExternalInterface, port int, nw ip.IP4Net, l *subnet.Lease) (*network, error) {
+func newNetwork(sm subnet.Manager, extIface *backend.ExternalInterface, port int, nw ip.IP4Net, l *subnet.Lease) (*network, error) {
 	n := &network{
 		SimpleNetwork: backend.SimpleNetwork{
 			SubnetLease: l,
 			ExtIface:    extIface,
 		},
-		name: name,
 		port: port,
 		sm:   sm,
 	}
@@ -101,7 +100,7 @@ func (n *network) Run(ctx context.Context) {
 
 	wg.Add(1)
 	go func() {
-		subnet.WatchLeases(ctx, n.sm, n.name, n.SubnetLease, evts)
+		subnet.WatchLeases(ctx, n.sm, n.SubnetLease, evts)
 		wg.Done()
 	}()
 

+ 3 - 3
backend/vxlan/vxlan.go

@@ -65,7 +65,7 @@ func (be *VXLANBackend) Run(ctx context.Context) {
 	<-ctx.Done()
 }
 
-func (be *VXLANBackend) RegisterNetwork(ctx context.Context, network string, config *subnet.Config) (backend.Network, error) {
+func (be *VXLANBackend) RegisterNetwork(ctx context.Context, config *subnet.Config) (backend.Network, error) {
 	// Parse our configuration
 	cfg := struct {
 		VNI  int
@@ -100,7 +100,7 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, network string, con
 		return nil, err
 	}
 
-	lease, err := be.subnetMgr.AcquireLease(ctx, network, subnetAttrs)
+	lease, err := be.subnetMgr.AcquireLease(ctx, subnetAttrs)
 	switch err {
 	case nil:
 
@@ -121,7 +121,7 @@ func (be *VXLANBackend) RegisterNetwork(ctx context.Context, network string, con
 		return nil, err
 	}
 
-	return newNetwork(network, be.subnetMgr, be.extIface, dev, vxlanNet, lease)
+	return newNetwork(be.subnetMgr, be.extIface, dev, vxlanNet, lease)
 }
 
 // So we can make it JSON (un)marshalable

+ 2 - 4
backend/vxlan/network.go → backend/vxlan/vxlan_network.go

@@ -33,20 +33,18 @@ import (
 
 type network struct {
 	backend.SimpleNetwork
-	name      string
 	extIface  *backend.ExternalInterface
 	dev       *vxlanDevice
 	routes    routes
 	subnetMgr subnet.Manager
 }
 
-func newNetwork(name string, subnetMgr subnet.Manager, extIface *backend.ExternalInterface, dev *vxlanDevice, _ ip.IP4Net, lease *subnet.Lease) (*network, error) {
+func newNetwork(subnetMgr subnet.Manager, extIface *backend.ExternalInterface, dev *vxlanDevice, _ ip.IP4Net, lease *subnet.Lease) (*network, error) {
 	nw := &network{
 		SimpleNetwork: backend.SimpleNetwork{
 			SubnetLease: lease,
 			ExtIface:    extIface,
 		},
-		name:      name,
 		subnetMgr: subnetMgr,
 		dev:       dev,
 	}
@@ -67,7 +65,7 @@ func (nw *network) Run(ctx context.Context) {
 	events := make(chan []subnet.Event)
 	wg.Add(1)
 	go func() {
-		subnet.WatchLeases(ctx, nw.subnetMgr, nw.name, nw.SubnetLease, events)
+		subnet.WatchLeases(ctx, nw.subnetMgr, nw.SubnetLease, events)
 		log.V(1).Info("WatchLeases exited")
 		wg.Done()
 	}()

+ 95 - 7
dist/functional-test.sh

@@ -1,5 +1,6 @@
 #!/bin/bash
-
+# Uncomment to see what commands are being executed
+#set -x
 
 ETCD_IMG="quay.io/coreos/etcd:v3.0.3"
 FLANNEL_NET="10.10.0.0/16"
@@ -41,8 +42,7 @@ run_test() {
 	flannel_conf="{ \"Network\": \"$FLANNEL_NET\", \"Backend\": { \"Type\": \"${backend}\" } }"
 
 	# etcd might take a bit to come up
-	while ! docker run --rm -it --entrypoint=/usr/local/bin/etcdctl $ETCD_IMG \
-			--endpoints=$etcd_endpt set /coreos.com/network/config "$flannel_conf"
+	while ! docker run --rm -it $ETCD_IMG etcdctl --endpoints=$etcd_endpt set /coreos.com/network/config "$flannel_conf"
 	do
 		sleep 1
 	done
@@ -51,14 +51,14 @@ run_test() {
 
 	# rm any old flannel container that maybe running, ignore error as it might not exist
 	docker rm -f flannel-e2e-test-flannel1 2>/dev/null
-	docker run --name=flannel-e2e-test-flannel1 -d --privileged --entrypoint=/opt/bin/flanneld $flannel_img --etcd-endpoints=$etcd_endpt
+	docker run --name=flannel-e2e-test-flannel1 -d --privileged $flannel_img --etcd-endpoints=$etcd_endpt
 	if [ $? -ne 0 ]; then
 		exit 1
 	fi
 
 	# rm any old flannel container that maybe running, ignore error as it might not exist
 	docker rm -f flannel-e2e-test-flannel2 2>/dev/null
-	docker run --name=flannel-e2e-test-flannel2 -d --privileged --entrypoint=/opt/bin/flanneld $flannel_img --etcd-endpoints=$etcd_endpt
+	docker run --name=flannel-e2e-test-flannel2 -d --privileged $flannel_img --etcd-endpoints=$etcd_endpt
 	if [ $? -ne 0 ]; then
 		exit 1
 	fi
@@ -111,6 +111,89 @@ run_test() {
 	return $exit_code
 }
 
+multi_test() {
+	flannel_conf_vxlan='{"Network": "10.11.0.0/16", "Backend": {"Type": "vxlan"}}'
+	flannel_conf_host_gw='{"Network": "10.12.0.0/16", "Backend": {"Type": "host-gw"}}'
+
+	# etcd might take a bit to come up
+	while ! docker run --rm -it $ETCD_IMG etcdctl --endpoints=$etcd_endpt set /vxlan/network/config "$flannel_conf_vxlan"
+	do
+		sleep 1
+	done
+
+	while ! docker run --rm -it $ETCD_IMG etcdctl --endpoints=$etcd_endpt set /hostgw/network/config "$flannel_conf_host_gw"
+	do
+		sleep 1
+	done
+
+	echo flannel config written
+
+	for host in 1 2; do
+    	echo "=== Creating Host: $host ==============================================="
+
+        # rm any old flannel container that maybe running, ignore error as it might not exist
+        docker rm -f flannel-host$host 2>/dev/null
+
+        # Start the hosts
+        docker run --name=flannel-host$host -d -it --privileged --entrypoint /bin/sh $flannel_img
+
+        # Start two flanneld instances
+        docker exec -d flannel-host$host sh -c "/opt/bin/flanneld -subnet-file /vxlan.env -etcd-prefix=/vxlan/network --etcd-endpoints=$etcd_endpt 2>vxlan.log"
+        docker exec -d flannel-host$host sh -c "/opt/bin/flanneld -subnet-file /hostgw.env -etcd-prefix=/hostgw/network --etcd-endpoints=$etcd_endpt 2>hostgw.log"
+    done
+
+	echo flannels running
+
+	# wait an arbitrary amount to have flannels come up
+	sleep 1
+
+	# add dummy interface on host1 only so we have a known working IP to ping then ping it from host2
+	vxlan_ping_dest=$(docker exec flannel-host1 /bin/sh -c '\
+		source /vxlan.env &&
+		ip link add name dummy_vxlan type dummy && \
+		ip addr add $FLANNEL_SUBNET dev dummy_vxlan && \
+	       	ip link set dummy_vxlan up && \
+		echo $FLANNEL_SUBNET | cut -f 1 -d "/" ')
+
+    hostgw_ping_dest=$(docker exec flannel-host1 /bin/sh -c '\
+		source /hostgw.env &&
+		ip link add name dummy_hostgw type dummy && \
+		ip addr add $FLANNEL_SUBNET dev dummy_hostgw && \
+	       	ip link set dummy_hostgw up && \
+		echo $FLANNEL_SUBNET | cut -f 1 -d "/" ')
+
+    # Send some pings from host2. Make sure we can send traffic over vxlan or directly.
+    # If a particular (wrong) interface is forced then pings should fail
+	docker exec -it flannel-host2 ping -c 3 $hostgw_ping_dest && \
+	docker exec -it flannel-host2 ping -c 3 $vxlan_ping_dest && \
+	! docker exec -it flannel-host2 ping -W 1 -c 1 -I flannel.1 $hostgw_ping_dest && \
+	! docker exec -it flannel-host2 ping -W 1 -c 1 -I eth0 $vxlan_ping_dest
+	exit_code=$?
+
+	# Uncomment to debug (you can nsenter)
+	#if [ $exit_code -eq "1" ]; then
+	#	sleep 10000
+	#fi
+
+	echo "Test for multi-backend: exit=$exit_code"
+
+
+	if [ $exit_code -ne 0 ]; then
+		# Print flannel logs to help debug
+		echo "------ flannel server (one being pinged) log -------"
+		docker exec flannel-host1 sh -c 'cat *.log'
+		echo
+
+		echo "------ flannel client (one doing the ping) log -------"
+		docker exec flannel-host2 sh -c 'cat *.log'
+		echo
+	fi
+
+	docker rm -f flannel-host1 flannel-host2 >/dev/null
+
+	return $exit_code
+}
+
 if [ $# -ne 1 ]; then
 	usage
 fi
@@ -124,7 +207,7 @@ docker0=$(ip -o -f inet addr show docker0 | grep -Po 'inet \K[\d.]+')
 etcd_endpt="http://$docker0:2379"
 
 docker rm -f flannel-e2e-test-etcd 2>/dev/null
-docker run --name=flannel-e2e-test-etcd -d -p 2379:2379 --entrypoint /usr/local/bin/etcd $ETCD_IMG --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls $etcd_endpt
+docker run --name=flannel-e2e-test-etcd -d -p 2379:2379 $ETCD_IMG etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls $etcd_endpt
 if [ $? -ne 0 ]; then
 	exit 1
 fi
@@ -133,7 +216,7 @@ echo etcd launched
 
 global_exit_code=0
 
-backends=${BACKEND:-"udp vxlan host-gw"} 
+backends=${BACKEND:-"udp vxlan host-gw"}
 for backend in $backends; do
 	echo
 	echo "=== BACKEND: $backend ==============================================="
@@ -143,6 +226,11 @@ for backend in $backends; do
 	fi
 done
 
+echo "=== MULTI BACKEND ==============================================="
+if ! multi_test; then
+    global_exit_code=1
+fi
+
 docker stop flannel-e2e-test-etcd >/dev/null
 
 if [ $global_exit_code -eq 0 ]; then

BIN
dist/iptables-amd64


BIN
dist/iptables-arm


BIN
dist/iptables-arm64


BIN
dist/iptables-ppc64le


BIN
dist/iptables-s390x


+ 253 - 27
main.go

@@ -15,12 +15,14 @@
 package main
 
 import (
+	"errors"
 	"flag"
 	"fmt"
+	"net"
 	"os"
 	"os/signal"
+	"path/filepath"
 	"strings"
-	"sync"
 	"syscall"
 
 	"github.com/coreos/pkg/flagutil"
@@ -28,12 +30,16 @@ import (
 	"golang.org/x/net/context"
 
 	"github.com/coreos/flannel/network"
+	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/subnet"
 	"github.com/coreos/flannel/subnet/etcdv2"
 	"github.com/coreos/flannel/subnet/kube"
 	"github.com/coreos/flannel/version"
 
+	"time"
+
 	// Backends need to be imported for their init() to get executed and them to register
+	"github.com/coreos/flannel/backend"
 	_ "github.com/coreos/flannel/backend/alivpc"
 	_ "github.com/coreos/flannel/backend/alloc"
 	_ "github.com/coreos/flannel/backend/awsvpc"
@@ -41,22 +47,33 @@ import (
 	_ "github.com/coreos/flannel/backend/hostgw"
 	_ "github.com/coreos/flannel/backend/udp"
 	_ "github.com/coreos/flannel/backend/vxlan"
+	"github.com/coreos/go-systemd/daemon"
 )
 
 type CmdLineOpts struct {
-	etcdEndpoints string
-	etcdPrefix    string
-	etcdKeyfile   string
-	etcdCertfile  string
-	etcdCAFile    string
-	etcdUsername  string
-	etcdPassword  string
-	help          bool
-	version       bool
-	kubeSubnetMgr bool
+	etcdEndpoints          string
+	etcdPrefix             string
+	etcdKeyfile            string
+	etcdCertfile           string
+	etcdCAFile             string
+	etcdUsername           string
+	etcdPassword           string
+	help                   bool
+	version                bool
+	kubeSubnetMgr          bool
+	iface                  string
+	ipMasq                 bool
+	subnetFile             string
+	subnetDir              string
+	publicIP               string
+	subnetLeaseRenewMargin int
 }
 
-var opts CmdLineOpts
+var (
+	opts           CmdLineOpts
+	errInterrupted = errors.New("interrupted")
+	errCanceled    = errors.New("canceled")
+)
 
 func init() {
 	flag.StringVar(&opts.etcdEndpoints, "etcd-endpoints", "http://127.0.0.1:4001,http://127.0.0.1:2379", "a comma-delimited list of etcd endpoints")
@@ -66,6 +83,11 @@ func init() {
 	flag.StringVar(&opts.etcdCAFile, "etcd-cafile", "", "SSL Certificate Authority file used to secure etcd communication")
 	flag.StringVar(&opts.etcdUsername, "etcd-username", "", "Username for BasicAuth to etcd")
 	flag.StringVar(&opts.etcdPassword, "etcd-password", "", "Password for BasicAuth to etcd")
+	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
+	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
+	flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
+	flag.IntVar(&opts.subnetLeaseRenewMargin, "subnet-lease-renew-margin", 60, "Subnet lease renewal margin, in minutes.")
+	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
 	flag.BoolVar(&opts.kubeSubnetMgr, "kube-subnet-mgr", false, "Contact the Kubernetes API for subnet assignement instead of etcd or flannel-server.")
 	flag.BoolVar(&opts.help, "help", false, "print this message")
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
@@ -110,6 +132,13 @@ func main() {
 
 	flagutil.SetFlagsFromEnv(flag.CommandLine, "FLANNELD")
 
+	// Work out which interface to use
+	extIface, err := LookupExtIface(opts.iface)
+	if err != nil {
+		log.Error("Failed to find interface to use: ", err)
+		os.Exit(1)
+	}
+
 	sm, err := newSubnetManager()
 	if err != nil {
 		log.Error("Failed to create SubnetManager: ", err)
@@ -122,32 +151,229 @@ func main() {
 	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
 	ctx, cancel := context.WithCancel(context.Background())
+	go shutdown(sigs, cancel)
 
-	var runFunc func(ctx context.Context)
+	// Fetch the network config (i.e. what backend to use etc..).
+	config, err := getConfig(ctx, sm)
+	if err == errCanceled {
+		exit()
+	}
 
-	nm, err := network.NewNetworkManager(ctx, sm)
+	// Create a backend manager then use it to create the backend and register the network with it.
+	bm := backend.NewManager(ctx, sm, extIface)
+	be, err := bm.GetBackend(config.BackendType)
 	if err != nil {
-		log.Error("Failed to create NetworkManager: ", err)
-		os.Exit(1)
+		log.Errorf("Error fetching backend: %s", err)
+		exit()
 	}
 
-	runFunc = func(ctx context.Context) {
-		nm.Run(ctx)
+	bn, err := be.RegisterNetwork(ctx, config)
+	if err != nil {
+		log.Errorf("Error registering network: %s", err)
+		exit()
+	}
+
+	// Set up ipMasq if needed
+	if opts.ipMasq {
+		err = network.SetupIPMasq(config.Network)
+		if err != nil {
+			// Continue, even though it failed.
+			log.Errorf("Failed to set up IP Masquerade: %v", err)
+		}
+
+		defer func() {
+			if err := network.TeardownIPMasq(config.Network); err != nil {
+				log.Errorf("Failed to tear down IP Masquerade: %v", err)
+			}
+		}()
 	}
 
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	go func() {
-		runFunc(ctx)
-		wg.Done()
-	}()
+	if err := WriteSubnetFile(opts.subnetFile, config.Network, opts.ipMasq, bn); err != nil {
+		// Continue, even though it failed.
+		log.Warningf("Failed to write subnet file: %s", err)
+	} else {
+		log.Infof("Wrote subnet file to %s", opts.subnetFile)
+	}
+
+	// Start "Running" the backend network. This will block until the context is done so run in another goroutine.
+	go bn.Run(ctx)
+	log.Infof("Finished starting backend.")
+
+	daemon.SdNotify(false, "READY=1")
 
+	// Block waiting to renew the lease
+	_ = MonitorLease(ctx, sm, bn)
+
+	// To get to here, the Cancel signal must have been received or the lease has been revoked.
+	exit()
+}
+
+func exit() {
+	// Wait just a second for the cancel signal to propagate everywhere, then just exit cleanly.
+	log.Info("Waiting for cancel to propagate...")
+	time.Sleep(time.Second)
+	log.Info("Exiting...")
+	os.Exit(0)
+}
+
+func shutdown(sigs chan os.Signal, cancel context.CancelFunc) {
+	// Wait for the shutdown signal.
 	<-sigs
-	// unregister to get default OS nuke behaviour in case we don't exit cleanly
+	// Unregister to get default OS nuke behaviour in case we don't exit cleanly
 	signal.Stop(sigs)
+	log.Info("Starting shutdown...")
 
-	log.Info("Exiting...")
+	// Call cancel on the context to close everything down.
 	cancel()
+	log.Info("Sent cancel signal...")
+}
+
+func getConfig(ctx context.Context, sm subnet.Manager) (*subnet.Config, error) {
+	// Retry every second until it succeeds
+	for {
+		config, err := sm.GetNetworkConfig(ctx)
+		if err != nil {
+			log.Errorf("Couldn't fetch network config: %s", err)
+		} else if config == nil {
+			log.Warningf("Couldn't find network config: %s", err)
+		} else {
+			log.Infof("Found network config - Backend type: %s", config.BackendType)
+			return config, nil
+		}
+		select {
+		case <-ctx.Done():
+			return nil, errCanceled
+		case <-time.After(1 * time.Second):
+			fmt.Println("timed out")
+		}
+	}
+}
+
+func MonitorLease(ctx context.Context, sm subnet.Manager, bn backend.Network) error {
+	// Use the subnet manager to start watching leases.
+	evts := make(chan subnet.Event)
+	go subnet.WatchLease(ctx, sm, bn.Lease().Subnet, evts)
+	renewMargin := time.Duration(opts.subnetLeaseRenewMargin) * time.Minute
+	dur := bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+
+	for {
+		select {
+		case <-time.After(dur):
+			err := sm.RenewLease(ctx, bn.Lease())
+			if err != nil {
+				log.Error("Error renewing lease (trying again in 1 min): ", err)
+				dur = time.Minute
+				continue
+			}
+
+			log.Info("Lease renewed, new expiration: ", bn.Lease().Expiration)
+			dur = bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+
+		case e := <-evts:
+			switch e.Type {
+			case subnet.EventAdded:
+				bn.Lease().Expiration = e.Lease.Expiration
+				dur = bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+				log.Infof("Waiting for %s to renew lease", dur)
+
+			case subnet.EventRemoved:
+				log.Error("Lease has been revoked. Shutting down daemon.")
+				return errInterrupted
+			}
+
+		case <-ctx.Done():
+			log.Infof("Stopped monitoring lease")
+			return errCanceled
+		}
+	}
+}
+
+func LookupExtIface(ifname string) (*backend.ExternalInterface, error) {
+	var iface *net.Interface
+	var ifaceAddr net.IP
+	var err error
+
+	if len(ifname) > 0 {
+		if ifaceAddr = net.ParseIP(ifname); ifaceAddr != nil {
+			log.Infof("Searching for interface using %s", ifaceAddr)
+			iface, err = ip.GetInterfaceByIP(ifaceAddr)
+			if err != nil {
+				return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
+			}
+		} else {
+			iface, err = net.InterfaceByName(ifname)
+			if err != nil {
+				return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
+			}
+		}
+	} else {
+		log.Info("Determining IP address of default interface")
+		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
+			return nil, fmt.Errorf("failed to get default interface: %s", err)
+		}
+	}
+
+	if ifaceAddr == nil {
+		ifaceAddr, err = ip.GetIfaceIP4Addr(iface)
+		if err != nil {
+			return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
+		}
+	}
+
+	log.Infof("Using interface with name %s and address %s", iface.Name, ifaceAddr)
+
+	if iface.MTU == 0 {
+		return nil, fmt.Errorf("failed to determine MTU for %s interface", ifaceAddr)
+	}
+
+	var extAddr net.IP
+
+	if len(opts.publicIP) > 0 {
+		extAddr = net.ParseIP(opts.publicIP)
+		if extAddr == nil {
+			return nil, fmt.Errorf("invalid public IP address: %s", opts.publicIP)
+		}
+		log.Infof("Using %s as external address", extAddr)
+	}
+
+	if extAddr == nil {
+		log.Infof("Defaulting external address to interface address (%s)", ifaceAddr)
+		extAddr = ifaceAddr
+	}
+
+	return &backend.ExternalInterface{
+		Iface:     iface,
+		IfaceAddr: ifaceAddr,
+		ExtAddr:   extAddr,
+	}, nil
+}
+
+func WriteSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
+	dir, name := filepath.Split(path)
+	os.MkdirAll(dir, 0755)
+
+	tempFile := filepath.Join(dir, "."+name)
+	f, err := os.Create(tempFile)
+	if err != nil {
+		return err
+	}
+
+	// Write out the first usable IP by incrementing
+	// sn.IP by one
+	sn := bn.Lease().Subnet
+	sn.IP += 1
+
+	fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
+	fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
+	fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
+	_, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
+	f.Close()
+	if err != nil {
+		return err
+	}
 
-	wg.Wait()
+	// rename(2) the temporary file to the desired location so that it becomes
+	// atomically visible with the contents
+	return os.Rename(tempFile, path)
+	//TODO - is this safe? What if it's not on the same FS?
 }

+ 2 - 2
network/ipmasq.go

@@ -37,7 +37,7 @@ func rules(ipn ip.IP4Net) [][]string {
 	}
 }
 
-func setupIPMasq(ipn ip.IP4Net) error {
+func SetupIPMasq(ipn ip.IP4Net) error {
 	ipt, err := iptables.New()
 	if err != nil {
 		return fmt.Errorf("failed to set up IP Masquerade. iptables was not found")
@@ -54,7 +54,7 @@ func setupIPMasq(ipn ip.IP4Net) error {
 	return nil
 }
 
-func teardownIPMasq(ipn ip.IP4Net) error {
+func TeardownIPMasq(ipn ip.IP4Net) error {
 	ipt, err := iptables.New()
 	if err != nil {
 		return fmt.Errorf("failed to teardown IP Masquerade. iptables was not found")

+ 0 - 363
network/manager.go

@@ -1,363 +0,0 @@
-// Copyright 2015 flannel authors
-// Copyright 2015 Red Hat, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package network
-
-import (
-	"errors"
-	"flag"
-	"fmt"
-	"net"
-	"os"
-	"path/filepath"
-	"strings"
-	"sync"
-	"time"
-
-	"github.com/coreos/go-systemd/daemon"
-	log "github.com/golang/glog"
-	"golang.org/x/net/context"
-
-	"github.com/coreos/flannel/backend"
-	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/subnet"
-)
-
-type CmdLineOpts struct {
-	publicIP               string
-	ipMasq                 bool
-	subnetFile             string
-	subnetDir              string
-	iface                  string
-	networks               string
-	watchNetworks          bool
-	subnetLeaseRenewMargin int
-}
-
-var errAlreadyExists = errors.New("already exists")
-
-var opts CmdLineOpts
-
-func init() {
-	flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
-	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
-	flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
-	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
-	flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
-	flag.IntVar(&opts.subnetLeaseRenewMargin, "subnet-lease-renew-margin", 60, "Subnet lease renewal margin, in minutes.")
-	flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
-	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
-}
-
-type Manager struct {
-	ctx             context.Context
-	sm              subnet.Manager
-	bm              backend.Manager
-	allowedNetworks map[string]bool
-	mux             sync.Mutex
-	networks        map[string]*Network
-	watch           bool
-	ipMasq          bool
-	extIface        *backend.ExternalInterface
-}
-
-func (m *Manager) isNetAllowed(name string) bool {
-	// If allowedNetworks is empty all networks are allowed
-	if len(m.allowedNetworks) > 0 {
-		_, ok := m.allowedNetworks[name]
-		return ok
-	}
-	return true
-}
-
-func (m *Manager) isMultiNetwork() bool {
-	return len(m.allowedNetworks) > 0 || m.watch
-}
-
-func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
-	extIface, err := lookupExtIface(opts.iface)
-	if err != nil {
-		return nil, err
-	}
-
-	bm := backend.NewManager(ctx, sm, extIface)
-
-	manager := &Manager{
-		ctx:             ctx,
-		sm:              sm,
-		bm:              bm,
-		allowedNetworks: make(map[string]bool),
-		networks:        make(map[string]*Network),
-		watch:           opts.watchNetworks,
-		ipMasq:          opts.ipMasq,
-		extIface:        extIface,
-	}
-
-	for _, name := range strings.Split(opts.networks, ",") {
-		if name != "" {
-			manager.allowedNetworks[name] = true
-		}
-	}
-
-	return manager, nil
-}
-
-func lookupExtIface(ifname string) (*backend.ExternalInterface, error) {
-	var iface *net.Interface
-	var ifaceAddr net.IP
-	var err error
-
-	if len(ifname) > 0 {
-		if ifaceAddr = net.ParseIP(ifname); ifaceAddr != nil {
-			log.Infof("Searching for interface using %s", ifaceAddr)
-			iface, err = ip.GetInterfaceByIP(ifaceAddr)
-			if err != nil {
-				return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
-			}
-		} else {
-			iface, err = net.InterfaceByName(ifname)
-			if err != nil {
-				return nil, fmt.Errorf("error looking up interface %s: %s", ifname, err)
-			}
-		}
-	} else {
-		log.Info("Determining IP address of default interface")
-		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
-			return nil, fmt.Errorf("failed to get default interface: %s", err)
-		}
-	}
-
-	if ifaceAddr == nil {
-		ifaceAddr, err = ip.GetIfaceIP4Addr(iface)
-		if err != nil {
-			return nil, fmt.Errorf("failed to find IPv4 address for interface %s", iface.Name)
-		}
-	}
-
-	log.Infof("Using interface with name %s and address %s", iface.Name, ifaceAddr)
-
-	if iface.MTU == 0 {
-		return nil, fmt.Errorf("failed to determine MTU for %s interface", ifaceAddr)
-	}
-
-	var extAddr net.IP
-
-	if len(opts.publicIP) > 0 {
-		extAddr = net.ParseIP(opts.publicIP)
-		if extAddr == nil {
-			return nil, fmt.Errorf("invalid public IP address: %s", opts.publicIP)
-		}
-		log.Infof("Using %s as external address", extAddr)
-	}
-
-	if extAddr == nil {
-		log.Infof("Defaulting external address to interface address (%s)", ifaceAddr)
-		extAddr = ifaceAddr
-	}
-
-	return &backend.ExternalInterface{
-		Iface:     iface,
-		IfaceAddr: ifaceAddr,
-		ExtAddr:   extAddr,
-	}, nil
-}
-
-func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, bn backend.Network) error {
-	dir, name := filepath.Split(path)
-	os.MkdirAll(dir, 0755)
-
-	tempFile := filepath.Join(dir, "."+name)
-	f, err := os.Create(tempFile)
-	if err != nil {
-		return err
-	}
-
-	// Write out the first usable IP by incrementing
-	// sn.IP by one
-	sn := bn.Lease().Subnet
-	sn.IP += 1
-
-	fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
-	fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
-	fmt.Fprintf(f, "FLANNEL_MTU=%d\n", bn.MTU())
-	_, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
-	f.Close()
-	if err != nil {
-		return err
-	}
-
-	// rename(2) the temporary file to the desired location so that it becomes
-	// atomically visible with the contents
-	return os.Rename(tempFile, path)
-}
-
-func (m *Manager) addNetwork(n *Network) error {
-	m.mux.Lock()
-	defer m.mux.Unlock()
-
-	if _, ok := m.networks[n.Name]; ok {
-		return errAlreadyExists
-	}
-	m.networks[n.Name] = n
-	return nil
-}
-
-func (m *Manager) delNetwork(n *Network) {
-	m.mux.Lock()
-	delete(m.networks, n.Name)
-	m.mux.Unlock()
-}
-
-func (m *Manager) getNetwork(netname string) (*Network, bool) {
-	m.mux.Lock()
-	n, ok := m.networks[netname]
-	m.mux.Unlock()
-
-	return n, ok
-}
-
-func (m *Manager) forEachNetwork(f func(n *Network)) {
-	m.mux.Lock()
-	for _, n := range m.networks {
-		f(n)
-	}
-	m.mux.Unlock()
-}
-
-func (m *Manager) runNetwork(n *Network) {
-	n.Run(m.extIface, func(bn backend.Network) {
-		if m.isMultiNetwork() {
-			log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet)
-
-			path := filepath.Join(opts.subnetDir, n.Name) + ".env"
-			if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
-				log.Warningf("%v failed to write subnet file: %s", n.Name, err)
-				return
-			}
-		} else {
-			log.Infof("Lease acquired: %v", bn.Lease().Subnet)
-
-			if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
-				log.Warningf("%v failed to write subnet file: %s", n.Name, err)
-				return
-			}
-			daemon.SdNotify(false, "READY=1")
-		}
-	})
-
-	m.delNetwork(n)
-}
-
-func (m *Manager) watchNetworks() {
-	wg := sync.WaitGroup{}
-	defer wg.Wait()
-
-	events := make(chan []subnet.Event)
-	wg.Add(1)
-	go func() {
-		subnet.WatchNetworks(m.ctx, m.sm, events)
-		wg.Done()
-	}()
-	// skip over the initial snapshot
-	<-events
-
-	for {
-		select {
-		case <-m.ctx.Done():
-			return
-
-		case evtBatch := <-events:
-			for _, e := range evtBatch {
-				netname := e.Network
-				if !m.isNetAllowed(netname) {
-					log.Infof("Network %q is not allowed", netname)
-					continue
-				}
-
-				switch e.Type {
-				case subnet.EventAdded:
-					n := NewNetwork(m.ctx, m.sm, m.bm, netname, m.ipMasq)
-					if err := m.addNetwork(n); err != nil {
-						log.Infof("Network %q: %v", netname, err)
-						continue
-					}
-
-					log.Infof("Network added: %v", netname)
-
-					wg.Add(1)
-					go func() {
-						m.runNetwork(n)
-						wg.Done()
-					}()
-
-				case subnet.EventRemoved:
-					log.Infof("Network removed: %v", netname)
-
-					n, ok := m.getNetwork(netname)
-					if !ok {
-						log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
-						continue
-					}
-					n.Cancel()
-				}
-			}
-		}
-	}
-}
-
-func (m *Manager) Run(ctx context.Context) {
-	wg := sync.WaitGroup{}
-
-	if m.isMultiNetwork() {
-		for {
-			// Try adding initial networks
-			result, err := m.sm.WatchNetworks(ctx, nil)
-			if err == nil {
-				for _, n := range result.Snapshot {
-					if m.isNetAllowed(n) {
-						m.networks[n] = NewNetwork(ctx, m.sm, m.bm, n, m.ipMasq)
-					}
-				}
-				break
-			}
-
-			// Otherwise retry in a few seconds
-			log.Warning("Failed to retrieve networks (will retry): %v", err)
-			select {
-			case <-ctx.Done():
-				return
-			case <-time.After(time.Second):
-			}
-		}
-	} else {
-		m.networks[""] = NewNetwork(ctx, m.sm, m.bm, "", m.ipMasq)
-	}
-
-	// Run existing networks
-	m.forEachNetwork(func(n *Network) {
-		wg.Add(1)
-		go func(n *Network) {
-			m.runNetwork(n)
-			wg.Done()
-		}(n)
-	})
-
-	if opts.watchNetworks {
-		m.watchNetworks()
-	}
-
-	wg.Wait()
-	m.bm.Wait()
-}

+ 0 - 196
network/network.go

@@ -1,196 +0,0 @@
-// Copyright 2015 flannel authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package network
-
-import (
-	"errors"
-	"fmt"
-	"sync"
-	"time"
-
-	log "github.com/golang/glog"
-	"golang.org/x/net/context"
-
-	"github.com/coreos/flannel/backend"
-	"github.com/coreos/flannel/subnet"
-)
-
-var (
-	errInterrupted = errors.New("interrupted")
-	errCanceled    = errors.New("canceled")
-)
-
-type Network struct {
-	Name   string
-	Config *subnet.Config
-
-	ctx        context.Context
-	cancelFunc context.CancelFunc
-	sm         subnet.Manager
-	bm         backend.Manager
-	ipMasq     bool
-	bn         backend.Network
-}
-
-func NewNetwork(ctx context.Context, sm subnet.Manager, bm backend.Manager, name string, ipMasq bool) *Network {
-	ctx, cf := context.WithCancel(ctx)
-
-	return &Network{
-		Name:       name,
-		sm:         sm,
-		bm:         bm,
-		ipMasq:     ipMasq,
-		ctx:        ctx,
-		cancelFunc: cf,
-	}
-}
-
-func wrapError(desc string, err error) error {
-	if err == context.Canceled {
-		return err
-	}
-	return fmt.Errorf("failed to %v: %v", desc, err)
-}
-
-func (n *Network) init() error {
-	var err error
-
-	n.Config, err = n.sm.GetNetworkConfig(n.ctx, n.Name)
-	if err != nil {
-		return wrapError("retrieve network config", err)
-	}
-
-	be, err := n.bm.GetBackend(n.Config.BackendType)
-	if err != nil {
-		return wrapError("create and initialize network", err)
-	}
-
-	n.bn, err = be.RegisterNetwork(n.ctx, n.Name, n.Config)
-	if err != nil {
-		return wrapError("register network", err)
-	}
-
-	if n.ipMasq {
-		err = setupIPMasq(n.Config.Network)
-		if err != nil {
-			return wrapError("set up IP Masquerade", err)
-		}
-	}
-
-	return nil
-}
-
-func (n *Network) retryInit() error {
-	for {
-		err := n.init()
-		if err == nil || err == context.Canceled {
-			return err
-		}
-
-		log.Error(err)
-
-		select {
-		case <-n.ctx.Done():
-			return n.ctx.Err()
-		case <-time.After(time.Second):
-		}
-	}
-}
-
-func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn backend.Network)) error {
-	if err := n.retryInit(); err != nil {
-		return errCanceled
-	}
-
-	inited(n.bn)
-
-	ctx, interruptFunc := context.WithCancel(n.ctx)
-
-	wg := sync.WaitGroup{}
-
-	wg.Add(1)
-	go func() {
-		n.bn.Run(ctx)
-		wg.Done()
-	}()
-
-	evts := make(chan subnet.Event)
-
-	wg.Add(1)
-	go func() {
-		subnet.WatchLease(ctx, n.sm, n.Name, n.bn.Lease().Subnet, evts)
-		wg.Done()
-	}()
-
-	defer func() {
-		if n.ipMasq {
-			if err := teardownIPMasq(n.Config.Network); err != nil {
-				log.Errorf("Failed to tear down IP Masquerade for network %v: %v", n.Name, err)
-			}
-		}
-	}()
-
-	defer wg.Wait()
-
-	renewMargin := time.Duration(opts.subnetLeaseRenewMargin) * time.Minute
-
-	dur := n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
-	for {
-		select {
-		case <-time.After(dur):
-			err := n.sm.RenewLease(n.ctx, n.Name, n.bn.Lease())
-			if err != nil {
-				log.Error("Error renewing lease (trying again in 1 min): ", err)
-				dur = time.Minute
-				continue
-			}
-
-			log.Info("Lease renewed, new expiration: ", n.bn.Lease().Expiration)
-			dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
-
-		case e := <-evts:
-			switch e.Type {
-			case subnet.EventAdded:
-				n.bn.Lease().Expiration = e.Lease.Expiration
-				dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
-
-			case subnet.EventRemoved:
-				log.Warning("Lease has been revoked")
-				interruptFunc()
-				return errInterrupted
-			}
-
-		case <-n.ctx.Done():
-			return errCanceled
-		}
-	}
-}
-
-func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) {
-	for {
-		switch n.runOnce(extIface, inited) {
-		case errInterrupted:
-
-		case errCanceled:
-			return
-		default:
-			panic("unexpected error returned")
-		}
-	}
-}
-
-func (n *Network) Cancel() {
-	n.cancelFunc()
-}

+ 41 - 87
subnet/etcdv2/local_manager.go

@@ -82,8 +82,8 @@ func newLocalManager(r Registry) Manager {
 	}
 }
 
-func (m *LocalManager) GetNetworkConfig(ctx context.Context, network string) (*Config, error) {
-	cfg, err := m.registry.getNetworkConfig(ctx, network)
+func (m *LocalManager) GetNetworkConfig(ctx context.Context) (*Config, error) {
+	cfg, err := m.registry.getNetworkConfig(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -91,14 +91,14 @@ func (m *LocalManager) GetNetworkConfig(ctx context.Context, network string) (*C
 	return ParseConfig(cfg)
 }
 
-func (m *LocalManager) AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error) {
-	config, err := m.GetNetworkConfig(ctx, network)
+func (m *LocalManager) AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Lease, error) {
+	config, err := m.GetNetworkConfig(ctx)
 	if err != nil {
 		return nil, err
 	}
 
 	for i := 0; i < raceRetries; i++ {
-		l, err := m.tryAcquireLease(ctx, network, config, attrs.PublicIP, attrs)
+		l, err := m.tryAcquireLease(ctx, config, attrs.PublicIP, attrs)
 		switch err {
 		case nil:
 			return l, nil
@@ -122,8 +122,8 @@ func findLeaseByIP(leases []Lease, pubIP ip.IP4) *Lease {
 	return nil
 }
 
-func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, config *Config, extIaddr ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
-	leases, _, err := m.registry.getSubnets(ctx, network)
+func (m *LocalManager) tryAcquireLease(ctx context.Context, config *Config, extIaddr ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
+	leases, _, err := m.registry.getSubnets(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -139,7 +139,7 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, conf
 				// Not a reservation
 				ttl = subnetTTL
 			}
-			exp, err := m.registry.updateSubnet(ctx, network, l.Subnet, attrs, ttl, 0)
+			exp, err := m.registry.updateSubnet(ctx, l.Subnet, attrs, ttl, 0)
 			if err != nil {
 				return nil, err
 			}
@@ -149,7 +149,7 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, conf
 			return l, nil
 		} else {
 			log.Infof("Found lease (%v) for current IP (%v) but not compatible with current config, deleting", l.Subnet, extIaddr)
-			if err := m.registry.deleteSubnet(ctx, network, l.Subnet); err != nil {
+			if err := m.registry.deleteSubnet(ctx, l.Subnet); err != nil {
 				return nil, err
 			}
 		}
@@ -161,9 +161,10 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, conf
 		return nil, err
 	}
 
-	exp, err := m.registry.createSubnet(ctx, network, sn, attrs, subnetTTL)
+	exp, err := m.registry.createSubnet(ctx, sn, attrs, subnetTTL)
 	switch {
 	case err == nil:
+		log.Infof("Allocated lease (%v) to current node (%v) ", sn, extIaddr)
 		return &Lease{
 			Subnet:     sn,
 			Attrs:      *attrs,
@@ -200,12 +201,12 @@ OuterLoop:
 	}
 }
 
-func (m *LocalManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
-	return m.registry.deleteSubnet(ctx, network, sn)
+func (m *LocalManager) RevokeLease(ctx context.Context, sn ip.IP4Net) error {
+	return m.registry.deleteSubnet(ctx, sn)
 }
 
-func (m *LocalManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
-	exp, err := m.registry.updateSubnet(ctx, network, lease.Subnet, &lease.Attrs, subnetTTL, 0)
+func (m *LocalManager) RenewLease(ctx context.Context, lease *Lease) error {
+	exp, err := m.registry.updateSubnet(ctx, lease.Subnet, &lease.Attrs, subnetTTL, 0)
 	if err != nil {
 		return err
 	}
@@ -232,8 +233,8 @@ func getNextIndex(cursor interface{}) (uint64, error) {
 	return nextIndex, nil
 }
 
-func (m *LocalManager) leaseWatchReset(ctx context.Context, network string, sn ip.IP4Net) (LeaseWatchResult, error) {
-	l, index, err := m.registry.getSubnet(ctx, network, sn)
+func (m *LocalManager) leaseWatchReset(ctx context.Context, sn ip.IP4Net) (LeaseWatchResult, error) {
+	l, index, err := m.registry.getSubnet(ctx, sn)
 	if err != nil {
 		return LeaseWatchResult{}, err
 	}
@@ -244,9 +245,9 @@ func (m *LocalManager) leaseWatchReset(ctx context.Context, network string, sn i
 	}, nil
 }
 
-func (m *LocalManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) {
+func (m *LocalManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) {
 	if cursor == nil {
-		return m.leaseWatchReset(ctx, network, sn)
+		return m.leaseWatchReset(ctx, sn)
 	}
 
 	nextIndex, err := getNextIndex(cursor)
@@ -254,7 +255,7 @@ func (m *LocalManager) WatchLease(ctx context.Context, network string, sn ip.IP4
 		return LeaseWatchResult{}, err
 	}
 
-	evt, index, err := m.registry.watchSubnet(ctx, network, nextIndex, sn)
+	evt, index, err := m.registry.watchSubnet(ctx, nextIndex, sn)
 
 	switch {
 	case err == nil:
@@ -265,16 +266,16 @@ func (m *LocalManager) WatchLease(ctx context.Context, network string, sn ip.IP4
 
 	case isIndexTooSmall(err):
 		log.Warning("Watch of subnet leases failed because etcd index outside history window")
-		return m.leaseWatchReset(ctx, network, sn)
+		return m.leaseWatchReset(ctx, sn)
 
 	default:
 		return LeaseWatchResult{}, err
 	}
 }
 
-func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) {
+func (m *LocalManager) WatchLeases(ctx context.Context, cursor interface{}) (LeaseWatchResult, error) {
 	if cursor == nil {
-		return m.leasesWatchReset(ctx, network)
+		return m.leasesWatchReset(ctx)
 	}
 
 	nextIndex, err := getNextIndex(cursor)
@@ -282,7 +283,7 @@ func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor i
 		return LeaseWatchResult{}, err
 	}
 
-	evt, index, err := m.registry.watchSubnets(ctx, network, nextIndex)
+	evt, index, err := m.registry.watchSubnets(ctx, nextIndex)
 
 	switch {
 	case err == nil:
@@ -293,56 +294,23 @@ func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor i
 
 	case isIndexTooSmall(err):
 		log.Warning("Watch of subnet leases failed because etcd index outside history window")
-		return m.leasesWatchReset(ctx, network)
+		return m.leasesWatchReset(ctx)
 
 	default:
 		return LeaseWatchResult{}, err
 	}
 }
 
-func (m *LocalManager) WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error) {
-	if cursor == nil {
-		return m.networkWatchReset(ctx)
-	}
-
-	nextIndex, err := getNextIndex(cursor)
-	if err != nil {
-		return NetworkWatchResult{}, err
-	}
-
-	for {
-		evt, index, err := m.registry.watchNetworks(ctx, nextIndex)
-
-		switch {
-		case err == nil:
-			return NetworkWatchResult{
-				Events: []Event{evt},
-				Cursor: watchCursor{index},
-			}, nil
-
-		case err == errTryAgain:
-			nextIndex = index
-
-		case isIndexTooSmall(err):
-			log.Warning("Watch of networks failed because etcd index outside history window")
-			return m.networkWatchReset(ctx)
-
-		default:
-			return NetworkWatchResult{}, err
-		}
-	}
-}
-
 func isIndexTooSmall(err error) bool {
 	etcdErr, ok := err.(etcd.Error)
 	return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared
 }
 
 // leasesWatchReset is called when incremental lease watch failed and we need to grab a snapshot
-func (m *LocalManager) leasesWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) {
+func (m *LocalManager) leasesWatchReset(ctx context.Context) (LeaseWatchResult, error) {
 	wr := LeaseWatchResult{}
 
-	leases, index, err := m.registry.getSubnets(ctx, network)
+	leases, index, err := m.registry.getSubnets(ctx)
 	if err != nil {
 		return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
 	}
@@ -352,20 +320,6 @@ func (m *LocalManager) leasesWatchReset(ctx context.Context, network string) (Le
 	return wr, nil
 }
 
-// networkWatchReset is called when incremental network watch failed and we need to grab a snapshot
-func (m *LocalManager) networkWatchReset(ctx context.Context) (NetworkWatchResult, error) {
-	wr := NetworkWatchResult{}
-
-	networks, index, err := m.registry.getNetworks(ctx)
-	if err != nil {
-		return wr, fmt.Errorf("failed to retrieve networks: %v", err)
-	}
-
-	wr.Cursor = watchCursor{index}
-	wr.Snapshot = networks
-	return wr, nil
-}
-
 func isSubnetConfigCompat(config *Config, sn ip.IP4Net) bool {
 	if sn.IP < config.SubnetMin || sn.IP > config.SubnetMax {
 		return false
@@ -374,12 +328,12 @@ func isSubnetConfigCompat(config *Config, sn ip.IP4Net) bool {
 	return sn.PrefixLen == config.SubnetLen
 }
 
-func (m *LocalManager) tryAddReservation(ctx context.Context, network string, r *Reservation) error {
+func (m *LocalManager) tryAddReservation(ctx context.Context, r *Reservation) error {
 	attrs := &LeaseAttrs{
 		PublicIP: r.PublicIP,
 	}
 
-	_, err := m.registry.createSubnet(ctx, network, r.Subnet, attrs, 0)
+	_, err := m.registry.createSubnet(ctx, r.Subnet, attrs, 0)
 	switch {
 	case err == nil:
 		return nil
@@ -392,7 +346,7 @@ func (m *LocalManager) tryAddReservation(ctx context.Context, network string, r
 	// Get what's there and
 	// - if PublicIP matches, remove the TTL make it a reservation
 	// - otherwise, error out
-	sub, asof, err := m.registry.getSubnet(ctx, network, r.Subnet)
+	sub, asof, err := m.registry.getSubnet(ctx, r.Subnet)
 	switch {
 	case err == nil:
 	case isErrEtcdKeyNotFound(err):
@@ -408,15 +362,15 @@ func (m *LocalManager) tryAddReservation(ctx context.Context, network string, r
 	}
 
 	// remove TTL
-	_, err = m.registry.updateSubnet(ctx, network, r.Subnet, &sub.Attrs, 0, asof)
+	_, err = m.registry.updateSubnet(ctx, r.Subnet, &sub.Attrs, 0, asof)
 	if isErrEtcdTestFailed(err) {
 		return errTryAgain
 	}
 	return err
 }
 
-func (m *LocalManager) AddReservation(ctx context.Context, network string, r *Reservation) error {
-	config, err := m.GetNetworkConfig(ctx, network)
+func (m *LocalManager) AddReservation(ctx context.Context, r *Reservation) error {
+	config, err := m.GetNetworkConfig(ctx)
 	if err != nil {
 		return err
 	}
@@ -430,7 +384,7 @@ func (m *LocalManager) AddReservation(ctx context.Context, network string, r *Re
 	}
 
 	for i := 0; i < raceRetries; i++ {
-		err := m.tryAddReservation(ctx, network, r)
+		err := m.tryAddReservation(ctx, r)
 		switch {
 		case err == nil:
 			return nil
@@ -444,14 +398,14 @@ func (m *LocalManager) AddReservation(ctx context.Context, network string, r *Re
 	return ErrNoMoreTries
 }
 
-func (m *LocalManager) tryRemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
-	sub, asof, err := m.registry.getSubnet(ctx, network, subnet)
+func (m *LocalManager) tryRemoveReservation(ctx context.Context, subnet ip.IP4Net) error {
+	sub, asof, err := m.registry.getSubnet(ctx, subnet)
 	if err != nil {
 		return err
 	}
 
 	// add back the TTL
-	_, err = m.registry.updateSubnet(ctx, network, subnet, &sub.Attrs, subnetTTL, asof)
+	_, err = m.registry.updateSubnet(ctx, subnet, &sub.Attrs, subnetTTL, asof)
 	if isErrEtcdTestFailed(err) {
 		return errTryAgain
 	}
@@ -459,9 +413,9 @@ func (m *LocalManager) tryRemoveReservation(ctx context.Context, network string,
 }
 
 //RemoveReservation removes the subnet by setting TTL back to subnetTTL (24hours)
-func (m *LocalManager) RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
+func (m *LocalManager) RemoveReservation(ctx context.Context, subnet ip.IP4Net) error {
 	for i := 0; i < raceRetries; i++ {
-		err := m.tryRemoveReservation(ctx, network, subnet)
+		err := m.tryRemoveReservation(ctx, subnet)
 		switch {
 		case err == nil:
 			return nil
@@ -475,8 +429,8 @@ func (m *LocalManager) RemoveReservation(ctx context.Context, network string, su
 	return ErrNoMoreTries
 }
 
-func (m *LocalManager) ListReservations(ctx context.Context, network string) ([]Reservation, error) {
-	subnets, _, err := m.registry.getSubnets(ctx, network)
+func (m *LocalManager) ListReservations(ctx context.Context) ([]Reservation, error) {
+	subnets, _, err := m.registry.getSubnets(ctx)
 	if err != nil {
 		return nil, err
 	}

+ 50 - 225
subnet/etcdv2/mock_registry.go

@@ -16,7 +16,6 @@ package etcdv2
 
 import (
 	"fmt"
-	"strings"
 	"sync"
 	"time"
 
@@ -69,74 +68,47 @@ type event struct {
 }
 
 type MockSubnetRegistry struct {
-	mux           sync.Mutex
-	networks      map[string]*netwk
-	networkEvents chan event
-	index         uint64
+	mux     sync.Mutex
+	network *netwk
+	index   uint64
 }
 
-func NewMockRegistry(network, config string, initialSubnets []Lease) *MockSubnetRegistry {
+func NewMockRegistry(config string, initialSubnets []Lease) *MockSubnetRegistry {
 	msr := &MockSubnetRegistry{
-		networkEvents: make(chan event, 1000),
-		index:         1000,
-		networks:      make(map[string]*netwk),
+		index: 1000,
+		network: &netwk{
+			config:        config,
+			subnets:       initialSubnets,
+			subnetsEvents: make(chan event, 1000),
+			subnetEvents:  make(map[ip.IP4Net]chan event)},
 	}
 
-	msr.networks[network] = &netwk{
-		config:        config,
-		subnets:       initialSubnets,
-		subnetsEvents: make(chan event, 1000),
-		subnetEvents:  make(map[ip.IP4Net]chan event),
-	}
 	return msr
 }
 
-func (msr *MockSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	n, ok := msr.networks[network]
-	if !ok {
-		return "", fmt.Errorf("Network %s not found", network)
-	}
-	return n.config, nil
+func (msr *MockSubnetRegistry) getNetworkConfig(ctx context.Context) (string, error) {
+	return msr.network.config, nil
 }
 
-func (msr *MockSubnetRegistry) setConfig(network, config string) error {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	n, ok := msr.networks[network]
-	if !ok {
-		return fmt.Errorf("Network %s not found", network)
-	}
-	n.config = config
+func (msr *MockSubnetRegistry) setConfig(config string) error {
+	msr.network.config = config
 	return nil
 }
 
-func (msr *MockSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
+func (msr *MockSubnetRegistry) getSubnets(ctx context.Context) ([]Lease, uint64, error) {
+	//msr.mux.Lock()
+	//defer msr.mux.Unlock()
 
-	n, ok := msr.networks[network]
-	if !ok {
-		return nil, 0, fmt.Errorf("Network %s not found", network)
-	}
-
-	subs := make([]Lease, len(n.subnets))
-	copy(subs, n.subnets)
+	subs := make([]Lease, len(msr.network.subnets))
+	copy(subs, msr.network.subnets)
 	return subs, msr.index, nil
 }
 
-func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
+func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, sn ip.IP4Net) (*Lease, uint64, error) {
 	msr.mux.Lock()
 	defer msr.mux.Unlock()
 
-	n, ok := msr.networks[network]
-	if !ok {
-		return nil, 0, fmt.Errorf("Network %s not found", network)
-	}
-	for _, l := range n.subnets {
+	for _, l := range msr.network.subnets {
 		if l.Subnet.Equal(sn) {
 			return &l, msr.index, nil
 		}
@@ -144,17 +116,12 @@ func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn
 	return nil, msr.index, fmt.Errorf("subnet %s not found", sn)
 }
 
-func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
+func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
 	msr.mux.Lock()
 	defer msr.mux.Unlock()
 
-	n, ok := msr.networks[network]
-	if !ok {
-		return time.Time{}, fmt.Errorf("Network %s not found", network)
-	}
-
 	// check for existing
-	if _, _, err := n.findSubnet(sn); err == nil {
+	if _, _, err := msr.network.findSubnet(sn); err == nil {
 		return time.Time{}, etcd.Error{
 			Code:  etcd.ErrorCodeNodeExist,
 			Index: msr.index,
@@ -174,28 +141,22 @@ func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string,
 		Expiration: exp,
 		Asof:       msr.index,
 	}
-	n.subnets = append(n.subnets, l)
+	msr.network.subnets = append(msr.network.subnets, l)
 
 	evt := Event{
-		Type:    EventAdded,
-		Lease:   l,
-		Network: network,
+		Type:  EventAdded,
+		Lease: l,
 	}
 
-	n.sendSubnetEvent(sn, event{evt, msr.index})
+	msr.network.sendSubnetEvent(sn, event{evt, msr.index})
 
 	return exp, nil
 }
 
-func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
+func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
 	msr.mux.Lock()
 	defer msr.mux.Unlock()
 
-	n, ok := msr.networks[network]
-	if !ok {
-		return time.Time{}, fmt.Errorf("Network %s not found", network)
-	}
-
 	msr.index += 1
 
 	exp := time.Time{}
@@ -203,7 +164,7 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string,
 		exp = clock.Now().Add(ttl)
 	}
 
-	sub, i, err := n.findSubnet(sn)
+	sub, i, err := msr.network.findSubnet(sn)
 	if err != nil {
 		return time.Time{}, err
 	}
@@ -211,57 +172,42 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string,
 	sub.Attrs = *attrs
 	sub.Asof = msr.index
 	sub.Expiration = exp
-	n.subnets[i] = sub
-	n.sendSubnetEvent(sn, event{
+	msr.network.subnets[i] = sub
+	msr.network.sendSubnetEvent(sn, event{
 		Event{
-			Type:    EventAdded,
-			Lease:   sub,
-			Network: network,
+			Type:  EventAdded,
+			Lease: sub,
 		}, msr.index,
 	})
 
 	return sub.Expiration, nil
 }
 
-func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
+func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net) error {
 	msr.mux.Lock()
 	defer msr.mux.Unlock()
 
-	n, ok := msr.networks[network]
-	if !ok {
-		return fmt.Errorf("Network %s not found", network)
-	}
-
 	msr.index += 1
 
-	sub, i, err := n.findSubnet(sn)
+	sub, i, err := msr.network.findSubnet(sn)
 	if err != nil {
 		return err
 	}
 
-	n.subnets[i] = n.subnets[len(n.subnets)-1]
-	n.subnets = n.subnets[:len(n.subnets)-1]
+	msr.network.subnets[i] = msr.network.subnets[len(msr.network.subnets)-1]
+	msr.network.subnets = msr.network.subnets[:len(msr.network.subnets)-1]
 	sub.Asof = msr.index
-	n.sendSubnetEvent(sn, event{
+	msr.network.sendSubnetEvent(sn, event{
 		Event{
-			Type:    EventRemoved,
-			Lease:   sub,
-			Network: network,
+			Type:  EventRemoved,
+			Lease: sub,
 		}, msr.index,
 	})
 
 	return nil
 }
 
-func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
-	msr.mux.Lock()
-	n, ok := msr.networks[network]
-	msr.mux.Unlock()
-
-	if !ok {
-		return Event{}, 0, fmt.Errorf("Network %s not found", network)
-	}
-
+func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, since uint64) (Event, uint64, error) {
 	for {
 		msr.mux.Lock()
 		index := msr.index
@@ -280,7 +226,7 @@ func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string,
 		case <-ctx.Done():
 			return Event{}, 0, ctx.Err()
 
-		case e := <-n.subnetsEvents:
+		case e := <-msr.network.subnetsEvents:
 			if e.index > since {
 				return e.evt, e.index, nil
 			}
@@ -288,15 +234,7 @@ func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string,
 	}
 }
 
-func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
-	msr.mux.Lock()
-	n, ok := msr.networks[network]
-	msr.mux.Unlock()
-
-	if !ok {
-		return Event{}, 0, fmt.Errorf("Network %s not found", network)
-	}
-
+func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error) {
 	for {
 		msr.mux.Lock()
 		index := msr.index
@@ -315,7 +253,7 @@ func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string,
 		case <-ctx.Done():
 			return Event{}, index, ctx.Err()
 
-		case e := <-n.subnetEventsChan(sn):
+		case e := <-msr.network.subnetEventsChan(sn):
 			if e.index > since {
 				return e.evt, index, nil
 			}
@@ -324,20 +262,12 @@ func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string,
 }
 
 func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	n, ok := msr.networks[network]
-	if !ok {
-		return
-	}
-
-	if sub, i, err := n.findSubnet(sn); err == nil {
+	if sub, i, err := msr.network.findSubnet(sn); err == nil {
 		msr.index += 1
-		n.subnets[i] = n.subnets[len(n.subnets)-1]
-		n.subnets = n.subnets[:len(n.subnets)-1]
+		msr.network.subnets[i] = msr.network.subnets[len(msr.network.subnets)-1]
+		msr.network.subnets = msr.network.subnets[:len(msr.network.subnets)-1]
 		sub.Asof = msr.index
-		n.sendSubnetEvent(sn, event{
+		msr.network.sendSubnetEvent(sn, event{
 			Event{
 				Type:  EventRemoved,
 				Lease: sub,
@@ -346,113 +276,8 @@ func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) {
 	}
 }
 
-func configKeyToNetworkKey(configKey string) string {
-	if !strings.HasSuffix(configKey, "/config") {
-		return ""
-	}
-	return strings.TrimSuffix(configKey, "/config")
-}
-
-func (msr *MockSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	ns := []string{}
-
-	for n, _ := range msr.networks {
-		ns = append(ns, n)
-	}
-
-	return ns, msr.index, nil
-}
-
-func (msr *MockSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
-	msr.mux.Lock()
-	index := msr.index
-	msr.mux.Unlock()
-
-	for {
-		if since < index {
-			return Event{}, 0, etcd.Error{
-				Code:    etcd.ErrorCodeEventIndexCleared,
-				Cause:   "out of date",
-				Message: "cursor is out of date",
-				Index:   index,
-			}
-		}
-
-		select {
-		case <-ctx.Done():
-			return Event{}, 0, ctx.Err()
-
-		case e := <-msr.networkEvents:
-			if e.index > since {
-				return e.evt, e.index, nil
-			}
-		}
-	}
-}
-
-func (msr *MockSubnetRegistry) getNetwork(ctx context.Context, network string) (*netwk, error) {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	n, ok := msr.networks[network]
-	if !ok {
-		return nil, fmt.Errorf("Network %s not found", network)
-	}
-
-	return n, nil
-}
-
-func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, config string) error {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	_, ok := msr.networks[network]
-	if ok {
-		return fmt.Errorf("Network %s already exists", network)
-	}
-
-	msr.index += 1
-
-	n := &netwk{
-		config:        network,
-		subnetsEvents: make(chan event, 1000),
-		subnetEvents:  make(map[ip.IP4Net]chan event),
-	}
-
-	msr.networks[network] = n
-	msr.networkEvents <- event{
-		Event{
-			Type:    EventAdded,
-			Network: network,
-		}, msr.index,
-	}
-
-	return nil
-}
-
-func (msr *MockSubnetRegistry) DeleteNetwork(ctx context.Context, network string) error {
-	msr.mux.Lock()
-	defer msr.mux.Unlock()
-
-	_, ok := msr.networks[network]
-	if !ok {
-		return fmt.Errorf("Network %s not found", network)
-	}
-	delete(msr.networks, network)
-
-	msr.index += 1
-
-	msr.networkEvents <- event{
-		Event{
-			Type:    EventRemoved,
-			Network: network,
-		}, msr.index,
-	}
-
-	return nil
+func (msr *MockSubnetRegistry) getNetwork(ctx context.Context) (*netwk, error) {
+	return msr.network, nil
 }
 
 func (n *netwk) findSubnet(sn ip.IP4Net) (Lease, int, error) {

+ 24 - 120
subnet/etcdv2/registry.go

@@ -37,16 +37,14 @@ var (
 )
 
 type Registry interface {
-	getNetworkConfig(ctx context.Context, network string) (string, error)
-	getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
-	getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
-	createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
-	updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
-	deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
-	watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
-	watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
-	getNetworks(ctx context.Context) ([]string, uint64, error)
-	watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
+	getNetworkConfig(ctx context.Context) (string, error)
+	getSubnets(ctx context.Context) ([]Lease, uint64, error)
+	getSubnet(ctx context.Context, sn ip.IP4Net) (*Lease, uint64, error)
+	createSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
+	updateSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
+	deleteSubnet(ctx context.Context, sn ip.IP4Net) error
+	watchSubnets(ctx context.Context, since uint64) (Event, uint64, error)
+	watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error)
 }
 
 type EtcdConfig struct {
@@ -114,8 +112,8 @@ func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry
 	return r, nil
 }
 
-func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "config")
+func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context) (string, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "config")
 	resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
 	if err != nil {
 		return "", err
@@ -126,8 +124,8 @@ func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network str
 // getSubnets queries etcd to get a list of currently allocated leases for a given network.
 // It returns the leases along with the "as-of" etcd-index that can be used as the starting
 // point for etcd watch.
-func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
+func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context) ([]Lease, uint64, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets")
 	resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true, Quorum: true})
 	if err != nil {
 		if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
@@ -151,8 +149,8 @@ func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) (
 	return leases, resp.Index, nil
 }
 
-func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, sn ip.IP4Net) (*Lease, uint64, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
 	resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Quorum: true})
 	if err != nil {
 		return nil, 0, err
@@ -162,8 +160,8 @@ func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn
 	return l, resp.Index, err
 }
 
-func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
 	value, err := json.Marshal(attrs)
 	if err != nil {
 		return time.Time{}, err
@@ -187,8 +185,8 @@ func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string,
 	return exp, nil
 }
 
-func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
 	value, err := json.Marshal(attrs)
 	if err != nil {
 		return time.Time{}, err
@@ -210,14 +208,14 @@ func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string,
 	return exp, nil
 }
 
-func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, sn ip.IP4Net) error {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
 	_, err := esr.client().Delete(ctx, key, nil)
 	return err
 }
 
-func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
+func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, since uint64) (Event, uint64, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets")
 	opts := &etcd.WatcherOptions{
 		AfterIndex: since,
 		Recursive:  true,
@@ -231,8 +229,8 @@ func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string,
 	return evt, e.Node.ModifiedIndex, err
 }
 
-func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, since uint64, sn ip.IP4Net) (Event, uint64, error) {
+	key := path.Join(esr.etcdCfg.Prefix, "subnets", MakeSubnetKey(sn))
 	opts := &etcd.WatcherOptions{
 		AfterIndex: since,
 	}
@@ -246,50 +244,6 @@ func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string,
 	return evt, e.Node.ModifiedIndex, err
 }
 
-// getNetworks queries etcd to get a list of network names.  It returns the
-// networks along with the 'as-of' etcd-index that can be used as the starting
-// point for etcd watch.
-func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) {
-	resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true, Quorum: true})
-
-	networks := []string{}
-
-	if err == nil {
-		for _, node := range resp.Node.Nodes {
-			// Look for '/config' on the child nodes
-			for _, child := range node.Nodes {
-				netname, isConfig := esr.parseNetworkKey(child.Key)
-				if isConfig {
-					networks = append(networks, netname)
-				}
-			}
-		}
-
-		return networks, resp.Index, nil
-	}
-
-	if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
-		// key not found: treat it as empty set
-		return networks, etcdErr.Index, nil
-	}
-
-	return nil, 0, err
-}
-
-func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) {
-	key := esr.etcdCfg.Prefix
-	opts := &etcd.WatcherOptions{
-		AfterIndex: since,
-		Recursive:  true,
-	}
-	e, err := esr.client().Watcher(key, opts).Next(ctx)
-	if err != nil {
-		return Event{}, 0, err
-	}
-
-	return esr.parseNetworkWatchResponse(e)
-}
-
 func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
 	esr.mux.Lock()
 	defer esr.mux.Unlock()
@@ -318,7 +272,6 @@ func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
 		return Event{
 			EventRemoved,
 			Lease{Subnet: *sn},
-			"",
 		}, nil
 
 	default:
@@ -340,60 +293,11 @@ func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
 				Attrs:      *attrs,
 				Expiration: exp,
 			},
-			"",
 		}
 		return evt, nil
 	}
 }
 
-func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) {
-	index := resp.Node.ModifiedIndex
-	netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
-	if netname == "" {
-		return Event{}, index, errTryAgain
-	}
-
-	var evt Event
-
-	switch resp.Action {
-	case "delete":
-		evt = Event{
-			EventRemoved,
-			Lease{},
-			netname,
-		}
-
-	default:
-		if !isConfig {
-			// Ignore non .../<netname>/config keys; tell caller to try again
-			return Event{}, index, errTryAgain
-		}
-
-		_, err := ParseConfig(resp.Node.Value)
-		if err != nil {
-			return Event{}, index, err
-		}
-
-		evt = Event{
-			EventAdded,
-			Lease{},
-			netname,
-		}
-	}
-
-	return evt, index, nil
-}
-
-// Returns network name from config key (eg, /coreos.com/network/foobar/config),
-// if the 'config' key isn't present we don't consider the network valid
-func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
-	if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 {
-		return parts[1], parts[2] != ""
-	}
-
-	return "", false
-}
-
 func nodeToLease(node *etcd.Node) (*Lease, error) {
 	sn := ParseSubnetKey(node.Key)
 	if sn == nil {

+ 13 - 24
subnet/etcdv2/registry_test.go

@@ -56,7 +56,7 @@ func watchSubnets(t *testing.T, r Registry, ctx context.Context, sn ip.IP4Net, n
 
 	numFound := 0
 	for {
-		evt, index, err := r.watchSubnets(ctx, "foobar", nextIndex)
+		evt, index, err := r.watchSubnets(ctx, nextIndex)
 
 		switch {
 		case err == nil:
@@ -95,30 +95,19 @@ func TestEtcdRegistry(t *testing.T) {
 
 	ctx, _ := context.WithCancel(context.Background())
 
-	networks, _, err := r.getNetworks(ctx)
-	if err != nil {
-		t.Fatal("Failed to get networks")
-	}
-	if len(networks) != 0 {
-		t.Fatal("Networks should be empty")
+	config, err := r.getNetworkConfig(ctx)
+	if err == nil {
+		t.Fatal("Should hit error getting config")
 	}
 
 	// Populate etcd with a network
-	netKey := "/coreos.com/network/foobar/config"
+	netKey := "/coreos.com/network/config"
 	netValue := "{ \"Network\": \"10.1.0.0/16\", \"Backend\": { \"Type\": \"host-gw\" } }"
 	m.Create(ctx, netKey, netValue)
 
-	networks, _, err = r.getNetworks(ctx)
-	if err != nil {
-		t.Fatal("Failed to get networks the second time")
-	}
-	if len(networks) != 1 {
-		t.Fatal("Failed to find expected network foobar")
-	}
-
-	config, err := r.getNetworkConfig(ctx, "foobar")
+	config, err = r.getNetworkConfig(ctx)
 	if err != nil {
-		t.Fatal("Failed to get network config")
+		t.Fatal("Failed to get network config", err)
 	}
 	if config != netValue {
 		t.Fatal("Failed to match network config")
@@ -145,7 +134,7 @@ func TestEtcdRegistry(t *testing.T) {
 	attrs := &LeaseAttrs{
 		PublicIP: ip.MustParseIP4("1.2.3.4"),
 	}
-	exp, err := r.createSubnet(ctx, "foobar", sn, attrs, 24*time.Hour)
+	exp, err := r.createSubnet(ctx, sn, attrs, 24*time.Hour)
 	if err != nil {
 		t.Fatal("Failed to create subnet lease")
 	}
@@ -154,7 +143,7 @@ func TestEtcdRegistry(t *testing.T) {
 	}
 
 	// Make sure the lease got created
-	resp, err := m.Get(ctx, "/coreos.com/network/foobar/subnets/10.1.5.0-24", nil)
+	resp, err := m.Get(ctx, "/coreos.com/network/subnets/10.1.5.0-24", nil)
 	if err != nil {
 		t.Fatalf("Failed to verify subnet lease directly in etcd: %v", err)
 	}
@@ -165,7 +154,7 @@ func TestEtcdRegistry(t *testing.T) {
 		t.Fatalf("Unexpected subnet lease node %s value %s", resp.Node.Key, resp.Node.Value)
 	}
 
-	leases, _, err := r.getSubnets(ctx, "foobar")
+	leases, _, err := r.getSubnets(ctx)
 	if len(leases) != 1 {
 		t.Fatalf("Unexpected number of leases %d (expected 1)", len(leases))
 	}
@@ -173,18 +162,18 @@ func TestEtcdRegistry(t *testing.T) {
 		t.Fatalf("Mismatched subnet %v (expected %v)", leases[0].Subnet, sn)
 	}
 
-	lease, _, err := r.getSubnet(ctx, "foobar", sn)
+	lease, _, err := r.getSubnet(ctx, sn)
 	if lease == nil {
 		t.Fatal("Missing subnet lease")
 	}
 
-	err = r.deleteSubnet(ctx, "foobar", sn)
+	err = r.deleteSubnet(ctx, sn)
 	if err != nil {
 		t.Fatalf("Failed to delete subnet %v: %v", sn, err)
 	}
 
 	// Make sure the lease got deleted
-	resp, err = m.Get(ctx, "/coreos.com/network/foobar/subnets/10.1.5.0-24", nil)
+	resp, err = m.Get(ctx, "/coreos.com/network/subnets/10.1.5.0-24", nil)
 	if err == nil {
 		t.Fatal("Unexpected success getting deleted subnet")
 	}

+ 27 - 130
subnet/etcdv2/subnet_test.go

@@ -46,7 +46,7 @@ func newDummyRegistry() *MockSubnetRegistry {
 	}
 
 	config := `{ "Network": "10.3.0.0/16", "SubnetMin": "10.3.1.0", "SubnetMax": "10.3.25.0" }`
-	return NewMockRegistry("_", config, subnets)
+	return NewMockRegistry(config, subnets)
 }
 
 func TestAcquireLease(t *testing.T) {
@@ -58,7 +58,7 @@ func TestAcquireLease(t *testing.T) {
 		PublicIP: extIaddr,
 	}
 
-	l, err := sm.AcquireLease(context.Background(), "_", &attrs)
+	l, err := sm.AcquireLease(context.Background(), &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -68,7 +68,7 @@ func TestAcquireLease(t *testing.T) {
 	}
 
 	// Acquire again, should reuse
-	l2, err := sm.AcquireLease(context.Background(), "_", &attrs)
+	l2, err := sm.AcquireLease(context.Background(), &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -87,7 +87,7 @@ func TestConfigChanged(t *testing.T) {
 		PublicIP: extIaddr,
 	}
 
-	l, err := sm.AcquireLease(context.Background(), "_", &attrs)
+	l, err := sm.AcquireLease(context.Background(), &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -98,10 +98,10 @@ func TestConfigChanged(t *testing.T) {
 
 	// Change config
 	config := `{ "Network": "10.4.0.0/16" }`
-	msr.setConfig("_", config)
+	msr.setConfig(config)
 
 	// Acquire again, should not reuse
-	if l, err = sm.AcquireLease(context.Background(), "_", &attrs); err != nil {
+	if l, err = sm.AcquireLease(context.Background(), &attrs); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
@@ -127,7 +127,7 @@ func acquireLease(ctx context.Context, t *testing.T, sm Manager) *Lease {
 		PublicIP: extIaddr,
 	}
 
-	l, err := sm.AcquireLease(ctx, "_", &attrs)
+	l, err := sm.AcquireLease(ctx, &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -145,7 +145,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 	l := acquireLease(ctx, t, sm)
 
 	events := make(chan []Event)
-	go WatchLeases(ctx, sm, "_", l, events)
+	go WatchLeases(ctx, sm, l, events)
 
 	evtBatch := <-events
 	for _, evt := range evtBatch {
@@ -167,7 +167,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 	attrs := &LeaseAttrs{
 		PublicIP: ip.MustParseIP4("1.1.1.1"),
 	}
-	_, err := msr.createSubnet(ctx, "_", expected, attrs, 0)
+	_, err := msr.createSubnet(ctx, expected, attrs, 0)
 	if err != nil {
 		t.Fatalf("createSubnet filed: %v", err)
 	}
@@ -200,7 +200,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	l := acquireLease(ctx, t, sm)
 
 	events := make(chan []Event)
-	go WatchLeases(ctx, sm, "_", l, events)
+	go WatchLeases(ctx, sm, l, events)
 
 	evtBatch := <-events
 
@@ -264,7 +264,7 @@ func TestRenewLease(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	l, err := sm.AcquireLease(ctx, "_", &attrs)
+	l, err := sm.AcquireLease(ctx, &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -273,12 +273,12 @@ func TestRenewLease(t *testing.T) {
 
 	fakeClock.Advance(24 * time.Hour)
 
-	if err := sm.RenewLease(ctx, "_", l); err != nil {
+	if err := sm.RenewLease(ctx, l); err != nil {
 		t.Fatal("RenewLease failed: ", err)
 	}
 
 	// check that it's still good
-	n, err := msr.getNetwork(ctx, "_")
+	n, err := msr.getNetwork(ctx)
 	if err != nil {
 		t.Errorf("Failed to renew lease: could not get networks: %v", err)
 	}
@@ -308,11 +308,11 @@ func TestLeaseRevoked(t *testing.T) {
 
 	l := acquireLease(ctx, t, sm)
 
-	if err := sm.RevokeLease(ctx, "_", l.Subnet); err != nil {
+	if err := sm.RevokeLease(ctx, l.Subnet); err != nil {
 		t.Fatalf("RevokeLease failed: %v", err)
 	}
 
-	_, _, err := msr.getSubnet(ctx, "_", l.Subnet)
+	_, _, err := msr.getSubnet(ctx, l.Subnet)
 	if err == nil {
 		t.Fatalf("Revoked lease still exists")
 	}
@@ -321,109 +321,6 @@ func TestLeaseRevoked(t *testing.T) {
 	}
 }
 
-func TestWatchGetNetworks(t *testing.T) {
-	msr := newDummyRegistry()
-	sm := NewMockManager(msr)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	// Kill the previously added "_" network
-	msr.DeleteNetwork(ctx, "_")
-
-	expected := "foobar"
-	msr.CreateNetwork(ctx, expected, `{"Network": "10.1.1.0/16", "Backend": {"Type": "bridge"}}`)
-
-	resp, err := sm.WatchNetworks(ctx, nil)
-	if err != nil {
-		t.Errorf("WatchNetworks(nil) failed: %v", err)
-	}
-
-	if len(resp.Snapshot) != 1 {
-		t.Errorf("WatchNetworks(nil) produced wrong number of networks: expected 1, got %d", len(resp.Snapshot))
-	}
-
-	if resp.Snapshot[0] != expected {
-		t.Errorf("WatchNetworks(nil) produced wrong network: expected %s, got %s", expected, resp.Snapshot[0])
-	}
-}
-
-func TestWatchNetworkAdded(t *testing.T) {
-	msr := newDummyRegistry()
-	sm := NewMockManager(msr)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	events := make(chan []Event)
-	go WatchNetworks(ctx, sm, events)
-
-	// skip over the initial snapshot
-	<-events
-
-	expected := "foobar"
-	msr.CreateNetwork(ctx, expected, `{"Network": "10.1.1.0/16", "Backend": {"Type": "bridge"}}`)
-
-	evtBatch := <-events
-
-	if len(evtBatch) != 1 {
-		t.Fatalf("WatchNetworks produced wrong sized event batch")
-	}
-
-	evt := evtBatch[0]
-
-	if evt.Type != EventAdded {
-		t.Fatalf("WatchNetworks produced wrong event type")
-	}
-
-	actual := evt.Network
-	if actual != expected {
-		t.Errorf("WatchNetworks produced wrong network: expected %s, got %s", expected, actual)
-	}
-}
-
-func TestWatchNetworkRemoved(t *testing.T) {
-	msr := newDummyRegistry()
-	sm := NewMockManager(msr)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	events := make(chan []Event)
-	go WatchNetworks(ctx, sm, events)
-
-	// skip over the initial snapshot
-	<-events
-
-	expected := "blah"
-	msr.CreateNetwork(ctx, expected, `{"Network": "10.1.1.0/16", "Backend": {"Type": "bridge"}}`)
-
-	// skip over the create event
-	<-events
-
-	err := msr.DeleteNetwork(ctx, expected)
-	if err != nil {
-		t.Fatalf("WatchNetworks failed to delete the network")
-	}
-
-	evtBatch := <-events
-
-	if len(evtBatch) != 1 {
-		t.Fatalf("WatchNetworks produced wrong sized event batch")
-	}
-
-	evt := evtBatch[0]
-
-	if evt.Type != EventRemoved {
-		t.Fatalf("WatchNetworks produced wrong event type")
-	}
-
-	actual := evt.Network
-	if actual != expected {
-		t.Errorf("WatchNetwork produced wrong network: expected %s, got %s", expected, actual)
-	}
-}
-
 func TestAddReservation(t *testing.T) {
 	msr := newDummyRegistry()
 	sm := NewMockManager(msr)
@@ -435,31 +332,31 @@ func TestAddReservation(t *testing.T) {
 		Subnet:   newIP4Net("10.4.3.0", 24),
 		PublicIP: ip.MustParseIP4("52.195.12.13"),
 	}
-	if err := sm.AddReservation(ctx, "_", &r); err == nil {
+	if err := sm.AddReservation(ctx, &r); err == nil {
 		t.Fatalf("unexpectedly added a reservation outside of configured network")
 	}
 
 	r.Subnet = newIP4Net("10.3.10.0", 24)
-	if err := sm.AddReservation(ctx, "_", &r); err != nil {
+	if err := sm.AddReservation(ctx, &r); err != nil {
 		t.Fatalf("failed to add reservation: %v", err)
 	}
 
 	// Add the same reservation -- should succeed
-	if err := sm.AddReservation(ctx, "_", &r); err != nil {
+	if err := sm.AddReservation(ctx, &r); err != nil {
 		t.Fatalf("failed to add reservation: %v", err)
 	}
 
 	// Add a reservation with a different public IP -- should fail
 	r2 := r
 	r2.PublicIP = ip.MustParseIP4("52.195.12.17")
-	if err := sm.AddReservation(ctx, "_", &r2); err != ErrLeaseTaken {
+	if err := sm.AddReservation(ctx, &r2); err != ErrLeaseTaken {
 		t.Fatalf("taken add reservation returned: %v", err)
 	}
 
 	attrs := &LeaseAttrs{
 		PublicIP: r.PublicIP,
 	}
-	l, err := sm.AcquireLease(ctx, "_", attrs)
+	l, err := sm.AcquireLease(ctx, attrs)
 	if err != nil {
 		t.Fatalf("failed to acquire subnet: %v", err)
 	}
@@ -482,16 +379,16 @@ func TestRemoveReservation(t *testing.T) {
 		Subnet:   newIP4Net("10.3.10.0", 24),
 		PublicIP: ip.MustParseIP4("52.195.12.13"),
 	}
-	if err := sm.AddReservation(ctx, "_", &r); err != nil {
+	if err := sm.AddReservation(ctx, &r); err != nil {
 		t.Fatalf("failed to add reservation: %v", err)
 	}
 
-	if err := sm.RemoveReservation(ctx, "_", r.Subnet); err != nil {
+	if err := sm.RemoveReservation(ctx, r.Subnet); err != nil {
 		t.Fatalf("failed to remove reservation: %v", err)
 	}
 
 	// The node should have a TTL
-	sub, _, err := msr.getSubnet(ctx, "_", r.Subnet)
+	sub, _, err := msr.getSubnet(ctx, r.Subnet)
 	if err != nil {
 		t.Fatalf("getSubnet failed: %v", err)
 	}
@@ -512,7 +409,7 @@ func TestListReservations(t *testing.T) {
 		Subnet:   newIP4Net("10.3.10.0", 24),
 		PublicIP: ip.MustParseIP4("52.195.12.13"),
 	}
-	if err := sm.AddReservation(ctx, "_", &r1); err != nil {
+	if err := sm.AddReservation(ctx, &r1); err != nil {
 		t.Fatalf("failed to add reservation: %v", err)
 	}
 
@@ -520,11 +417,11 @@ func TestListReservations(t *testing.T) {
 		Subnet:   newIP4Net("10.3.20.0", 24),
 		PublicIP: ip.MustParseIP4("52.195.12.14"),
 	}
-	if err := sm.AddReservation(ctx, "_", &r2); err != nil {
+	if err := sm.AddReservation(ctx, &r2); err != nil {
 		t.Fatalf("failed to add reservation: %v", err)
 	}
 
-	rs, err := sm.ListReservations(ctx, "_")
+	rs, err := sm.ListReservations(ctx)
 	if err != nil {
 		if len(rs) != 2 {
 			t.Fatalf("unexpected number of reservations, expected 2, got %v", len(rs))
@@ -539,7 +436,7 @@ func TestListReservations(t *testing.T) {
 }
 
 func inAllocatableRange(ctx context.Context, sm Manager, ipn ip.IP4Net) bool {
-	cfg, err := sm.GetNetworkConfig(ctx, "_")
+	cfg, err := sm.GetNetworkConfig(ctx)
 	if err != nil {
 		panic(err)
 	}

+ 14 - 21
subnet/kube/kube.go

@@ -165,9 +165,9 @@ func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj inter
 		glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
 		return
 	}
-	ksm.events <- subnet.Event{et, l, ""}
+	ksm.events <- subnet.Event{et, l}
 	if n.ObjectMeta.Name == ksm.nodeName {
-		ksm.selfEvents <- subnet.Event{et, l, ""}
+		ksm.selfEvents <- subnet.Event{et, l}
 	}
 }
 
@@ -188,17 +188,17 @@ func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{})
 		glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
 		return
 	}
-	ksm.events <- subnet.Event{subnet.EventAdded, l, ""}
+	ksm.events <- subnet.Event{subnet.EventAdded, l}
 	if n.ObjectMeta.Name == ksm.nodeName {
-		ksm.selfEvents <- subnet.Event{subnet.EventAdded, l, ""}
+		ksm.selfEvents <- subnet.Event{subnet.EventAdded, l}
 	}
 }
 
-func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
+func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context) (*subnet.Config, error) {
 	return ksm.subnetConf, nil
 }
 
-func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
+func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
 	cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
 	if err != nil {
 		return nil, err
@@ -256,8 +256,8 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string,
 	}, nil
 }
 
-func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, network string, lease *subnet.Lease) error {
-	l, err := ksm.AcquireLease(ctx, network, &lease.Attrs)
+func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, lease *subnet.Lease) error {
+	l, err := ksm.AcquireLease(ctx, &lease.Attrs)
 	if err != nil {
 		return err
 	}
@@ -267,7 +267,7 @@ func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, network string, le
 	return nil
 }
 
-func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
+func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
 	select {
 	case event := <-ksm.selfEvents:
 		return subnet.LeaseWatchResult{
@@ -278,7 +278,7 @@ func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, network string, sn
 	}
 }
 
-func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
+func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{}) (subnet.LeaseWatchResult, error) {
 	select {
 	case event := <-ksm.events:
 		return subnet.LeaseWatchResult{
@@ -289,13 +289,6 @@ func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, network string, c
 	}
 }
 
-func (ksm *kubeSubnetManager) WatchNetworks(ctx context.Context, cursor interface{}) (subnet.NetworkWatchResult, error) {
-	time.Sleep(time.Second)
-	return subnet.NetworkWatchResult{
-		Snapshot: []string{""},
-	}, nil
-}
-
 func (ksm *kubeSubnetManager) Run(ctx context.Context) {
 	glog.Infof("starting kube subnet manager")
 	ksm.nodeController.Run(ctx.Done())
@@ -318,18 +311,18 @@ func nodeToLease(n v1.Node) (l subnet.Lease, err error) {
 }
 
 // unimplemented
-func (ksm *kubeSubnetManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
+func (ksm *kubeSubnetManager) RevokeLease(ctx context.Context, sn ip.IP4Net) error {
 	return ErrUnimplemented
 }
 
-func (ksm *kubeSubnetManager) AddReservation(ctx context.Context, network string, r *subnet.Reservation) error {
+func (ksm *kubeSubnetManager) AddReservation(ctx context.Context, r *subnet.Reservation) error {
 	return ErrUnimplemented
 }
 
-func (ksm *kubeSubnetManager) RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
+func (ksm *kubeSubnetManager) RemoveReservation(ctx context.Context, subnet ip.IP4Net) error {
 	return ErrUnimplemented
 }
 
-func (ksm *kubeSubnetManager) ListReservations(ctx context.Context, network string) ([]subnet.Reservation, error) {
+func (ksm *kubeSubnetManager) ListReservations(ctx context.Context) ([]subnet.Reservation, error) {
 	return nil, ErrUnimplemented
 }

+ 12 - 23
subnet/subnet.go

@@ -60,9 +60,8 @@ type (
 	EventType int
 
 	Event struct {
-		Type    EventType `json:"type"`
-		Lease   Lease     `json:"lease,omitempty"`
-		Network string    `json:"network,omitempty"`
+		Type  EventType `json:"type"`
+		Lease Lease     `json:"lease,omitempty"`
 	}
 )
 
@@ -80,15 +79,6 @@ type LeaseWatchResult struct {
 	Cursor   interface{} `json:"cursor"`
 }
 
-type NetworkWatchResult struct {
-	// Either Events or Snapshot will be set.  If Events is empty, it means
-	// the cursor was out of range and Snapshot contains the current list
-	// of items, even if empty.
-	Events   []Event     `json:"events"`
-	Snapshot []string    `json:"snapshot"`
-	Cursor   interface{} `json:"cursor,omitempty"`
-}
-
 func (et EventType) MarshalJSON() ([]byte, error) {
 	s := ""
 
@@ -134,15 +124,14 @@ func MakeSubnetKey(sn ip.IP4Net) string {
 }
 
 type Manager interface {
-	GetNetworkConfig(ctx context.Context, network string) (*Config, error)
-	AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error)
-	RenewLease(ctx context.Context, network string, lease *Lease) error
-	RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error
-	WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error)
-	WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error)
-	WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error)
-
-	AddReservation(ctx context.Context, network string, r *Reservation) error
-	RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error
-	ListReservations(ctx context.Context, network string) ([]Reservation, error)
+	GetNetworkConfig(ctx context.Context) (*Config, error)
+	AcquireLease(ctx context.Context, attrs *LeaseAttrs) (*Lease, error)
+	RenewLease(ctx context.Context, lease *Lease) error
+	RevokeLease(ctx context.Context, sn ip.IP4Net) error
+	WatchLease(ctx context.Context, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error)
+	WatchLeases(ctx context.Context, cursor interface{}) (LeaseWatchResult, error)
+
+	AddReservation(ctx context.Context, r *Reservation) error
+	RemoveReservation(ctx context.Context, subnet ip.IP4Net) error
+	ListReservations(ctx context.Context) ([]Reservation, error)
 }

+ 10 - 111
subnet/watch.go

@@ -27,14 +27,14 @@ import (
 // and communicates addition/deletion events on receiver channel. It takes care
 // of handling "fall-behind" logic where the history window has advanced too far
 // and it needs to diff the latest snapshot with its saved state and generate events
-func WatchLeases(ctx context.Context, sm Manager, network string, ownLease *Lease, receiver chan []Event) {
+func WatchLeases(ctx context.Context, sm Manager, ownLease *Lease, receiver chan []Event) {
 	lw := &leaseWatcher{
 		ownLease: ownLease,
 	}
 	var cursor interface{}
 
 	for {
-		res, err := sm.WatchLeases(ctx, network, cursor)
+		res, err := sm.WatchLeases(ctx, cursor)
 		if err != nil {
 			if err == context.Canceled || err == context.DeadlineExceeded {
 				return
@@ -85,7 +85,7 @@ func (lw *leaseWatcher) reset(leases []Lease) []Event {
 
 		if !found {
 			// new lease
-			batch = append(batch, Event{EventAdded, nl, ""})
+			batch = append(batch, Event{EventAdded, nl})
 		}
 	}
 
@@ -94,7 +94,7 @@ func (lw *leaseWatcher) reset(leases []Lease) []Event {
 		if lw.ownLease != nil && l.Subnet.Equal(lw.ownLease.Subnet) {
 			continue
 		}
-		batch = append(batch, Event{EventRemoved, l, ""})
+		batch = append(batch, Event{EventRemoved, l})
 	}
 
 	// copy the leases over (caution: don't just assign a slice)
@@ -128,25 +128,25 @@ func (lw *leaseWatcher) add(lease *Lease) Event {
 	for i, l := range lw.leases {
 		if l.Subnet.Equal(lease.Subnet) {
 			lw.leases[i] = *lease
-			return Event{EventAdded, lw.leases[i], ""}
+			return Event{EventAdded, lw.leases[i]}
 		}
 	}
 
 	lw.leases = append(lw.leases, *lease)
 
-	return Event{EventAdded, lw.leases[len(lw.leases)-1], ""}
+	return Event{EventAdded, lw.leases[len(lw.leases)-1]}
 }
 
 func (lw *leaseWatcher) remove(lease *Lease) Event {
 	for i, l := range lw.leases {
 		if l.Subnet.Equal(lease.Subnet) {
 			lw.leases = deleteLease(lw.leases, i)
-			return Event{EventRemoved, l, ""}
+			return Event{EventRemoved, l}
 		}
 	}
 
 	log.Errorf("Removed subnet (%s) was not found", lease.Subnet)
-	return Event{EventRemoved, *lease, ""}
+	return Event{EventRemoved, *lease}
 }
 
 func deleteLease(l []Lease, i int) []Lease {
@@ -154,116 +154,15 @@ func deleteLease(l []Lease, i int) []Lease {
 	return l[:len(l)-1]
 }
 
-// WatchNetworks performs a long term watch of flannel networks and communicates
-// addition/deletion events on receiver channel. It takes care of handling
-// "fall-behind" logic where the history window has advanced too far and it
-// needs to diff the latest snapshot with its saved state and generate events
-func WatchNetworks(ctx context.Context, sm Manager, receiver chan []Event) {
-	nw := newNetWatcher()
-	var cursor interface{}
-
-	for {
-		res, err := sm.WatchNetworks(ctx, cursor)
-		if err != nil {
-			if err == context.Canceled || err == context.DeadlineExceeded {
-				return
-			}
-
-			log.Errorf("Watch networks: %v", err)
-			time.Sleep(time.Second)
-			continue
-		}
-		cursor = res.Cursor
-
-		var batch []Event
-
-		if len(res.Events) > 0 {
-			batch = nw.update(res.Events)
-		} else {
-			batch = nw.reset(res.Snapshot)
-		}
-
-		if len(batch) > 0 {
-			receiver <- batch
-		}
-	}
-}
-
-type netWatcher struct {
-	networks map[string]bool
-}
-
-func newNetWatcher() *netWatcher {
-	return &netWatcher{networks: make(map[string]bool)}
-}
-
-func (nw *netWatcher) reset(networks []string) []Event {
-	batch := []Event{}
-	newNetworks := make(map[string]bool)
-
-	for _, netname := range networks {
-		if nw.networks[netname] {
-			delete(nw.networks, netname)
-		} else {
-			// new network
-			batch = append(batch, Event{EventAdded, Lease{}, netname})
-		}
-		newNetworks[netname] = true
-	}
-
-	// everything left in sm.networks has been deleted
-	for netname := range nw.networks {
-		batch = append(batch, Event{EventRemoved, Lease{}, netname})
-	}
-
-	nw.networks = newNetworks
-
-	return batch
-}
-
-func (nw *netWatcher) update(events []Event) []Event {
-	batch := []Event{}
-
-	for _, e := range events {
-		switch e.Type {
-		case EventAdded:
-			batch = append(batch, nw.add(e.Network))
-
-		case EventRemoved:
-			batch = append(batch, nw.remove(e.Network))
-		}
-	}
-
-	return batch
-}
-
-func (nw *netWatcher) add(network string) Event {
-	if _, ok := nw.networks[network]; !ok {
-		nw.networks[network] = true
-	}
-
-	return Event{EventAdded, Lease{}, network}
-}
-
-func (nw *netWatcher) remove(network string) Event {
-	if _, ok := nw.networks[network]; ok {
-		delete(nw.networks, network)
-	} else {
-		log.Errorf("Removed network (%s) was not found", network)
-	}
-
-	return Event{EventRemoved, Lease{}, network}
-}
-
 // WatchLease performs a long term watch of the given network's subnet lease
 // and communicates addition/deletion events on receiver channel. It takes care
 // of handling "fall-behind" logic where the history window has advanced too far
 // and it needs to diff the latest snapshot with its saved state and generate events
-func WatchLease(ctx context.Context, sm Manager, network string, sn ip.IP4Net, receiver chan Event) {
+func WatchLease(ctx context.Context, sm Manager, sn ip.IP4Net, receiver chan Event) {
 	var cursor interface{}
 
 	for {
-		wr, err := sm.WatchLease(ctx, network, sn, cursor)
+		wr, err := sm.WatchLease(ctx, sn, cursor)
 		if err != nil {
 			if err == context.Canceled || err == context.DeadlineExceeded {
 				return