Browse Source

multi-network support and abstract subnet mgr

- Support for flannel running multiple networks
- Refactor to make subnet manager into an interface
- Implement a local (direct connection to etcd) subnet mgr
Eugene Yakubovich 10 years ago
parent
commit
9cbad3b7f0
17 changed files with 1107 additions and 774 deletions
  1. 28 21
      backend/alloc/alloc.go
  2. 35 26
      backend/awsvpc/awsvpc.go
  3. 39 30
      backend/hostgw/hostgw.go
  4. 41 32
      backend/udp/udp.go
  5. 46 35
      backend/vxlan/vxlan.go
  6. 80 103
      main.go
  7. 44 0
      network/backend.go
  8. 1 1
      network/ipmasq.go
  9. 117 0
      network/network.go
  10. 0 19
      pkg/task/errors.go
  11. 1 1
      subnet/config.go
  12. 365 0
      subnet/etcd.go
  13. 50 32
      subnet/registry.go
  14. 48 0
      subnet/renew.go
  15. 47 425
      subnet/subnet.go
  16. 35 49
      subnet/subnet_test.go
  17. 130 0
      subnet/watch.go

+ 28 - 21
backend/alloc/alloc.go

@@ -4,22 +4,28 @@ import (
 	"fmt"
 	"net"
 
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
 	"github.com/coreos/flannel/subnet"
 )
 
-
 type AllocBackend struct {
-	sm   *subnet.SubnetManager
-	stop chan bool
+	sm      subnet.Manager
+	network string
+	lease   *subnet.Lease
+	ctx     context.Context
+	cancel  context.CancelFunc
 }
 
-func New(sm *subnet.SubnetManager) backend.Backend {
+func New(sm subnet.Manager, network string) backend.Backend {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	return &AllocBackend{
-		sm: sm,
-		stop: make(chan bool),
+		sm:      sm,
+		network: network,
+		ctx:     ctx,
+		cancel:  cancel,
 	}
 }
 
@@ -28,27 +34,28 @@ func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Sub
 		PublicIP: ip.FromIP(extIP),
 	}
 
-	sn, err := m.sm.AcquireLease(&attrs, m.stop)
-	if err != nil {
-		if err == task.ErrCanceled {
-			return nil, err
-		} else {
-			return nil, fmt.Errorf("failed to acquire lease: %v", err)
-		}
-	}
+	l, err := m.sm.AcquireLease(m.ctx, m.network, &attrs)
+	switch err {
+	case nil:
+		return &backend.SubnetDef{
+			Net: l.Subnet,
+			MTU: extIface.MTU,
+		}, nil
 
-	return &backend.SubnetDef{
-		Net: sn,
-		MTU: extIface.MTU,
-	}, nil
+	case context.Canceled, context.DeadlineExceeded:
+		return nil, err
+
+	default:
+		return nil, fmt.Errorf("failed to acquire lease: %v", err)
+	}
 }
 
 func (m *AllocBackend) Run() {
-	m.sm.LeaseRenewer(m.stop)
+	subnet.LeaseRenewer(m.ctx, m.sm, m.network, m.lease)
 }
 
 func (m *AllocBackend) Stop() {
-	close(m.stop)
+	m.cancel()
 }
 
 func (m *AllocBackend) Name() string {

+ 35 - 26
backend/awsvpc/awsvpc.go

@@ -22,36 +22,42 @@ import (
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/mitchellh/goamz/aws"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/mitchellh/goamz/ec2"
-
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
 	"github.com/coreos/flannel/subnet"
 )
 
 type AwsVpcBackend struct {
-	sm     *subnet.SubnetManager
-	rawCfg json.RawMessage
-	cfg    struct {
+	sm      subnet.Manager
+	network string
+	config  *subnet.Config
+	cfg     struct {
 		RouteTableID string
 	}
-	stop chan bool
-	wg   sync.WaitGroup
+	lease  *subnet.Lease
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
 }
 
-func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
+func New(sm subnet.Manager, network string, config *subnet.Config) backend.Backend {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	be := AwsVpcBackend{
-		sm:     sm,
-		rawCfg: config,
-		stop:   make(chan bool),
+		sm:      sm,
+		network: network,
+		config:  config,
+		ctx:     ctx,
+		cancel:  cancel,
 	}
 	return &be
 }
 
 func (m *AwsVpcBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.SubnetDef, error) {
 	// Parse our configuration
-	if len(m.rawCfg) > 0 {
-		if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
+	if len(m.config.Backend) > 0 {
+		if err := json.Unmarshal(m.config.Backend, &m.cfg); err != nil {
 			return nil, fmt.Errorf("error decoding VPC backend config: %v", err)
 		}
 	}
@@ -61,13 +67,16 @@ func (m *AwsVpcBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Su
 		PublicIP: ip.FromIP(extIP),
 	}
 
-	sn, err := m.sm.AcquireLease(&attrs, m.stop)
-	if err != nil {
-		if err == task.ErrCanceled {
-			return nil, err
-		} else {
-			return nil, fmt.Errorf("failed to acquire lease: %v", err)
-		}
+	l, err := m.sm.AcquireLease(m.ctx, m.network, &attrs)
+	switch err {
+	case nil:
+		m.lease = l
+
+	case context.Canceled, context.DeadlineExceeded:
+		return nil, err
+
+	default:
+		return nil, fmt.Errorf("failed to acquire lease: %v", err)
 	}
 
 	// Figure out this machine's EC2 instance ID and region
@@ -95,10 +104,10 @@ func (m *AwsVpcBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Su
 	ec2c := ec2.New(auth, region)
 
 	// Delete route for this machine's subnet if it already exists
-	if _, err := ec2c.DeleteRoute(m.cfg.RouteTableID, sn.String()); err != nil {
+	if _, err := ec2c.DeleteRoute(m.cfg.RouteTableID, l.Subnet.String()); err != nil {
 		if ec2err, ok := err.(*ec2.Error); !ok || ec2err.Code != "InvalidRoute.NotFound" {
 			// an error other than the route not already existing occurred
-			return nil, fmt.Errorf("error deleting existing route for %s: %v", sn.String(), err)
+			return nil, fmt.Errorf("error deleting existing route for %s: %v", l.Subnet.String(), err)
 		}
 	}
 
@@ -106,7 +115,7 @@ func (m *AwsVpcBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Su
 	route := &ec2.CreateRoute{
 		RouteTableId:         m.cfg.RouteTableID,
 		InstanceId:           instanceID,
-		DestinationCidrBlock: sn.String(),
+		DestinationCidrBlock: l.Subnet.String(),
 	}
 
 	if _, err := ec2c.CreateRoute(route); err != nil {
@@ -114,17 +123,17 @@ func (m *AwsVpcBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Su
 	}
 
 	return &backend.SubnetDef{
-		Net: sn,
+		Net: l.Subnet,
 		MTU: extIface.MTU,
 	}, nil
 }
 
 func (m *AwsVpcBackend) Run() {
-	m.sm.LeaseRenewer(m.stop)
+	subnet.LeaseRenewer(m.ctx, m.sm, m.network, m.lease)
 }
 
 func (m *AwsVpcBackend) Stop() {
-	close(m.stop)
+	m.cancel()
 }
 
 func (m *AwsVpcBackend) Name() string {

+ 39 - 30
backend/hostgw/hostgw.go

@@ -23,10 +23,9 @@ import (
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
-
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
 	"github.com/coreos/flannel/subnet"
 )
 
@@ -35,18 +34,25 @@ const (
 )
 
 type HostgwBackend struct {
-	sm       *subnet.SubnetManager
+	sm       subnet.Manager
+	network  string
+	lease    *subnet.Lease
 	extIface *net.Interface
 	extIP    net.IP
-	stop     chan bool
+	ctx      context.Context
+	cancel   context.CancelFunc
 	wg       sync.WaitGroup
 	rl       []netlink.Route
 }
 
-func New(sm *subnet.SubnetManager) backend.Backend {
+func New(sm subnet.Manager, network string) backend.Backend {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	b := &HostgwBackend{
-		sm:   sm,
-		stop: make(chan bool),
+		sm:      sm,
+		network: network,
+		ctx:     ctx,
+		cancel:  cancel,
 	}
 	return b
 }
@@ -60,19 +66,22 @@ func (rb *HostgwBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.S
 		BackendType: "host-gw",
 	}
 
-	sn, err := rb.sm.AcquireLease(&attrs, rb.stop)
-	if err != nil {
-		if err == task.ErrCanceled {
-			return nil, err
-		} else {
-			return nil, fmt.Errorf("Failed to acquire lease: %v", err)
-		}
+	l, err := rb.sm.AcquireLease(rb.ctx, rb.network, &attrs)
+	switch err {
+	case nil:
+		rb.lease = l
+
+	case context.Canceled, context.DeadlineExceeded:
+		return nil, err
+
+	default:
+		return nil, fmt.Errorf("failed to acquire lease: %v", err)
 	}
 
 	/* NB: docker will create the local route to `sn` */
 
 	return &backend.SubnetDef{
-		Net: sn,
+		Net: l.Subnet,
 		MTU: extIface.MTU,
 	}, nil
 }
@@ -80,22 +89,22 @@ func (rb *HostgwBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.S
 func (rb *HostgwBackend) Run() {
 	rb.wg.Add(1)
 	go func() {
-		rb.sm.LeaseRenewer(rb.stop)
+		subnet.LeaseRenewer(rb.ctx, rb.sm, rb.network, rb.lease)
 		rb.wg.Done()
 	}()
 
 	log.Info("Watching for new subnet leases")
-	evts := make(chan subnet.EventBatch)
+	evts := make(chan []subnet.Event)
 	rb.wg.Add(1)
 	go func() {
-		rb.sm.WatchLeases(evts, rb.stop)
+		subnet.WatchLeases(rb.ctx, rb.sm, rb.network, evts)
 		rb.wg.Done()
 	}()
 
 	rb.rl = make([]netlink.Route, 0, 10)
 	rb.wg.Add(1)
 	go func() {
-		rb.routeCheck(rb.stop)
+		rb.routeCheck(rb.ctx)
 		rb.wg.Done()
 	}()
 
@@ -106,25 +115,25 @@ func (rb *HostgwBackend) Run() {
 		case evtBatch := <-evts:
 			rb.handleSubnetEvents(evtBatch)
 
-		case <-rb.stop:
+		case <-rb.ctx.Done():
 			return
 		}
 	}
 }
 
 func (rb *HostgwBackend) Stop() {
-	close(rb.stop)
+	rb.cancel()
 }
 
 func (rb *HostgwBackend) Name() string {
 	return "host-gw"
 }
 
-func (rb *HostgwBackend) handleSubnetEvents(batch subnet.EventBatch) {
+func (rb *HostgwBackend) handleSubnetEvents(batch []subnet.Event) {
 	for _, evt := range batch {
 		switch evt.Type {
 		case subnet.SubnetAdded:
-			log.Infof("Subnet added: %v via %v", evt.Lease.Network, evt.Lease.Attrs.PublicIP)
+			log.Infof("Subnet added: %v via %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP)
 
 			if evt.Lease.Attrs.BackendType != "host-gw" {
 				log.Warningf("Ignoring non-host-gw subnet: type=%v", evt.Lease.Attrs.BackendType)
@@ -132,18 +141,18 @@ func (rb *HostgwBackend) handleSubnetEvents(batch subnet.EventBatch) {
 			}
 
 			route := netlink.Route{
-				Dst:       evt.Lease.Network.ToIPNet(),
+				Dst:       evt.Lease.Subnet.ToIPNet(),
 				Gw:        evt.Lease.Attrs.PublicIP.ToIP(),
 				LinkIndex: rb.extIface.Index,
 			}
 			if err := netlink.RouteAdd(&route); err != nil {
-				log.Errorf("Error adding route to %v via %v: %v", evt.Lease.Network, evt.Lease.Attrs.PublicIP, err)
+				log.Errorf("Error adding route to %v via %v: %v", evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, err)
 				continue
 			}
 			rb.addToRouteList(route)
 
 		case subnet.SubnetRemoved:
-			log.Info("Subnet removed: ", evt.Lease.Network)
+			log.Info("Subnet removed: ", evt.Lease.Subnet)
 
 			if evt.Lease.Attrs.BackendType != "host-gw" {
 				log.Warningf("Ignoring non-host-gw subnet: type=%v", evt.Lease.Attrs.BackendType)
@@ -151,12 +160,12 @@ func (rb *HostgwBackend) handleSubnetEvents(batch subnet.EventBatch) {
 			}
 
 			route := netlink.Route{
-				Dst:       evt.Lease.Network.ToIPNet(),
+				Dst:       evt.Lease.Subnet.ToIPNet(),
 				Gw:        evt.Lease.Attrs.PublicIP.ToIP(),
 				LinkIndex: rb.extIface.Index,
 			}
 			if err := netlink.RouteDel(&route); err != nil {
-				log.Errorf("Error deleting route to %v: %v", evt.Lease.Network, err)
+				log.Errorf("Error deleting route to %v: %v", evt.Lease.Subnet, err)
 				continue
 			}
 			rb.removeFromRouteList(route)
@@ -180,10 +189,10 @@ func (rb *HostgwBackend) removeFromRouteList(route netlink.Route) {
 	}
 }
 
-func (rb *HostgwBackend) routeCheck(cancel chan bool) {
+func (rb *HostgwBackend) routeCheck(ctx context.Context) {
 	for {
 		select {
-		case <-cancel:
+		case <-ctx.Done():
 			return
 		case <-time.After(routeCheckRetries * time.Second):
 			rb.checkSubnetExistInRoutes()

+ 41 - 32
backend/udp/udp.go

@@ -24,10 +24,9 @@ import (
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
-
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
 	"github.com/coreos/flannel/subnet"
 )
 
@@ -37,26 +36,33 @@ const (
 )
 
 type UdpBackend struct {
-	sm     *subnet.SubnetManager
-	rawCfg json.RawMessage
-	cfg    struct {
+	sm      subnet.Manager
+	network string
+	config  *subnet.Config
+	cfg     struct {
 		Port int
 	}
+	lease  *subnet.Lease
 	ctl    *os.File
 	ctl2   *os.File
 	tun    *os.File
 	conn   *net.UDPConn
 	mtu    int
 	tunNet ip.IP4Net
-	stop   chan bool
+	ctx    context.Context
+	cancel context.CancelFunc
 	wg     sync.WaitGroup
 }
 
-func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
+func New(sm subnet.Manager, network string, config *subnet.Config) backend.Backend {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	be := UdpBackend{
-		sm:     sm,
-		rawCfg: config,
-		stop:   make(chan bool),
+		sm:      sm,
+		network: network,
+		config:  config,
+		ctx:     ctx,
+		cancel:  cancel,
 	}
 	be.cfg.Port = defaultPort
 	return &be
@@ -64,8 +70,8 @@ func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
 
 func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.SubnetDef, error) {
 	// Parse our configuration
-	if len(m.rawCfg) > 0 {
-		if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
+	if len(m.config.Backend) > 0 {
+		if err := json.Unmarshal(m.config.Backend, &m.cfg); err != nil {
 			return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
 		}
 	}
@@ -75,20 +81,23 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Subne
 		PublicIP: ip.FromIP(extIP),
 	}
 
-	sn, err := m.sm.AcquireLease(&attrs, m.stop)
-	if err != nil {
-		if err == task.ErrCanceled {
-			return nil, err
-		} else {
-			return nil, fmt.Errorf("failed to acquire lease: %v", err)
-		}
+	l, err := m.sm.AcquireLease(m.ctx, m.network, &attrs)
+	switch err {
+	case nil:
+		m.lease = l
+
+	case context.Canceled, context.DeadlineExceeded:
+		return nil, err
+
+	default:
+		return nil, fmt.Errorf("failed to acquire lease: %v", err)
 	}
 
 	// Tunnel's subnet is that of the whole overlay network (e.g. /16)
 	// and not that of the individual host (e.g. /24)
 	m.tunNet = ip.IP4Net{
-		IP:        sn.IP,
-		PrefixLen: m.sm.GetConfig().Network.PrefixLen,
+		IP:        l.Subnet.IP,
+		PrefixLen: m.config.Network.PrefixLen,
 	}
 
 	// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
@@ -109,7 +118,7 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Subne
 	}
 
 	return &backend.SubnetDef{
-		Net: sn,
+		Net: l.Subnet,
 		MTU: m.mtu,
 	}, nil
 }
@@ -124,7 +133,7 @@ func (m *UdpBackend) Run() {
 	}()
 
 	go func() {
-		m.sm.LeaseRenewer(m.stop)
+		subnet.LeaseRenewer(m.ctx, m.sm, m.network, m.lease)
 		m.wg.Done()
 	}()
 
@@ -138,7 +147,7 @@ func (m *UdpBackend) Stop() {
 		stopProxy(m.ctl)
 	}
 
-	close(m.stop)
+	m.cancel()
 }
 
 func (m *UdpBackend) Name() string {
@@ -211,11 +220,11 @@ func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
 func (m *UdpBackend) monitorEvents() {
 	log.Info("Watching for new subnet leases")
 
-	evts := make(chan subnet.EventBatch)
+	evts := make(chan []subnet.Event)
 
 	m.wg.Add(1)
 	go func() {
-		m.sm.WatchLeases(evts, m.stop)
+		subnet.WatchLeases(m.ctx, m.sm, m.network, evts)
 		m.wg.Done()
 	}()
 
@@ -224,24 +233,24 @@ func (m *UdpBackend) monitorEvents() {
 		case evtBatch := <-evts:
 			m.processSubnetEvents(evtBatch)
 
-		case <-m.stop:
+		case <-m.ctx.Done():
 			return
 		}
 	}
 }
 
-func (m *UdpBackend) processSubnetEvents(batch subnet.EventBatch) {
+func (m *UdpBackend) processSubnetEvents(batch []subnet.Event) {
 	for _, evt := range batch {
 		switch evt.Type {
 		case subnet.SubnetAdded:
-			log.Info("Subnet added: ", evt.Lease.Network)
+			log.Info("Subnet added: ", evt.Lease.Subnet)
 
-			setRoute(m.ctl, evt.Lease.Network, evt.Lease.Attrs.PublicIP, m.cfg.Port)
+			setRoute(m.ctl, evt.Lease.Subnet, evt.Lease.Attrs.PublicIP, m.cfg.Port)
 
 		case subnet.SubnetRemoved:
-			log.Info("Subnet removed: ", evt.Lease.Network)
+			log.Info("Subnet removed: ", evt.Lease.Subnet)
 
-			removeRoute(m.ctl, evt.Lease.Network)
+			removeRoute(m.ctl, evt.Lease.Subnet)
 
 		default:
 			log.Error("Internal error: unknown event type: ", int(evt.Type))

+ 46 - 35
backend/vxlan/vxlan.go

@@ -23,10 +23,9 @@ import (
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
-
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
 	"github.com/coreos/flannel/subnet"
 )
 
@@ -35,23 +34,30 @@ const (
 )
 
 type VXLANBackend struct {
-	sm     *subnet.SubnetManager
-	rawCfg json.RawMessage
-	cfg    struct {
+	sm      subnet.Manager
+	network string
+	config  *subnet.Config
+	cfg     struct {
 		VNI  int
 		Port int
 	}
-	dev  *vxlanDevice
-	stop chan bool
-	wg   sync.WaitGroup
-	rts  routes
+	lease  *subnet.Lease
+	dev    *vxlanDevice
+	ctx    context.Context
+	cancel context.CancelFunc
+	wg     sync.WaitGroup
+	rts    routes
 }
 
-func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
+func New(sm subnet.Manager, network string, config *subnet.Config) backend.Backend {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	vb := &VXLANBackend{
-		sm:     sm,
-		rawCfg: config,
-		stop:   make(chan bool),
+		sm:      sm,
+		network: network,
+		config:  config,
+		ctx:     ctx,
+		cancel:  cancel,
 	}
 	vb.cfg.VNI = defaultVNI
 
@@ -73,8 +79,8 @@ func newSubnetAttrs(pubIP net.IP, mac net.HardwareAddr) (*subnet.LeaseAttrs, err
 
 func (vb *VXLANBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.SubnetDef, error) {
 	// Parse our configuration
-	if len(vb.rawCfg) > 0 {
-		if err := json.Unmarshal(vb.rawCfg, &vb.cfg); err != nil {
+	if len(vb.config.Backend) > 0 {
+		if err := json.Unmarshal(vb.config.Backend, &vb.cfg); err != nil {
 			return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
 		}
 	}
@@ -106,27 +112,30 @@ func (vb *VXLANBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Su
 		return nil, err
 	}
 
-	sn, err := vb.sm.AcquireLease(sa, vb.stop)
-	if err != nil {
-		if err == task.ErrCanceled {
-			return nil, err
-		} else {
-			return nil, fmt.Errorf("failed to acquire lease: %v", err)
-		}
+	l, err := vb.sm.AcquireLease(vb.ctx, vb.network, sa)
+	switch err {
+	case nil:
+		vb.lease = l
+
+	case context.Canceled, context.DeadlineExceeded:
+		return nil, err
+
+	default:
+		return nil, fmt.Errorf("failed to acquire lease: %v", err)
 	}
 
 	// vxlan's subnet is that of the whole overlay network (e.g. /16)
 	// and not that of the individual host (e.g. /24)
 	vxlanNet := ip.IP4Net{
-		IP:        sn.IP,
-		PrefixLen: vb.sm.GetConfig().Network.PrefixLen,
+		IP:        l.Subnet.IP,
+		PrefixLen: vb.config.Network.PrefixLen,
 	}
 	if err = vb.dev.Configure(vxlanNet); err != nil {
 		return nil, err
 	}
 
 	return &backend.SubnetDef{
-		Net: sn,
+		Net: l.Subnet,
 		MTU: vb.dev.MTU(),
 	}, nil
 }
@@ -134,7 +143,8 @@ func (vb *VXLANBackend) Init(extIface *net.Interface, extIP net.IP) (*backend.Su
 func (vb *VXLANBackend) Run() {
 	vb.wg.Add(1)
 	go func() {
-		vb.sm.LeaseRenewer(vb.stop)
+		subnet.LeaseRenewer(vb.ctx, vb.sm, vb.network, vb.lease)
+		log.Info("LeaseRenewer exited")
 		vb.wg.Done()
 	}()
 
@@ -145,10 +155,11 @@ func (vb *VXLANBackend) Run() {
 	go vb.dev.MonitorMisses(misses)
 
 	log.Info("Watching for new subnet leases")
-	evts := make(chan subnet.EventBatch)
+	evts := make(chan []subnet.Event)
 	vb.wg.Add(1)
 	go func() {
-		vb.sm.WatchLeases(evts, vb.stop)
+		subnet.WatchLeases(vb.ctx, vb.sm, vb.network, evts)
+		log.Info("WatchLeases exited")
 		vb.wg.Done()
 	}()
 
@@ -162,14 +173,14 @@ func (vb *VXLANBackend) Run() {
 		case evtBatch := <-evts:
 			vb.handleSubnetEvents(evtBatch)
 
-		case <-vb.stop:
+		case <-vb.ctx.Done():
 			return
 		}
 	}
 }
 
 func (vb *VXLANBackend) Stop() {
-	close(vb.stop)
+	vb.cancel()
 }
 
 func (vb *VXLANBackend) Name() string {
@@ -203,11 +214,11 @@ type vxlanLeaseAttrs struct {
 	VtepMAC hardwareAddr
 }
 
-func (vb *VXLANBackend) handleSubnetEvents(batch subnet.EventBatch) {
+func (vb *VXLANBackend) handleSubnetEvents(batch []subnet.Event) {
 	for _, evt := range batch {
 		switch evt.Type {
 		case subnet.SubnetAdded:
-			log.Info("Subnet added: ", evt.Lease.Network)
+			log.Info("Subnet added: ", evt.Lease.Subnet)
 
 			if evt.Lease.Attrs.BackendType != "vxlan" {
 				log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType)
@@ -220,12 +231,12 @@ func (vb *VXLANBackend) handleSubnetEvents(batch subnet.EventBatch) {
 				continue
 			}
 
-			vb.rts.set(evt.Lease.Network, evt.Lease.Attrs.PublicIP.ToIP(), net.HardwareAddr(attrs.VtepMAC))
+			vb.rts.set(evt.Lease.Subnet, evt.Lease.Attrs.PublicIP.ToIP(), net.HardwareAddr(attrs.VtepMAC))
 
 		case subnet.SubnetRemoved:
-			log.Info("Subnet removed: ", evt.Lease.Network)
+			log.Info("Subnet removed: ", evt.Lease.Subnet)
 
-			vb.rts.remove(evt.Lease.Network)
+			vb.rts.remove(evt.Lease.Subnet)
 
 			if evt.Lease.Attrs.BackendType != "vxlan" {
 				log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType)

+ 80 - 103
main.go

@@ -15,7 +15,6 @@
 package main
 
 import (
-	"encoding/json"
 	"flag"
 	"fmt"
 	"net"
@@ -23,20 +22,15 @@ import (
 	"os/signal"
 	"path/filepath"
 	"strings"
+	"sync"
 	"syscall"
-	"time"
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
-
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/backend"
-	"github.com/coreos/flannel/backend/alloc"
-	"github.com/coreos/flannel/backend/awsvpc"
-	"github.com/coreos/flannel/backend/hostgw"
-	"github.com/coreos/flannel/backend/udp"
-	"github.com/coreos/flannel/backend/vxlan"
+	"github.com/coreos/flannel/network"
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
 	"github.com/coreos/flannel/subnet"
 )
 
@@ -50,7 +44,9 @@ type CmdLineOpts struct {
 	version       bool
 	ipMasq        bool
 	subnetFile    string
+	subnetDir     string
 	iface         string
+	networks      string
 }
 
 var opts CmdLineOpts
@@ -61,8 +57,10 @@ func init() {
 	flag.StringVar(&opts.etcdKeyfile, "etcd-keyfile", "", "SSL key file used to secure etcd communication")
 	flag.StringVar(&opts.etcdCertfile, "etcd-certfile", "", "SSL certification file used to secure etcd communication")
 	flag.StringVar(&opts.etcdCAFile, "etcd-cafile", "", "SSL Certificate Authority file used to secure etcd communication")
-	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet and MTU values) will be written to")
+	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
+	flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
 	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
+	flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
 	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
 	flag.BoolVar(&opts.help, "help", false, "print this message")
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
@@ -90,12 +88,8 @@ func flagsFromEnv(prefix string, fs *flag.FlagSet) {
 	})
 }
 
-func writeSubnetFile(sn *backend.SubnetDef) error {
-	// Write out the first usable IP by incrementing
-	// sn.IP by one
-	sn.Net.IP += 1
-
-	dir, name := filepath.Split(opts.subnetFile)
+func writeSubnetFile(path string, sn *backend.SubnetDef) error {
+	dir, name := filepath.Split(path)
 	os.MkdirAll(dir, 0755)
 
 	tempFile := filepath.Join(dir, "."+name)
@@ -104,6 +98,10 @@ func writeSubnetFile(sn *backend.SubnetDef) error {
 		return err
 	}
 
+	// Write out the first usable IP by incrementing
+	// sn.IP by one
+	sn.Net.IP += 1
+
 	fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn.Net)
 	fmt.Fprintf(f, "FLANNEL_MTU=%d\n", sn.MTU)
 	_, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", opts.ipMasq)
@@ -114,7 +112,7 @@ func writeSubnetFile(sn *backend.SubnetDef) error {
 
 	// rename(2) the temporary file to the desired location so that it becomes
 	// atomically visible with the contents
-	return os.Rename(tempFile, opts.subnetFile)
+	return os.Rename(tempFile, path)
 }
 
 func lookupIface() (*net.Interface, net.IP, error) {
@@ -151,99 +149,69 @@ func lookupIface() (*net.Interface, net.IP, error) {
 	return iface, ipaddr, nil
 }
 
-func newSubnetManager() *subnet.SubnetManager {
-	peers := strings.Split(opts.etcdEndpoints, ",")
+func isMultiNetwork() bool {
+	return len(opts.networks) > 0
+}
 
+func newSubnetManager() (subnet.Manager, error) {
 	cfg := &subnet.EtcdConfig{
-		Endpoints: peers,
+		Endpoints: strings.Split(opts.etcdEndpoints, ","),
 		Keyfile:   opts.etcdKeyfile,
 		Certfile:  opts.etcdCertfile,
 		CAFile:    opts.etcdCAFile,
 		Prefix:    opts.etcdPrefix,
 	}
 
-	for {
-		sm, err := subnet.NewSubnetManager(cfg)
-		if err == nil {
-			return sm
-		}
-
-		log.Error("Failed to create SubnetManager: ", err)
-		time.Sleep(time.Second)
-	}
+	return subnet.NewEtcdManager(cfg)
 }
 
-func newBackend(sm *subnet.SubnetManager) (backend.Backend, error) {
-	config := sm.GetConfig()
-
-	var bt struct {
-		Type string
-	}
-
-	if len(config.Backend) == 0 {
-		bt.Type = "udp"
-	} else {
-		if err := json.Unmarshal(config.Backend, &bt); err != nil {
-			return nil, fmt.Errorf("Error decoding Backend property of config: %v", err)
-		}
-	}
-
-	switch strings.ToLower(bt.Type) {
-	case "udp":
-		return udp.New(sm, config.Backend), nil
-	case "alloc":
-		return alloc.New(sm), nil
-	case "host-gw":
-		return hostgw.New(sm), nil
-	case "vxlan":
-		return vxlan.New(sm, config.Backend), nil
-	case "aws-vpc":
-		return awsvpc.New(sm, config.Backend), nil
-	default:
-		return nil, fmt.Errorf("'%v': unknown backend type", bt.Type)
-	}
-}
-
-func run(sm *subnet.SubnetManager, be backend.Backend, exit chan int) {
-	var err error
-	defer func() {
-		if err == nil || err == task.ErrCanceled {
-			exit <- 0
-		} else {
-			log.Error(err)
-			exit <- 1
-		}
-	}()
-
+func initAndRun(ctx context.Context, sm subnet.Manager, netnames []string) {
 	iface, ipaddr, err := lookupIface()
 	if err != nil {
+		log.Error(err)
 		return
 	}
 
 	if iface.MTU == 0 {
-		err = fmt.Errorf("Failed to determine MTU for %s interface", ipaddr)
+		log.Errorf("Failed to determine MTU for %s interface", ipaddr)
 		return
 	}
 
 	log.Infof("Using %s as external interface", ipaddr)
 
-	sn, err := be.Init(iface, ipaddr)
-	if err != nil {
-		return
+	nets := []*network.Network{}
+	for _, n := range netnames {
+		nets = append(nets, network.New(sm, n, opts.ipMasq))
 	}
 
-	if opts.ipMasq {
-		flannelNet := sm.GetConfig().Network
-		if err = setupIPMasq(flannelNet); err != nil {
-			return
-		}
+	wg := sync.WaitGroup{}
+
+	for _, n := range nets {
+		go func(n *network.Network) {
+			wg.Add(1)
+			defer wg.Done()
+
+			sn := n.Init(ctx, iface, ipaddr)
+			if sn != nil {
+				if isMultiNetwork() {
+					path := filepath.Join(opts.subnetDir, n.Name) + ".env"
+					if err := writeSubnetFile(path, sn); err != nil {
+						return
+					}
+				} else {
+					if err := writeSubnetFile(opts.subnetFile, sn); err != nil {
+						return
+					}
+					daemon.SdNotify("READY=1")
+				}
+
+				n.Run(ctx)
+				log.Infof("%v exited", n.Name)
+			}
+		}(n)
 	}
 
-	writeSubnetFile(sn)
-	daemon.SdNotify("READY=1")
-
-	log.Infof("%s mode initialized", be.Name())
-	be.Run()
+	wg.Wait()
 }
 
 func main() {
@@ -267,33 +235,42 @@ func main() {
 
 	flagsFromEnv("FLANNELD", flag.CommandLine)
 
-	sm := newSubnetManager()
-	be, err := newBackend(sm)
+	sm, err := newSubnetManager()
 	if err != nil {
-		log.Info(err)
+		log.Error("Failed to create SubnetManager: ", err)
 		os.Exit(1)
 	}
 
-	// Register for SIGINT and SIGTERM and wait for one of them to arrive
+	var runFunc func(ctx context.Context)
+
+	networks := strings.Split(opts.networks, ",")
+	if len(networks) == 0 {
+		networks = append(networks, "")
+	}
+	runFunc = func(ctx context.Context) {
+		initAndRun(ctx, sm, networks)
+	}
+
+	// Register for SIGINT and SIGTERM
 	log.Info("Installing signal handlers")
 	sigs := make(chan os.Signal, 1)
 	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
-	exit := make(chan int)
-	go run(sm, be, exit)
+	ctx, cancel := context.WithCancel(context.Background())
 
-	for {
-		select {
-		case <-sigs:
-			// unregister to get default OS nuke behaviour in case we don't exit cleanly
-			signal.Stop(sigs)
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		runFunc(ctx)
+		wg.Done()
+	}()
 
-			log.Info("Exiting...")
-			be.Stop()
+	<-sigs
+	// unregister to get default OS nuke behaviour in case we don't exit cleanly
+	signal.Stop(sigs)
 
-		case code := <-exit:
-			log.Infof("%s mode exited", be.Name())
-			os.Exit(code)
-		}
-	}
+	log.Info("Exiting...")
+	cancel()
+
+	wg.Wait()
 }

+ 44 - 0
network/backend.go

@@ -0,0 +1,44 @@
+package network
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	"github.com/coreos/flannel/backend"
+	"github.com/coreos/flannel/backend/alloc"
+	"github.com/coreos/flannel/backend/awsvpc"
+	"github.com/coreos/flannel/backend/hostgw"
+	"github.com/coreos/flannel/backend/udp"
+	"github.com/coreos/flannel/backend/vxlan"
+	"github.com/coreos/flannel/subnet"
+)
+
+func newBackend(sm subnet.Manager, network string, config *subnet.Config) (backend.Backend, error) {
+	var bt struct {
+		Type string
+	}
+
+	if len(config.Backend) == 0 {
+		bt.Type = "udp"
+	} else {
+		if err := json.Unmarshal(config.Backend, &bt); err != nil {
+			return nil, fmt.Errorf("Error decoding Backend property of config: %v", err)
+		}
+	}
+
+	switch strings.ToLower(bt.Type) {
+	case "udp":
+		return udp.New(sm, network, config), nil
+	case "alloc":
+		return alloc.New(sm, network), nil
+	case "host-gw":
+		return hostgw.New(sm, network), nil
+	case "vxlan":
+		return vxlan.New(sm, network, config), nil
+	case "aws-vpc":
+		return awsvpc.New(sm, network, config), nil
+	default:
+		return nil, fmt.Errorf("%v: '%v': unknown backend type", network, bt.Type)
+	}
+}

+ 1 - 1
ipmasq.go → network/ipmasq.go

@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package main
+package network
 
 import (
 	"fmt"

+ 117 - 0
network/network.go

@@ -0,0 +1,117 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package network
+
+import (
+	"net"
+	"sync"
+	"time"
+
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/flannel/backend"
+	"github.com/coreos/flannel/subnet"
+)
+
+// Network ties a named flannel network to the subnet manager it is
+// configured through and to the backend that serves it.
+type Network struct {
+	// Name is the network name handed to the subnet manager and the backend.
+	Name string
+
+	// sm is the subnet manager used to fetch the network config.
+	sm     subnet.Manager
+	// ipMasq makes Init set up IP masquerading for the network when true.
+	ipMasq bool
+	// be is assigned by Init once a backend has been created; Run drives it.
+	be     backend.Backend
+}
+
+// New returns a Network for the given name. No backend is attached until
+// Init has been called.
+func New(sm subnet.Manager, name string, ipMasq bool) *Network {
+	n := new(Network)
+	n.Name = name
+	n.sm = sm
+	n.ipMasq = ipMasq
+	return n
+}
+
+// Init brings the network up in four ordered steps: fetch the network
+// config, create the backend, initialize the backend, and (when ipMasq is
+// set) install IP masquerading. A failing step is logged and retried once
+// per second until it succeeds. ctx is checked before every attempt; if it
+// is done, Init gives up and returns nil. On success the SubnetDef returned
+// by the backend's Init is returned.
+func (n *Network) Init(ctx context.Context, iface *net.Interface, ipaddr net.IP) *backend.SubnetDef {
+	// State shared between the step closures below; each step relies on the
+	// values produced by the steps before it.
+	var cfg *subnet.Config
+	var be backend.Backend
+	var sn *backend.SubnetDef
+
+	steps := []func() error{
+		// Step 1: retrieve the network config from the subnet manager.
+		func() (err error) {
+			cfg, err = n.sm.GetNetworkConfig(ctx, n.Name)
+			if err != nil {
+				log.Error("Failed to retrieve network config: ", err)
+			}
+			return
+		},
+
+		// Step 2: create the backend selected by the config and remember it
+		// on the Network so that Run can use it.
+		func() (err error) {
+			be, err = newBackend(n.sm, n.Name, cfg)
+			if err != nil {
+				log.Error("Failed to create backend: ", err)
+			} else {
+				n.be = be
+			}
+			return
+		},
+
+		// Step 3: initialize the backend on the given interface/address.
+		func() (err error) {
+			sn, err = be.Init(iface, ipaddr)
+			if err != nil {
+				log.Errorf("Failed to initialize network %v (type %v): %v", n.Name, be.Name(), err)
+			}
+			return
+		},
+
+		// Step 4: optionally set up IP masquerading for the whole network range.
+		func() (err error) {
+			if n.ipMasq {
+				flannelNet := cfg.Network
+				if err = setupIPMasq(flannelNet); err != nil {
+					log.Errorf("Failed to set up IP Masquerade for network %v: %v", n.Name, err)
+				}
+			}
+			return
+		},
+	}
+
+	for _, s := range steps {
+		// Retry the current step forever, sleeping a second between attempts.
+		for ; ; time.Sleep(time.Second) {
+			// Non-blocking cancellation check before each attempt.
+			select {
+			case <-ctx.Done():
+				return nil
+			default:
+			}
+
+			err := s()
+			if err == nil {
+				// Leaves the retry loop (not the select above) and moves on
+				// to the next step.
+				break
+			}
+		}
+	}
+
+	return sn
+}
+
+func (n *Network) Run(ctx context.Context) {
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		n.be.Run()
+		wg.Done()
+	}()
+
+	<-ctx.Done()
+	n.be.Stop()
+
+	wg.Wait()
+}

+ 0 - 19
pkg/task/errors.go

@@ -1,19 +0,0 @@
-// Copyright 2015 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package task
-
-import "errors"
-
-var ErrCanceled = errors.New("Task canceled")

+ 1 - 1
subnet/config.go

@@ -26,7 +26,7 @@ type Config struct {
 	SubnetMin ip.IP4
 	SubnetMax ip.IP4
 	SubnetLen uint
-	Backend   json.RawMessage
+	Backend   json.RawMessage `json:",omitempty"`
 }
 
 func ParseConfig(s string) (*Config, error) {

+ 365 - 0
subnet/etcd.go

@@ -0,0 +1,365 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net"
+	"regexp"
+	"strconv"
+	"time"
+
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+	"github.com/coreos/flannel/pkg/ip"
+)
+
+const (
+	// registerRetries bounds the number of attempts acquireLeaseOnce makes
+	// before reporting "Max retries reached".
+	registerRetries = 10
+	// subnetTTL is the TTL handed to the registry when a subnet key is created
+	// or updated (presumably seconds, i.e. 24h -- confirm against the registry).
+	subnetTTL       = 24 * 3600
+)
+
+// etcd error codes
+const (
+	// etcdKeyNotFound: getLeases treats this as an empty set of leases.
+	etcdKeyNotFound       = 100
+	// etcdKeyAlreadyExists: tryAcquireLease treats this as "try again".
+	etcdKeyAlreadyExists  = 105
+	// etcdEventIndexCleared: the watch index is outside etcd's history window
+	// (see isIndexTooSmall).
+	etcdEventIndexCleared = 401
+)
+
+// EtcdManager is the Manager implementation that performs all of its
+// operations through a Registry.
+type EtcdManager struct {
+	// registry is the store used for config, subnet leases and watches.
+	registry Registry
+}
+
+var (
+	// subnetRegex picks the dotted-quad address and the prefix length out of
+	// a subnet key of the form "a.b.c.d-len". All three dots are escaped so
+	// that only a literal '.' is accepted between the octets.
+	subnetRegex = regexp.MustCompile(`(\d+\.\d+\.\d+\.\d+)-(\d+)`)
+)
+
+// NewEtcdManager creates a Manager backed by an etcd subnet registry built
+// from config. It fails if the registry cannot be created.
+func NewEtcdManager(config *EtcdConfig) (Manager, error) {
+	registry, err := newEtcdSubnetRegistry(config)
+	if err != nil {
+		return nil, err
+	}
+
+	return &EtcdManager{registry: registry}, nil
+}
+
+// newEtcdManager wraps an already constructed Registry in an EtcdManager.
+func newEtcdManager(r Registry) Manager {
+	m := &EtcdManager{registry: r}
+	return m
+}
+
+// GetNetworkConfig reads the config entry of the given network from the
+// registry and parses its value into a Config.
+func (m *EtcdManager) GetNetworkConfig(ctx context.Context, network string) (*Config, error) {
+	resp, err := m.registry.getConfig(ctx, network)
+	if err != nil {
+		return nil, err
+	}
+
+	raw := resp.Node.Value
+	return ParseConfig(raw)
+}
+
+// AcquireLease fetches the network config and then keeps trying to obtain a
+// subnet lease, pausing one second between rounds, until it succeeds or ctx
+// is done. Context errors are returned as-is; any other failure is logged
+// and retried.
+func (m *EtcdManager) AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error) {
+	config, err := m.GetNetworkConfig(ctx, network)
+	if err != nil {
+		return nil, err
+	}
+
+	for {
+		lease, err := m.acquireLeaseOnce(ctx, network, config, attrs)
+		if err == nil {
+			log.Info("Subnet lease acquired: ", lease.Subnet)
+			return lease, nil
+		}
+
+		if err == context.Canceled || err == context.DeadlineExceeded {
+			return nil, err
+		}
+
+		log.Error("Failed to acquire subnet: ", err)
+
+		// Back off for a second, unless the caller cancels first.
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+
+		case <-time.After(time.Second):
+		}
+	}
+}
+
+// findLeaseByIP returns a copy of the first lease whose public IP equals
+// pubIP, or nil when there is none. The result does not alias the slice.
+func findLeaseByIP(leases []Lease, pubIP ip.IP4) *Lease {
+	for i := range leases {
+		if leases[i].Attrs.PublicIP != pubIP {
+			continue
+		}
+
+		match := leases[i]
+		return &match
+	}
+
+	return nil
+}
+
+func (m *EtcdManager) tryAcquireLease(ctx context.Context, network string, config *Config, extIP ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
+	var err error
+	leases, err := m.getLeases(ctx, network)
+	if err != nil {
+		return nil, err
+	}
+
+	attrBytes, err := json.Marshal(attrs)
+	if err != nil {
+		return nil, err
+	}
+
+	// try to reuse a subnet if there's one that matches our IP
+	if l := findLeaseByIP(leases, extIP); l != nil {
+		resp, err := m.registry.updateSubnet(ctx, network, l.Key(), string(attrBytes), subnetTTL)
+		if err != nil {
+			return nil, err
+		}
+
+		l.Attrs = attrs
+		l.Expiration = *resp.Node.Expiration
+		return l, nil
+	}
+
+	// no existing match, grab a new one
+	sn, err := m.allocateSubnet(config, leases)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := m.registry.createSubnet(ctx, network, sn.StringSep(".", "-"), string(attrBytes), subnetTTL)
+	switch {
+	case err == nil:
+		return &Lease{
+			Subnet:     sn,
+			Attrs:      attrs,
+			Expiration: *resp.Node.Expiration,
+		}, nil
+
+	// if etcd returned Key Already Exists, try again.
+	case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
+		return nil, nil
+
+	default:
+		return nil, err
+	}
+}
+
+// acquireLeaseOnce calls tryAcquireLease up to registerRetries times. It
+// stops at the first error or the first lease obtained, and checks ctx for
+// cancellation between attempts.
+func (m *EtcdManager) acquireLeaseOnce(ctx context.Context, network string, config *Config, attrs *LeaseAttrs) (*Lease, error) {
+	for attempt := 0; attempt < registerRetries; attempt++ {
+		lease, err := m.tryAcquireLease(ctx, network, config, attrs.PublicIP, attrs)
+		if err != nil {
+			return nil, err
+		}
+		if lease != nil {
+			return lease, nil
+		}
+
+		// Neither a lease nor an error: the attempt should be repeated,
+		// unless the caller has cancelled in the meantime.
+		// TODO(eyakubovich): propagate ctx deeper into registry
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		default:
+		}
+	}
+
+	return nil, errors.New("Max retries reached trying to acquire a subnet")
+}
+
+// parseSubnetKey extracts an IPv4 subnet from a key containing
+// "<address>-<prefix length>". The prefix length is parsed as a 5-bit
+// unsigned number. Any mismatch yields an error.
+func parseSubnetKey(s string) (ip.IP4Net, error) {
+	parts := subnetRegex.FindStringSubmatch(s)
+	if len(parts) != 3 {
+		return ip.IP4Net{}, errors.New("Error parsing IP Subnet")
+	}
+
+	addr := net.ParseIP(parts[1]).To4()
+	bits, err := strconv.ParseUint(parts[2], 10, 5)
+	if addr == nil || err != nil {
+		return ip.IP4Net{}, errors.New("Error parsing IP Subnet")
+	}
+
+	return ip.IP4Net{IP: ip.FromIP(addr), PrefixLen: uint(bits)}, nil
+}
+
+// allocateSubnet walks the configured range from SubnetMin to SubnetMax,
+// collects up to 100 subnets that overlap none of the given leases, and
+// returns one of them chosen via randInt. It fails with "out of subnets"
+// when no candidate is found.
+func (m *EtcdManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, error) {
+	log.Infof("Picking subnet in range %s ... %s", config.SubnetMin, config.SubnetMax)
+
+	var candidates []ip.IP4
+
+	cur := ip.IP4Net{IP: config.SubnetMin, PrefixLen: config.SubnetLen}
+	for ; cur.IP <= config.SubnetMax && len(candidates) < 100; cur = cur.Next() {
+		taken := false
+		for _, l := range leases {
+			if cur.Overlaps(l.Subnet) {
+				taken = true
+				break
+			}
+		}
+
+		if !taken {
+			candidates = append(candidates, cur.IP)
+		}
+	}
+
+	if len(candidates) == 0 {
+		return ip.IP4Net{}, errors.New("out of subnets")
+	}
+
+	pick := randInt(0, len(candidates))
+	return ip.IP4Net{IP: candidates[pick], PrefixLen: config.SubnetLen}, nil
+}
+
+// getLeases lists the subnet leases currently stored for the network.
+//
+// Entries whose key or value cannot be parsed are skipped. An etcd
+// "key not found" error is treated as an empty (non-nil) set; any other
+// error is returned to the caller.
+func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, error) {
+	leases := []Lease{}
+
+	resp, err := m.registry.getSubnets(ctx, network)
+	if err != nil {
+		// The assertion is checked so that non-etcd errors (e.g. transport
+		// failures) are reported instead of causing a panic.
+		if etcdErr, ok := err.(*etcd.EtcdError); ok && etcdErr.ErrorCode == etcdKeyNotFound {
+			// key not found: treat it as empty set
+			return leases, nil
+		}
+		return nil, err
+	}
+
+	for _, node := range resp.Node.Nodes {
+		sn, err := parseSubnetKey(node.Key)
+		if err != nil {
+			continue
+		}
+
+		attrs := &LeaseAttrs{}
+		if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
+			continue
+		}
+
+		// The expiration belongs to the individual subnet node, not to the
+		// directory node that contains it.
+		exp := time.Time{}
+		if node.Expiration != nil {
+			exp = *node.Expiration
+		}
+
+		leases = append(leases, Lease{
+			Subnet:     sn,
+			Attrs:      attrs,
+			Expiration: exp,
+		})
+	}
+
+	return leases, nil
+}
+
+// RenewLease rewrites the lease's subnet entry in the registry with a fresh
+// subnetTTL and, on success, stores the new expiration in lease.
+func (m *EtcdManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
+	payload, err := json.Marshal(lease.Attrs)
+	if err != nil {
+		return err
+	}
+
+	// TODO(eyakubovich): propagate ctx into registry
+	resp, err := m.registry.updateSubnet(ctx, network, lease.Key(), string(payload), subnetTTL)
+	if err == nil {
+		lease.Expiration = *resp.Node.Expiration
+	}
+
+	return err
+}
+
+// WatchLeases waits for the next change to the network's subnet leases.
+//
+// cursor is either nil (watch index 0) or the uint64 Cursor taken from a
+// previous WatchResult; any other type is rejected with an error. If etcd
+// reports that the index has fallen outside its history window, the current
+// leases are fetched and returned via watchReset instead.
+func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (WatchResult, error) {
+	nextIndex := uint64(0)
+	if cursor != nil {
+		idx, ok := cursor.(uint64)
+		if !ok {
+			// Report a foreign cursor instead of panicking on the assertion.
+			return WatchResult{}, fmt.Errorf("invalid watch cursor type %T (expected uint64)", cursor)
+		}
+		nextIndex = idx
+	}
+
+	resp, err := m.registry.watchSubnets(ctx, network, nextIndex)
+
+	switch {
+	case err == nil:
+		return parseSubnetWatchResponse(resp)
+
+	case isIndexTooSmall(err):
+		log.Warning("Watch of subnet leases failed because etcd index outside history window")
+		return m.watchReset(ctx, network)
+
+	default:
+		return WatchResult{}, err
+	}
+}
+
+// isIndexTooSmall reports whether err is an etcd error carrying the
+// etcdEventIndexCleared code.
+func isIndexTooSmall(err error) bool {
+	if etcdErr, ok := err.(*etcd.EtcdError); ok {
+		return etcdErr.ErrorCode == etcdEventIndexCleared
+	}
+	return false
+}
+
+func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
+	sn, err := parseSubnetKey(resp.Node.Key)
+	if err != nil {
+		return WatchResult{}, fmt.Errorf("error parsing subnet IP: %s", resp.Node.Key)
+	}
+
+	evt := Event{}
+
+	switch resp.Action {
+	case "delete", "expire":
+		evt = Event{
+			SubnetRemoved,
+			Lease{Subnet: sn},
+		}
+
+	default:
+		attrs := &LeaseAttrs{}
+		err := json.Unmarshal([]byte(resp.Node.Value), attrs)
+		if err != nil {
+			return WatchResult{}, err
+		}
+
+		exp := time.Time{}
+		if resp.Node.Expiration != nil {
+			exp = *resp.Node.Expiration
+		}
+
+		evt = Event{
+			SubnetAdded,
+			Lease{
+				Subnet:     sn,
+				Attrs:      attrs,
+				Expiration: exp,
+			},
+		}
+	}
+
+	cursor := resp.Node.ModifiedIndex + 1
+
+	return WatchResult{
+		Cursor: cursor,
+		Events: []Event{evt},
+	}, nil
+}
+
+func (m *EtcdManager) watchReset(ctx context.Context, network string) (WatchResult, error) {
+	wr := WatchResult{}
+
+	leases, err := m.getLeases(ctx, network)
+	if err != nil {
+		return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
+	}
+
+	for _, l := range leases {
+		e := Event{SubnetAdded, l}
+		wr.Events = append(wr.Events, e)
+	}
+
+	return wr, nil
+}
+
+// interrupted reports, without blocking, whether a receive from cancel can
+// proceed right now (a value is pending or the channel is closed).
+func interrupted(cancel chan bool) bool {
+	select {
+	case <-cancel:
+		return true
+	default:
+	}
+
+	return false
+}

+ 50 - 32
subnet/registry.go

@@ -24,14 +24,15 @@ import (
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
-type subnetRegistry interface {
-	getConfig() (*etcd.Response, error)
-	getSubnets() (*etcd.Response, error)
-	createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
-	updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
-	watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
+// Registry is the storage interface the subnet manager works against. Every
+// operation takes a context and the name of the network it applies to and
+// returns the raw etcd response.
+type Registry interface {
+	getConfig(ctx context.Context, network string) (*etcd.Response, error)
+	getSubnets(ctx context.Context, network string) (*etcd.Response, error)
+	createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error)
+	updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error)
+	watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error)
 }
 
 type EtcdConfig struct {
@@ -60,7 +61,7 @@ func newEtcdClient(c *EtcdConfig) (*etcd.Client, error) {
 	}
 }
 
-func newEtcdSubnetRegistry(config *EtcdConfig) (subnetRegistry, error) {
+func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
 	r := &etcdSubnetRegistry{
 		etcdCfg: config,
 	}
@@ -74,8 +75,8 @@ func newEtcdSubnetRegistry(config *EtcdConfig) (subnetRegistry, error) {
 	return r, nil
 }
 
-func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
-	key := path.Join(esr.etcdCfg.Prefix, "config")
+func (esr *etcdSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
+	key := path.Join(esr.etcdCfg.Prefix, network, "config")
 	resp, err := esr.client().Get(key, false, false)
 	if err != nil {
 		return nil, err
@@ -83,13 +84,13 @@ func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
 	return resp, nil
 }
 
-func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
-	key := path.Join(esr.etcdCfg.Prefix, "subnets")
+func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
 	return esr.client().Get(key, false, true)
 }
 
-func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	key := path.Join(esr.etcdCfg.Prefix, "subnets", sn)
+func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn)
 	resp, err := esr.client().Create(key, data, ttl)
 	if err != nil {
 		return nil, err
@@ -99,8 +100,8 @@ func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.
 	return resp, nil
 }
 
-func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	key := path.Join(esr.etcdCfg.Prefix, "subnets", sn)
+func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn)
 	resp, err := esr.client().Set(key, data, ttl)
 	if err != nil {
 		return nil, err
@@ -110,27 +111,44 @@ func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.
 	return resp, nil
 }
 
-func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
-	for {
-		key := path.Join(esr.etcdCfg.Prefix, "subnets")
-		resp, err := esr.client().RawWatch(key, since, true, nil, stop)
+// watchResp carries the outcome of one watch from the watcher goroutine
+// back to watchSubnets.
+type watchResp struct {
+	resp *etcd.Response
+	err  error
+}
+
+// watchSubnets blocks until a change under the network's "subnets" key is
+// observed at or after index since, the watch fails, or ctx is done. The
+// blocking etcd watch runs in a helper goroutine so that cancellation of ctx
+// can interrupt it; the goroutine delivers exactly one result and exits.
+func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
+	stop := make(chan bool)
+	respCh := make(chan watchResp)
 
-		if err != nil {
-			if err == etcd.ErrWatchStoppedByUser {
-				return nil, nil
-			} else {
-				return nil, err
+	go func() {
+		for {
+			key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
+			rresp, err := esr.client().RawWatch(key, since, true, nil, stop)
+
+			if err != nil {
+				respCh <- watchResp{nil, err}
+				return
 			}
-		}
 
-		if len(resp.Body) == 0 {
-			// etcd timed out, go back but recreate the client as the underlying
-			// http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
-			esr.resetClient()
-			continue
-		}
+			if len(rresp.Body) == 0 {
+				// etcd timed out, go back but recreate the client as the underlying
+				// http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
+				esr.resetClient()
+				continue
+			}
 
-		return resp.Unmarshal()
+			resp, err := rresp.Unmarshal()
+			respCh <- watchResp{resp, err}
+			// The single result has been delivered. Looping again would start
+			// another watch and then block forever sending on respCh, which
+			// nobody reads any more, leaking this goroutine.
+			return
+		}
+	}()
+
+	select {
+	case <-ctx.Done():
+		close(stop)
+		<-respCh // Wait for the watcher goroutine to deliver its result and exit.
+		return nil, ctx.Err()
+	case wr := <-respCh:
+		return wr.resp, wr.err
 	}
 }
 

+ 48 - 0
subnet/renew.go

@@ -0,0 +1,48 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"time"
+
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+const (
+	// renewMargin is how long before a lease's expiration LeaseRenewer
+	// attempts to renew it.
+	renewMargin = time.Hour
+)
+
+// LeaseRenewer keeps the lease alive until ctx is done. It sleeps until
+// renewMargin before the lease's expiration and then renews the lease via
+// m. A failed renewal is logged and retried after one minute.
+func LeaseRenewer(ctx context.Context, m Manager, network string, lease *Lease) {
+	// untilRenewal computes the delay up to the next scheduled renewal.
+	untilRenewal := func() time.Duration {
+		return lease.Expiration.Sub(time.Now()) - renewMargin
+	}
+
+	wait := untilRenewal()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+
+		case <-time.After(wait):
+		}
+
+		if err := m.RenewLease(ctx, network, lease); err != nil {
+			log.Error("Error renewing lease (trying again in 1 min): ", err)
+			wait = time.Minute
+			continue
+		}
+
+		log.Info("Lease renewed, new expiration: ", lease.Expiration)
+		wait = untilRenewal()
+	}
+}

+ 47 - 425
subnet/subnet.go

@@ -17,39 +17,11 @@ package subnet
 import (
 	"encoding/json"
 	"errors"
-	"fmt"
-	"net"
-	"regexp"
-	"strconv"
 	"time"
 
-	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
-	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 
 	"github.com/coreos/flannel/pkg/ip"
-	"github.com/coreos/flannel/pkg/task"
-)
-
-const (
-	registerRetries = 10
-	subnetTTL       = 24 * 3600
-	renewMargin     = time.Hour
-)
-
-// etcd error codes
-const (
-	etcdKeyNotFound       = 100
-	etcdKeyAlreadyExists  = 105
-	etcdEventIndexCleared = 401
-)
-
-const (
-	SubnetAdded = iota
-	SubnetRemoved
-)
-
-var (
-	subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
 )
 
 type LeaseAttrs struct {
@@ -58,420 +30,70 @@ type LeaseAttrs struct {
 	BackendData json.RawMessage `json:",omitempty"`
 }
 
-type SubnetLease struct {
-	Network ip.IP4Net
-	Attrs   LeaseAttrs
-}
-
-type SubnetManager struct {
-	registry  subnetRegistry
-	config    *Config
-	myLease   SubnetLease
-	leaseExp  time.Time
-	lastIndex uint64
-	leases    []SubnetLease
-}
-
-type EventType int
-
-type Event struct {
-	Type  EventType
-	Lease SubnetLease
-}
-
-type EventBatch []Event
-
-func NewSubnetManager(config *EtcdConfig) (*SubnetManager, error) {
-	esr, err := newEtcdSubnetRegistry(config)
-	if err != nil {
-		return nil, err
-	}
-	return newSubnetManager(esr)
-}
-
-func (sm *SubnetManager) AcquireLease(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) {
-	for {
-		sn, err := sm.acquireLeaseOnce(attrs, cancel)
-		switch {
-		case err == nil:
-			log.Info("Subnet lease acquired: ", sn)
-			return sn, nil
-
-		case err == task.ErrCanceled:
-			return ip.IP4Net{}, err
-
-		default:
-			log.Error("Failed to acquire subnet: ", err)
-		}
-
-		select {
-		case <-time.After(time.Second):
-
-		case <-cancel:
-			return ip.IP4Net{}, task.ErrCanceled
-		}
-	}
-}
-
-func findLeaseByIP(leases []SubnetLease, pubIP ip.IP4) *SubnetLease {
-	for _, l := range leases {
-		if pubIP == l.Attrs.PublicIP {
-			return &l
-		}
-	}
-
-	return nil
-}
-
-func (sm *SubnetManager) tryAcquireLease(extIP ip.IP4, attrs *LeaseAttrs) (ip.IP4Net, error) {
-	var err error
-	sm.leases, err = sm.getLeases()
-	if err != nil {
-		return ip.IP4Net{}, err
-	}
-
-	attrBytes, err := json.Marshal(attrs)
-	if err != nil {
-		log.Errorf("marshal failed: %#v, %v", attrs, err)
-		return ip.IP4Net{}, err
-	}
-
-	// try to reuse a subnet if there's one that matches our IP
-	if l := findLeaseByIP(sm.leases, extIP); l != nil {
-		resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), string(attrBytes), subnetTTL)
-		if err != nil {
-			return ip.IP4Net{}, err
-		}
-
-		sm.myLease.Network = l.Network
-		sm.myLease.Attrs = *attrs
-		sm.leaseExp = *resp.Node.Expiration
-		return l.Network, nil
-	}
-
-	// no existing match, grab a new one
-	sn, err := sm.allocateSubnet()
-	if err != nil {
-		return ip.IP4Net{}, err
-	}
-
-	resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), string(attrBytes), subnetTTL)
-	switch {
-	case err == nil:
-		sm.myLease.Network = sn
-		sm.myLease.Attrs = *attrs
-		sm.leaseExp = *resp.Node.Expiration
-		return sn, nil
-
-	// if etcd returned Key Already Exists, try again.
-	case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
-		return ip.IP4Net{}, nil
-
-	default:
-		return ip.IP4Net{}, err
-	}
+// Lease describes a subnet held in the registry.
+type Lease struct {
+	// Subnet is the leased subnet; Key derives the registry key from it.
+	Subnet     ip.IP4Net
+	// Attrs are the attributes stored with the lease.
+	Attrs      *LeaseAttrs
+	// Expiration is when the lease expires (zero if unknown).
+	Expiration time.Time
 }
 
-func (sm *SubnetManager) acquireLeaseOnce(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) {
-	for i := 0; i < registerRetries; i++ {
-		sn, err := sm.tryAcquireLease(attrs.PublicIP, attrs)
-		switch {
-		case err != nil:
-			return ip.IP4Net{}, err
-		case sn.IP != 0:
-			return sn, nil
-		}
-
-		// before moving on, check for cancel
-		if interrupted(cancel) {
-			return ip.IP4Net{}, task.ErrCanceled
-		}
-	}
-
-	return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet")
+// Key returns the lease's subnet rendered with "." between octets and "-"
+// before the prefix length.
+func (l *Lease) Key() string {
+	key := l.Subnet.StringSep(".", "-")
+	return key
 }
 
-func (sm *SubnetManager) GetConfig() *Config {
-	return sm.config
-}
+type (
+	// EventType tells whether an Event adds or removes a subnet lease.
+	EventType int
 
-/// Implementation
-func parseSubnetKey(s string) (ip.IP4Net, error) {
-	if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 {
-		snIp := net.ParseIP(parts[1]).To4()
-		prefixLen, err := strconv.ParseUint(parts[2], 10, 5)
-		if snIp != nil && err == nil {
-			return ip.IP4Net{IP: ip.FromIP(snIp), PrefixLen: uint(prefixLen)}, nil
-		}
-	}
-
-	return ip.IP4Net{}, errors.New("Error parsing IP Subnet")
-}
-
-func newSubnetManager(r subnetRegistry) (*SubnetManager, error) {
-	cfgResp, err := r.getConfig()
-	if err != nil {
-		return nil, err
-	}
-
-	cfg, err := ParseConfig(cfgResp.Node.Value)
-	if err != nil {
-		return nil, err
+	// Event is a single change to the set of subnet leases.
+	Event struct {
+		Type  EventType `json:"type"`
+		Lease Lease     `json:"lease"`
 	}
+)
 
-	sm := SubnetManager{
-		registry: r,
-		config:   cfg,
-	}
+const (
+	// SubnetAdded marks an event for a lease that was added or updated.
+	SubnetAdded EventType = iota
+	// SubnetRemoved marks an event for a lease that was deleted or expired.
+	SubnetRemoved
+)
 
-	return &sm, nil
+// WatchResult is what Manager.WatchLeases returns.
+type WatchResult struct {
+	// Either Events or Snapshot should be set.
+	// If Snapshot is not empty, it means the cursor
+	// was out of range and Snapshot contains the current
+	// list of leases.
+	// Cursor is the value to pass to the next WatchLeases call.
+	Events   []Event     `json:"events"`
+	Snapshot []Lease     `json:"snapshot"`
+	Cursor   interface{} `json:"cursor"`
 }
 
-func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
-	resp, err := sm.registry.getSubnets()
-
-	var leases []SubnetLease
-	switch {
-	case err == nil:
-		for _, node := range resp.Node.Nodes {
-			sn, err := parseSubnetKey(node.Key)
-			if err == nil {
-				var attrs LeaseAttrs
-				if err = json.Unmarshal([]byte(node.Value), &attrs); err == nil {
-					lease := SubnetLease{sn, attrs}
-					leases = append(leases, lease)
-				}
-			}
-		}
-		sm.lastIndex = resp.EtcdIndex
-
-	case err.(*etcd.EtcdError).ErrorCode == etcdKeyNotFound:
-		// key not found: treat it as empty set
-		sm.lastIndex = err.(*etcd.EtcdError).Index
+// MarshalJSON encodes the event type as the JSON string "added" or
+// "removed". Any other value results in a "bad event type" error.
+func (et EventType) MarshalJSON() ([]byte, error) {
+	s := ""
 
+	switch et {
+	case SubnetAdded:
+		s = "added"
+	case SubnetRemoved:
+		s = "removed"
 	default:
-		return nil, err
-	}
-
-	return leases, nil
-}
-
-func deleteLease(l []SubnetLease, i int) []SubnetLease {
-	l[i], l = l[len(l)-1], l[:len(l)-1]
-	return l
-}
-
-func (sm *SubnetManager) applyLeases(newLeases []SubnetLease) EventBatch {
-	var batch EventBatch
-
-	for _, l := range newLeases {
-		// skip self
-		if l.Network.Equal(sm.myLease.Network) {
-			continue
-		}
-
-		found := false
-		for i, c := range sm.leases {
-			if c.Network.Equal(l.Network) {
-				sm.leases = deleteLease(sm.leases, i)
-				found = true
-				break
-			}
-		}
-
-		if !found {
-			// new subnet
-			batch = append(batch, Event{SubnetAdded, l})
-		}
-	}
-
-	// everything left in sm.leases has been deleted
-	for _, c := range sm.leases {
-		batch = append(batch, Event{SubnetRemoved, c})
+		return nil, errors.New("bad event type")
 	}
-
-	sm.leases = newLeases
-
-	return batch
+	return json.Marshal(s)
 }
 
-func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) (Event, error) {
-	switch action {
-	case "delete", "expire":
-		for i, l := range sm.leases {
-			if l.Network.Equal(ipn) {
-				deleteLease(sm.leases, i)
-				return Event{SubnetRemoved, l}, nil
-			}
-		}
-
-		log.Errorf("Removed subnet (%s) was not found", ipn)
-		return Event{
-			SubnetRemoved,
-			SubnetLease{ipn, LeaseAttrs{}},
-		}, nil
-
+// UnmarshalJSON decodes the JSON string "added" or "removed" (the form
+// produced by MarshalJSON) into the event type. data is the raw JSON value,
+// including its surrounding quotes, so it is decoded into a Go string before
+// being compared. Input that is not a JSON string, or is any other string,
+// results in an error.
+func (et *EventType) UnmarshalJSON(data []byte) error {
+	var s string
+	if err := json.Unmarshal(data, &s); err != nil {
+		return err
+	}
+
+	switch s {
+	case "added":
+		*et = SubnetAdded
+	case "removed":
+		*et = SubnetRemoved
 	default:
-		var attrs LeaseAttrs
-		err := json.Unmarshal([]byte(data), &attrs)
-		if err != nil {
-			return Event{}, err
-		}
-
-		for i, l := range sm.leases {
-			if l.Network.Equal(ipn) {
-				sm.leases[i] = SubnetLease{ipn, attrs}
-				return Event{SubnetAdded, sm.leases[i]}, nil
-			}
-		}
-
-		sm.leases = append(sm.leases, SubnetLease{ipn, attrs})
-		return Event{SubnetAdded, sm.leases[len(sm.leases)-1]}, nil
-	}
-}
-
-func (sm *SubnetManager) allocateSubnet() (ip.IP4Net, error) {
-	log.Infof("Picking subnet in range %s ... %s", sm.config.SubnetMin, sm.config.SubnetMax)
-
-	var bag []ip.IP4
-	sn := ip.IP4Net{IP: sm.config.SubnetMin, PrefixLen: sm.config.SubnetLen}
-
-OuterLoop:
-	for ; sn.IP <= sm.config.SubnetMax && len(bag) < 100; sn = sn.Next() {
-		for _, l := range sm.leases {
-			if sn.Overlaps(l.Network) {
-				continue OuterLoop
-			}
-		}
-		bag = append(bag, sn.IP)
-	}
-
-	if len(bag) == 0 {
-		return ip.IP4Net{}, errors.New("out of subnets")
-	} else {
-		i := randInt(0, len(bag))
-		return ip.IP4Net{IP: bag[i], PrefixLen: sm.config.SubnetLen}, nil
-	}
-}
-
-func (sm *SubnetManager) WatchLeases(receiver chan EventBatch, cancel chan bool) {
-	// "catch up" by replaying all the leases we discovered during
-	// AcquireLease
-	var batch EventBatch
-	for _, l := range sm.leases {
-		if !sm.myLease.Network.Equal(l.Network) {
-			batch = append(batch, Event{SubnetAdded, l})
-		}
-	}
-	if len(batch) > 0 {
-		receiver <- batch
-	}
-
-	for {
-		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, cancel)
-
-		// watchSubnets exited by cancel chan being signaled
-		if err == nil && resp == nil {
-			return
-		}
-
-		var batch *EventBatch
-		if err == nil {
-			batch, err = sm.parseSubnetWatchResponse(resp)
-		} else {
-			batch, err = sm.parseSubnetWatchError(err)
-		}
-
-		if err != nil {
-			log.Errorf("%v", err)
-			time.Sleep(time.Second)
-			continue
-		}
-
-		if batch != nil {
-			receiver <- *batch
-		}
-	}
-}
-
-func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *EventBatch, err error) {
-	sm.lastIndex = resp.Node.ModifiedIndex
-
-	sn, err := parseSubnetKey(resp.Node.Key)
-	if err != nil {
-		err = fmt.Errorf("Error parsing subnet IP: %s", resp.Node.Key)
-		return
-	}
-
-	// Don't process our own changes
-	if !sm.myLease.Network.Equal(sn) {
-		evt, err := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
-		if err != nil {
-			return nil, err
-		}
-		batch = &EventBatch{evt}
+		return errors.New("bad event type")
 	}
 
-	return
-}
-
-func (sm *SubnetManager) parseSubnetWatchError(err error) (batch *EventBatch, out error) {
-	etcdErr, ok := err.(*etcd.EtcdError)
-	if ok && etcdErr.ErrorCode == etcdEventIndexCleared {
-		// etcd maintains a history window for events and it's possible to fall behind.
-		// to recover, get the current state and then "diff" against our cache to generate
-		// events for the caller
-		log.Warning("Watch of subnet leases failed because etcd index outside history window")
-
-		leases, err := sm.getLeases()
-		if err == nil {
-			lb := sm.applyLeases(leases)
-			batch = &lb
-		} else {
-			out = fmt.Errorf("Failed to retrieve subnet leases: %v", err)
-		}
-	} else {
-		out = fmt.Errorf("Watch of subnet leases failed: %v", err)
-	}
-
-	return
-}
-
-func (sm *SubnetManager) LeaseRenewer(cancel chan bool) {
-	for {
-		dur := sm.leaseExp.Sub(time.Now()) - renewMargin
-
-		select {
-		case <-time.After(dur):
-			attrBytes, err := json.Marshal(&sm.myLease.Attrs)
-			if err != nil {
-				log.Error("Error renewing lease (trying again in 1 min): ", err)
-				dur = time.Minute
-				continue
-			}
-
-			resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), string(attrBytes), subnetTTL)
-			if err != nil {
-				log.Error("Error renewing lease (trying again in 1 min): ", err)
-				dur = time.Minute
-				continue
-			}
-
-			sm.leaseExp = *resp.Node.Expiration
-			log.Info("Lease renewed, new expiration: ", sm.leaseExp)
-			dur = sm.leaseExp.Sub(time.Now()) - renewMargin
-
-		case <-cancel:
-			return
-		}
-	}
+	return nil
 }
 
-func interrupted(cancel chan bool) bool {
-	select {
-	case <-cancel:
-		return true
-	default:
-		return false
-	}
+// Manager is the abstract subnet manager. All operations take a context
+// and the name of the network they apply to.
+type Manager interface {
+	// GetNetworkConfig returns the parsed config of the network.
+	GetNetworkConfig(ctx context.Context, network string) (*Config, error)
+	// AcquireLease obtains a subnet lease carrying the given attributes.
+	AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error)
+	// RenewLease extends the given lease.
+	RenewLease(ctx context.Context, network string, lease *Lease) error
+	// WatchLeases reports lease changes; cursor is nil or the Cursor of a
+	// previous WatchResult.
+	WatchLeases(ctx context.Context, network string, cursor interface{}) (WatchResult, error)
 }

+ 35 - 49
subnet/subnet_test.go

@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 
 	"github.com/coreos/flannel/pkg/ip"
 )
@@ -53,7 +54,7 @@ func newMockSubnetRegistry(ttlOverride uint64) *mockSubnetRegistry {
 	}
 }
 
-func (msr *mockSubnetRegistry) getConfig() (*etcd.Response, error) {
+func (msr *mockSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
 	return &etcd.Response{
 		EtcdIndex: msr.index,
 		Node: &etcd.Node{
@@ -62,14 +63,14 @@ func (msr *mockSubnetRegistry) getConfig() (*etcd.Response, error) {
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) getSubnets() (*etcd.Response, error) {
+func (msr *mockSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
 	return &etcd.Response{
 		Node:      msr.subnets,
 		EtcdIndex: msr.index,
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
+func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
 	msr.index += 1
 
 	if msr.ttl > 0 {
@@ -94,8 +95,7 @@ func (msr *mockSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-
+func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
 	msr.index += 1
 
 	// add squared durations :)
@@ -115,15 +115,14 @@ func (msr *mockSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.
 	}
 
 	return nil, fmt.Errorf("Subnet not found")
-
 }
 
-func (msr *mockSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
+func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
 	var sn string
 
 	select {
-	case <-stop:
-		return nil, nil
+	case <-ctx.Done():
+		return nil, ctx.Err()
 
 	case sn = <-msr.addCh:
 		n := etcd.Node{
@@ -163,46 +162,41 @@ func (msr *mockSubnetRegistry) hasSubnet(sn string) bool {
 
 func TestAcquireLease(t *testing.T) {
 	msr := newMockSubnetRegistry(0)
-	sm, err := newSubnetManager(msr)
-	if err != nil {
-		t.Fatalf("Failed to create subnet manager: %s", err)
-	}
+	sm := newEtcdManager(msr)
 
 	extIP, _ := ip.ParseIP4("1.2.3.4")
 	attrs := LeaseAttrs{
 		PublicIP: extIP,
 	}
 
-	cancel := make(chan bool)
-	sn, err := sm.AcquireLease(&attrs, cancel)
+	l, err := sm.AcquireLease(context.Background(), "", &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	if sn.String() != "10.3.3.0/24" {
-		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", sn)
+	if l.Subnet.String() != "10.3.3.0/24" {
+		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", l.Subnet)
 	}
 
 	// Acquire again, should reuse
-	if sn, err = sm.AcquireLease(&attrs, cancel); err != nil {
+	if l, err = sm.AcquireLease(context.Background(), "", &attrs); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	if sn.String() != "10.3.3.0/24" {
-		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", sn)
+	if l.Subnet.String() != "10.3.3.0/24" {
+		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", l.Subnet)
 	}
 }
 
 func TestWatchLeaseAdded(t *testing.T) {
 	msr := newMockSubnetRegistry(0)
-	sm, err := newSubnetManager(msr)
-	if err != nil {
-		t.Fatalf("Failed to create subnet manager: %s", err)
-	}
+	sm := newEtcdManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
-	events := make(chan EventBatch)
-	cancel := make(chan bool)
-	go sm.WatchLeases(events, cancel)
+	events := make(chan []Event)
+	go WatchLeases(ctx, sm, "", events)
 
 	expected := "10.3.3.0-24"
 	msr.addCh <- expected
@@ -222,24 +216,21 @@ func TestWatchLeaseAdded(t *testing.T) {
 		t.Fatalf("WatchSubnets produced wrong event type")
 	}
 
-	actual := evt.Lease.Network.StringSep(".", "-")
+	actual := evt.Lease.Key()
 	if actual != expected {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 	}
-
-	close(cancel)
 }
 
 func TestWatchLeaseRemoved(t *testing.T) {
 	msr := newMockSubnetRegistry(0)
-	sm, err := newSubnetManager(msr)
-	if err != nil {
-		t.Fatalf("Failed to create subnet manager: %s", err)
-	}
+	sm := newEtcdManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
-	events := make(chan EventBatch)
-	cancel := make(chan bool)
-	go sm.WatchLeases(events, cancel)
+	events := make(chan []Event)
+	go WatchLeases(ctx, sm, "", events)
 
 	expected := "10.3.4.0-24"
 	msr.delCh <- expected
@@ -259,12 +250,10 @@ func TestWatchLeaseRemoved(t *testing.T) {
 		t.Fatalf("WatchSubnets produced wrong event type")
 	}
 
-	actual := evt.Lease.Network.StringSep(".", "-")
+	actual := evt.Lease.Key()
 	if actual != expected {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 	}
-
-	close(cancel)
 }
 
 type leaseData struct {
@@ -273,10 +262,7 @@ type leaseData struct {
 
 func TestRenewLease(t *testing.T) {
 	msr := newMockSubnetRegistry(1)
-	sm, err := newSubnetManager(msr)
-	if err != nil {
-		t.Fatalf("Failed to create subnet manager: %v", err)
-	}
+	sm := newEtcdManager(msr)
 
 	// Create LeaseAttrs
 	extIP, _ := ip.ParseIP4("1.2.3.4")
@@ -292,22 +278,22 @@ func TestRenewLease(t *testing.T) {
 	attrs.BackendData = json.RawMessage(ld)
 
 	// Acquire lease
-	cancel := make(chan bool)
-	defer close(cancel)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 
-	sn, err := sm.AcquireLease(&attrs, cancel)
+	l, err := sm.AcquireLease(ctx, "", &attrs)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	go sm.LeaseRenewer(cancel)
+	go LeaseRenewer(ctx, sm, "", l)
 
 	fmt.Println("Waiting for lease to pass original expiration")
 	time.Sleep(2 * time.Second)
 
 	// check that it's still good
 	for _, n := range msr.subnets.Nodes {
-		if n.Key == sn.StringSep(".", "-") {
+		if n.Key == l.Subnet.StringSep(".", "-") {
 			if n.Expiration.Before(time.Now()) {
 				t.Error("Failed to renew lease: expiration did not advance")
 			}

+ 130 - 0
subnet/watch.go

@@ -0,0 +1,130 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"time"
+
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+// WatchLeases repeatedly asks sm for lease changes on the given network and
+// forwards them to receiver as batches of events. It returns when the
+// manager reports context.Canceled or context.DeadlineExceeded, or when ctx
+// is done while this function is backing off or waiting for the receiver.
+//
+// A non-empty snapshot returned by the manager is diffed against the leases
+// seen so far; otherwise the returned events are applied incrementally.
+// Any other error is logged and the watch is retried after one second.
+// Empty batches are not delivered.
+func WatchLeases(ctx context.Context, sm Manager, network string, receiver chan []Event) {
+	lw := &leaseWatcher{}
+
+	for {
+		// NOTE(review): lw.cursor is never assigned anywhere in this file,
+		// so every call passes a nil cursor -- confirm whether the position
+		// reported by the manager should be stored here after each
+		// successful watch.
+		res, err := sm.WatchLeases(ctx, network, lw.cursor)
+		if err != nil {
+			if err == context.Canceled || err == context.DeadlineExceeded {
+				return
+			}
+
+			log.Errorf("Watch subnets: %v", err)
+
+			// Back off before retrying, but stop right away on cancellation
+			// instead of sleeping through it.
+			select {
+			case <-ctx.Done():
+				return
+			case <-time.After(time.Second):
+			}
+			continue
+		}
+
+		var batch []Event
+		if len(res.Snapshot) > 0 {
+			batch = lw.reset(res.Snapshot)
+		} else {
+			batch = lw.update(res.Events)
+		}
+
+		// reset and update return a non-nil slice even when nothing changed,
+		// so check the length rather than comparing against nil.
+		if len(batch) == 0 {
+			continue
+		}
+
+		// Do not block forever on a receiver that has stopped reading.
+		select {
+		case receiver <- batch:
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// leaseWatcher holds the state that WatchLeases carries from one call to the
+// subnet manager to the next.
+type leaseWatcher struct {
+	// leases is the set of leases currently known to the watcher. reset
+	// replaces it wholesale; add and remove edit it in place.
+	leases []Lease
+	// cursor is handed to Manager.WatchLeases on every call. Its concrete
+	// type is not visible in this file.
+	cursor interface{}
+}
+
+// reset replaces the watcher's state with the given snapshot and returns the
+// events describing the difference: a SubnetAdded event for every snapshot
+// lease whose subnet was not known before, followed by a SubnetRemoved event
+// for every previously known lease whose subnet is missing from the snapshot.
+//
+// The snapshot slice itself becomes the watcher's state; it is not copied.
+func (lw *leaseWatcher) reset(leases []Lease) []Event {
+	batch := []Event{}
+
+	for _, fresh := range leases {
+		known := -1
+		for i := range lw.leases {
+			if lw.leases[i].Subnet.Equal(fresh.Subnet) {
+				known = i
+				break
+			}
+		}
+
+		if known < 0 {
+			// not seen before: report it as added
+			batch = append(batch, Event{Type: SubnetAdded, Lease: fresh})
+			continue
+		}
+
+		// still present: drop it from the old set so it is not reported
+		// as removed below
+		lw.leases = deleteLease(lw.leases, known)
+	}
+
+	// whatever remains in lw.leases is absent from the snapshot
+	for i := range lw.leases {
+		batch = append(batch, Event{Type: SubnetRemoved, Lease: lw.leases[i]})
+	}
+
+	lw.leases = leases
+
+	return batch
+}
+
+// update applies incremental events to the watcher's state and returns the
+// resulting events in the same order. Events whose type is neither
+// SubnetAdded nor SubnetRemoved are skipped.
+func (lw *leaseWatcher) update(events []Event) []Event {
+	batch := []Event{}
+
+	for i := range events {
+		evt := &events[i]
+
+		if evt.Type == SubnetAdded {
+			batch = append(batch, lw.add(&evt.Lease))
+		} else if evt.Type == SubnetRemoved {
+			batch = append(batch, lw.remove(&evt.Lease))
+		}
+	}
+
+	return batch
+}
+
+// add records the lease in the watcher's state and returns a SubnetAdded
+// event carrying a copy of it. A known lease with an equal subnet is
+// overwritten; otherwise the lease is appended.
+func (lw *leaseWatcher) add(lease *Lease) Event {
+	replaced := false
+	for i := range lw.leases {
+		if lw.leases[i].Subnet.Equal(lease.Subnet) {
+			lw.leases[i] = *lease
+			replaced = true
+			break
+		}
+	}
+
+	if !replaced {
+		lw.leases = append(lw.leases, *lease)
+	}
+
+	// the stored entry is a copy of *lease in both cases
+	return Event{Type: SubnetAdded, Lease: *lease}
+}
+
+// remove deletes the known lease whose subnet equals that of the given lease
+// and returns a SubnetRemoved event carrying the lease that was stored. If no
+// such lease is known, an error is logged and the event carries a copy of the
+// lease that was passed in.
+func (lw *leaseWatcher) remove(lease *Lease) Event {
+	for i := range lw.leases {
+		if !lw.leases[i].Subnet.Equal(lease.Subnet) {
+			continue
+		}
+
+		// copy the entry out first: deleteLease overwrites slot i
+		gone := lw.leases[i]
+		lw.leases = deleteLease(lw.leases, i)
+		return Event{Type: SubnetRemoved, Lease: gone}
+	}
+
+	log.Errorf("Removed subnet (%s) was not found", lease.Subnet)
+	return Event{Type: SubnetRemoved, Lease: *lease}
+}
+
+// deleteLease removes element i from l by overwriting it with the last
+// element and returning l shortened by one. The order of the remaining
+// elements is not preserved and the caller's backing array is modified.
+func deleteLease(l []Lease, i int) []Lease {
+	last := len(l) - 1
+	l[i] = l[last]
+	return l[:last]
+}