Преглед изворни кода

Merge pull request #356 from eyakubovich/revoke-lease

Ability to revoke lease
Eugene Yakubovich пре 9 година
родитељ
комит
8c97e5b20b
12 измењених фајлова са 579 додато и 201 уклоњено
  1. 4 0
      network/manager.go
  2. 73 20
      network/network.go
  3. 46 14
      remote/client.go
  4. 103 64
      remote/remote_test.go
  5. 64 0
      remote/server.go
  6. 48 4
      subnet/local_manager.go
  7. 94 18
      subnet/mock_registry.go
  8. 60 29
      subnet/registry.go
  9. 0 48
      subnet/renew.go
  10. 21 1
      subnet/subnet.go
  11. 31 3
      subnet/subnet_test.go
  12. 35 0
      subnet/watch.go

+ 4 - 0
network/manager.go

@@ -235,12 +235,16 @@ func (m *Manager) forEachNetwork(f func(n *Network)) {
 func (m *Manager) runNetwork(n *Network) {
 	n.Run(m.extIface, func(bn backend.Network) {
 		if m.isMultiNetwork() {
+			log.Infof("%v: lease acquired: %v", n.Name, bn.Lease().Subnet)
+
 			path := filepath.Join(opts.subnetDir, n.Name) + ".env"
 			if err := writeSubnetFile(path, n.Config.Network, m.ipMasq, bn); err != nil {
 				log.Warningf("%v failed to write subnet file: %s", n.Name, err)
 				return
 			}
 		} else {
+			log.Infof("Lease acquired: %v", bn.Lease().Subnet)
+
 			if err := writeSubnetFile(opts.subnetFile, n.Config.Network, m.ipMasq, bn); err != nil {
 				log.Warningf("%v failed to write subnet file: %s", n.Name, err)
 				return

+ 73 - 20
network/network.go

@@ -15,12 +15,14 @@
 package network
 
 import (
+	"errors"
 	"fmt"
 	"sync"
 	"time"
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/subnet"
 )
@@ -29,7 +31,10 @@ const (
 	renewMargin = time.Hour
 )
 
-var backends map[string]backend.Backend
+var (
+	errInterrupted = errors.New("interrupted")
+	errCanceled    = errors.New("canceled")
+)
 
 type Network struct {
 	Name   string
@@ -91,38 +96,45 @@ func (n *Network) init() error {
 	return nil
 }
 
-func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) {
-	wg := sync.WaitGroup{}
-
-For:
+func (n *Network) retryInit() error {
 	for {
 		err := n.init()
-		switch err {
-		case nil:
-			break For
-		case context.Canceled:
-			return
-		default:
-			log.Error(err)
-			select {
-			case <-n.ctx.Done():
-				return
-			case <-time.After(time.Second):
-			}
+		if err == nil || err == context.Canceled {
+			return err
+		}
+
+		log.Error(err)
+
+		select {
+		case <-n.ctx.Done():
+			return n.ctx.Err()
+		case <-time.After(time.Second):
 		}
 	}
+}
+
+func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn backend.Network)) error {
+	if err := n.retryInit(); err != nil {
+		return errCanceled
+	}
 
 	inited(n.bn)
 
+	ctx, interruptFunc := context.WithCancel(n.ctx)
+
+	wg := sync.WaitGroup{}
+
 	wg.Add(1)
 	go func() {
-		n.bn.Run(n.ctx)
+		n.bn.Run(ctx)
 		wg.Done()
 	}()
 
+	evts := make(chan subnet.Event)
+
 	wg.Add(1)
 	go func() {
-		subnet.LeaseRenewer(n.ctx, n.sm, n.Name, n.bn.Lease())
+		subnet.WatchLease(ctx, n.sm, n.Name, n.bn.Lease().Subnet, evts)
 		wg.Done()
 	}()
 
@@ -134,7 +146,48 @@ For:
 		}
 	}()
 
-	wg.Wait()
+	defer wg.Wait()
+
+	dur := n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+
+	for {
+		select {
+		case <-time.After(dur):
+			err := n.sm.RenewLease(n.ctx, n.Name, n.bn.Lease())
+			if err != nil {
+				log.Error("Error renewing lease (trying again in 1 min): ", err)
+				dur = time.Minute
+				continue
+			}
+
+			log.Info("Lease renewed, new expiration: ", n.bn.Lease().Expiration)
+			dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+
+		case e := <-evts:
+			if e.Type == subnet.EventRemoved {
+				log.Warning("Lease has been revoked")
+				interruptFunc()
+				return errInterrupted
+			}
+			dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+
+		case <-n.ctx.Done():
+			return errCanceled
+		}
+	}
+}
+
+// Run keeps the network running by calling runOnce in a loop. When runOnce
+// reports errInterrupted (it does so after the lease is revoked) the network
+// is started again; when it reports errCanceled, Run returns. Any other
+// result is treated as a programming error and panics.
+func (n *Network) Run(extIface *backend.ExternalInterface, inited func(bn backend.Network)) {
+	for {
+		err := n.runOnce(extIface, inited)
+		if err == errCanceled {
+			return
+		}
+		if err != errInterrupted {
+			panic("unexpected error returned")
+		}
+		// errInterrupted: fall through to the next iteration and start over.
+	}
 }
 
 func (n *Network) Cancel() {

+ 46 - 14
remote/client.go

@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net"
 	"net/http"
@@ -27,6 +28,7 @@ import (
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 
+	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/subnet"
 )
 
@@ -93,7 +95,7 @@ func (m *RemoteManager) mkurl(network string, parts ...string) string {
 func (m *RemoteManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
 	url := m.mkurl(network, "config")
 
-	resp, err := m.httpGet(ctx, url)
+	resp, err := m.httpVerb(ctx, "GET", url, "", nil)
 	if err != nil {
 		return nil, err
 	}
@@ -124,7 +126,7 @@ func (m *RemoteManager) AcquireLease(ctx context.Context, network string, attrs
 		return nil, err
 	}
 
-	resp, err := m.httpPutPost(ctx, "POST", url, "application/json", body)
+	resp, err := m.httpVerb(ctx, "POST", url, "application/json", body)
 	if err != nil {
 		return nil, err
 	}
@@ -150,7 +152,7 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s
 		return err
 	}
 
-	resp, err := m.httpPutPost(ctx, "PUT", url, "application/json", body)
+	resp, err := m.httpVerb(ctx, "PUT", url, "application/json", body)
 	if err != nil {
 		return err
 	}
@@ -169,6 +171,22 @@ func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *s
 	return nil
 }
 
+// RevokeLease sends an HTTP DELETE for the lease of subnet sn in network to
+// the remote server. It returns the transport error if the request fails, or
+// the error built by httpError when the response status is not 200 OK.
+func (m *RemoteManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
+	endpoint := m.mkurl(network, "leases", subnet.MakeSubnetKey(sn))
+
+	resp, err := m.httpVerb(ctx, "DELETE", endpoint, "", nil)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == http.StatusOK {
+		return nil
+	}
+	return httpError(resp)
+}
+
 func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{}, wr interface{}) error {
 	if cursor != nil {
 		c, ok := cursor.(string)
@@ -179,7 +197,7 @@ func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{
 		url = fmt.Sprintf("%v?next=%v", url, c)
 	}
 
-	resp, err := m.httpGet(ctx, url)
+	resp, err := m.httpVerb(ctx, "GET", url, "", nil)
 	if err != nil {
 		return err
 	}
@@ -196,6 +214,21 @@ func (m *RemoteManager) watch(ctx context.Context, url string, cursor interface{
 	return nil
 }
 
+func (m *RemoteManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
+	url := m.mkurl(network, "leases", subnet.MakeSubnetKey(sn))
+
+	wr := subnet.LeaseWatchResult{}
+	err := m.watch(ctx, url, cursor, &wr)
+	if err != nil {
+		return subnet.LeaseWatchResult{}, err
+	}
+	if _, ok := wr.Cursor.(string); !ok {
+		return subnet.LeaseWatchResult{}, fmt.Errorf("watch returned non-string cursor")
+	}
+
+	return wr, nil
+}
+
 func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
 	url := m.mkurl(network, "leases")
 
@@ -258,20 +291,19 @@ func (m *RemoteManager) httpDo(ctx context.Context, req *http.Request) (*http.Re
 	}
 }
 
-func (m *RemoteManager) httpGet(ctx context.Context, url string) (*http.Response, error) {
-	req, err := http.NewRequest("GET", url, nil)
-	if err != nil {
-		return nil, err
+func (m *RemoteManager) httpVerb(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) {
+	var r io.Reader
+	if body != nil {
+		r = bytes.NewBuffer(body)
 	}
 
-	return m.httpDo(ctx, req)
-}
-
-func (m *RemoteManager) httpPutPost(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) {
-	req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
+	req, err := http.NewRequest(method, url, r)
 	if err != nil {
 		return nil, err
 	}
-	req.Header.Set("Content-Type", contentType)
+
+	if contentType != "" {
+		req.Header.Set("Content-Type", contentType)
+	}
 	return m.httpDo(ctx, req)
 }

+ 103 - 64
remote/remote_test.go

@@ -32,25 +32,62 @@ import (
 
 const expectedNetwork = "10.1.0.0/16"
 
-func TestRemote(t *testing.T) {
+type fixture struct {
+	ctx      context.Context // context handed to RunServer and to the manager calls made by the tests
+	cancel   context.CancelFunc // cancels ctx; called from Close
+	srvAddr  string // address given to both RunServer and NewRemoteManager
+	registry *subnet.MockSubnetRegistry // mock registry behind the server-side manager
+	sm       subnet.Manager // client-side manager created with NewRemoteManager
+	wg       sync.WaitGroup // tracks the server goroutine and any goroutines the tests add; Close waits on it
+}
+
+func newFixture(t *testing.T) *fixture {
+	f := &fixture{}
+
 	config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork)
-	serverRegistry := subnet.NewMockRegistry("", config, nil)
-	sm := subnet.NewMockManager(serverRegistry)
+	f.registry = subnet.NewMockRegistry("", config, nil)
+	sm := subnet.NewMockManager(f.registry)
 
-	addr := "127.0.0.1:9999"
+	f.srvAddr = "127.0.0.1:9999"
 
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
+	f.ctx, f.cancel = context.WithCancel(context.Background())
+	f.wg.Add(1)
 	go func() {
-		RunServer(ctx, sm, addr, "", "", "")
-		wg.Done()
+		RunServer(f.ctx, sm, f.srvAddr, "", "", "")
+		f.wg.Done()
 	}()
 
-	doTestRemote(ctx, t, addr, serverRegistry)
+	var err error
+	f.sm, err = NewRemoteManager(f.srvAddr, "", "", "")
+	if err != nil {
+		panic(fmt.Sprintf("Failed to create remote mananager: %v", err))
+	}
+
+	for i := 0; ; i++ {
+		_, err := f.sm.GetNetworkConfig(f.ctx, "_")
+		if err == nil {
+			break
+		}
+
+		if isConnRefused(err) {
+			if i == 100 {
+				t.Fatalf("Out of connection retries")
+			}
+
+			fmt.Println("Connection refused, retrying...")
+			time.Sleep(300 * time.Millisecond)
+			continue
+		}
+
+		t.Fatalf("GetNetworkConfig failed: %v", err)
+	}
 
-	cancel()
-	wg.Wait()
+	return f
+}
+
+func (f *fixture) Close() {
+	f.cancel() // cancel f.ctx, the context the server and watcher goroutines were started with
+	f.wg.Wait() // block until every goroutine registered on f.wg has called Done
 }
 
 func mustParseIP4(s string) ip.IP4 {
@@ -81,38 +118,29 @@ func isConnRefused(err error) bool {
 	return false
 }
 
-func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string, serverRegistry *subnet.MockSubnetRegistry) {
-	sm, err := NewRemoteManager(remoteAddr, "", "", "")
+func TestGetConfig(t *testing.T) {
+	f := newFixture(t)
+	defer f.Close()
+
+	cfg, err := f.sm.GetNetworkConfig(f.ctx, "_")
 	if err != nil {
-		t.Fatalf("Failed to create remote mananager: %v", err)
+		t.Fatalf("GetNetworkConfig failed: %v", err)
 	}
 
-	for i := 0; ; i++ {
-		cfg, err := sm.GetNetworkConfig(ctx, "_")
-		if err != nil {
-			if isConnRefused(err) {
-				if i == 100 {
-					t.Fatalf("Out of connection retries")
-				}
-
-				fmt.Println("Connection refused, retrying...")
-				time.Sleep(300 * time.Millisecond)
-				continue
-			}
-
-			t.Fatalf("GetNetworkConfig failed: %v", err)
-		}
-
-		if cfg.Network.String() != expectedNetwork {
-			t.Errorf("GetNetworkConfig returned bad network: %v vs %v", cfg.Network, expectedNetwork)
-		}
-		break
+	if cfg.Network.String() != expectedNetwork {
+		t.Errorf("GetNetworkConfig returned bad network: %v vs %v", cfg.Network, expectedNetwork)
 	}
+}
+
+func TestAcquireRenewLease(t *testing.T) {
+	f := newFixture(t)
+	defer f.Close()
 
 	attrs := &subnet.LeaseAttrs{
 		PublicIP: mustParseIP4("1.1.1.1"),
 	}
-	l, err := sm.AcquireLease(ctx, "_", attrs)
+
+	l, err := f.sm.AcquireLease(f.ctx, "_", attrs)
 	if err != nil {
 		t.Fatalf("AcquireLease failed: %v", err)
 	}
@@ -121,28 +149,20 @@ func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string, serverRe
 		t.Errorf("AcquireLease returned subnet not in network: %v (in %v)", l.Subnet, expectedNetwork)
 	}
 
-	if err = sm.RenewLease(ctx, "_", l); err != nil {
+	if err = f.sm.RenewLease(f.ctx, "_", l); err != nil {
 		t.Errorf("RenewLease failed: %v", err)
 	}
-
-	doTestWatchLeases(t, sm)
-
-	doTestWatchNetworks(t, sm, serverRegistry)
 }
 
-func doTestWatchLeases(t *testing.T, sm subnet.Manager) {
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	defer func() {
-		cancel()
-		wg.Wait()
-	}()
+func TestWatchLeases(t *testing.T) {
+	f := newFixture(t)
+	defer f.Close()
 
 	events := make(chan []subnet.Event)
+	f.wg.Add(1)
 	go func() {
-		subnet.WatchLeases(ctx, sm, "_", nil, events)
-		wg.Done()
+		subnet.WatchLeases(f.ctx, f.sm, "_", nil, events)
+		f.wg.Done()
 	}()
 
 	// skip over the initial snapshot
@@ -151,7 +171,7 @@ func doTestWatchLeases(t *testing.T, sm subnet.Manager) {
 	attrs := &subnet.LeaseAttrs{
 		PublicIP: mustParseIP4("1.1.1.2"),
 	}
-	l, err := sm.AcquireLease(ctx, "_", attrs)
+	l, err := f.sm.AcquireLease(f.ctx, "_", attrs)
 	if err != nil {
 		t.Errorf("AcquireLease failed: %v", err)
 		return
@@ -176,19 +196,38 @@ func doTestWatchLeases(t *testing.T, sm subnet.Manager) {
 	}
 }
 
-func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet.MockSubnetRegistry) {
-	ctx, cancel := context.WithCancel(context.Background())
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-	defer func() {
-		cancel()
-		wg.Wait()
-	}()
+// TestRevokeLease acquires a lease through the remote manager, revokes it,
+// and then expects a fresh WatchLease on the same subnet to fail.
+func TestRevokeLease(t *testing.T) {
+	f := newFixture(t)
+	defer f.Close()
+
+	l, err := f.sm.AcquireLease(f.ctx, "_", &subnet.LeaseAttrs{
+		PublicIP: mustParseIP4("1.1.1.1"),
+	})
+	if err != nil {
+		t.Fatalf("AcquireLease failed: %v", err)
+	}
+
+	err = f.sm.RevokeLease(f.ctx, "_", l.Subnet)
+	if err != nil {
+		t.Fatalf("RevokeLease failed: %v", err)
+	}
+
+	// With a nil cursor the watch starts from a snapshot of the lease, so it
+	// must report an error once the lease is gone.
+	if _, err = f.sm.WatchLease(f.ctx, "_", l.Subnet, nil); err == nil {
+		t.Fatalf("Revoked lease found")
+	}
+}
+
+func TestWatchNetworks(t *testing.T) {
+	f := newFixture(t)
+	defer f.Close()
 
 	events := make(chan []subnet.Event)
+	f.wg.Add(1)
 	go func() {
-		subnet.WatchNetworks(ctx, sm, events)
-		wg.Done()
+		subnet.WatchNetworks(f.ctx, f.sm, events)
+		f.wg.Done()
 	}()
 
 	// skip over the initial snapshot
@@ -196,7 +235,7 @@ func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet
 
 	expectedNetname := "foobar"
 	config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork)
-	err := serverRegistry.CreateNetwork(ctx, expectedNetname, config)
+	err := f.registry.CreateNetwork(f.ctx, expectedNetname, config)
 	if err != nil {
 		t.Errorf("create network failed: %v", err)
 	}
@@ -216,7 +255,7 @@ func doTestWatchNetworks(t *testing.T, sm subnet.Manager, serverRegistry *subnet
 		t.Errorf("WatchNetwork create produced wrong network: expected %s, got %s", expectedNetname, evt.Network)
 	}
 
-	err = serverRegistry.DeleteNetwork(ctx, expectedNetname)
+	err = f.registry.DeleteNetwork(f.ctx, expectedNetname)
 	if err != nil {
 		t.Errorf("delete network failed: %v", err)
 	}

+ 64 - 0
remote/server.go

@@ -114,6 +114,30 @@ func handleRenewLease(ctx context.Context, sm subnet.Manager, w http.ResponseWri
 	jsonResponse(w, http.StatusOK, lease)
 }
 
+// DELETE /{network}/leases/{subnet}
+//
+// handleRevokeLease revokes the lease named by the {subnet} path variable.
+// The network name "_" is mapped to the empty network name. It replies 400
+// when the subnet cannot be parsed, 500 when the manager fails, and 200
+// otherwise.
+func handleRevokeLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	vars := mux.Vars(r)
+
+	network := vars["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	sn := subnet.ParseSubnetKey(vars["subnet"])
+	if sn == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "failed to parse subnet")
+		return
+	}
+
+	err := sm.RevokeLease(ctx, network, *sn)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
+
 func getCursor(u *url.URL) interface{} {
 	vals, ok := u.Query()["next"]
 	if !ok {
@@ -122,6 +146,44 @@ func getCursor(u *url.URL) interface{} {
 	return vals[0]
 }
 
+// GET /{network}/leases/{subnet}?next=cursor
+//
+// handleWatchLease runs one WatchLease call for the subnet in the path and
+// writes the result as JSON. The network name "_" is mapped to the empty
+// network name. A cursor that is not already a string is converted through
+// fmt.Stringer so it can be serialized; any other cursor type is a 500.
+func handleWatchLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	vars := mux.Vars(r)
+
+	network := vars["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	sn := subnet.ParseSubnetKey(vars["subnet"])
+	if sn == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "bad subnet")
+		return
+	}
+
+	wr, err := sm.WatchLease(ctx, network, *sn, getCursor(r.URL))
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	switch c := wr.Cursor.(type) {
+	case string:
+		// Already in its wire form.
+	case fmt.Stringer:
+		wr.Cursor = c.String()
+	default:
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, fmt.Errorf("internal error: watch cursor is of unknown type"))
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, wr)
+}
+
 // GET /{network}/leases?next=cursor
 func handleWatchLeases(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
@@ -262,7 +324,9 @@ func RunServer(ctx context.Context, sm subnet.Manager, listenAddr, cafile, certf
 	r := mux.NewRouter()
 	r.HandleFunc("/v1/{network}/config", bindHandler(handleGetNetworkConfig, ctx, sm)).Methods("GET")
 	r.HandleFunc("/v1/{network}/leases", bindHandler(handleAcquireLease, ctx, sm)).Methods("POST")
+	r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleWatchLease, ctx, sm)).Methods("GET")
 	r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleRenewLease, ctx, sm)).Methods("PUT")
+	r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleRevokeLease, ctx, sm)).Methods("DELETE")
 	r.HandleFunc("/v1/{network}/leases", bindHandler(handleWatchLeases, ctx, sm)).Methods("GET")
 	r.HandleFunc("/v1/", bindHandler(handleNetworks, ctx, sm)).Methods("GET")
 

+ 48 - 4
subnet/local_manager.go

@@ -170,6 +170,10 @@ OuterLoop:
 	}
 }
 
+// RevokeLease revokes the lease for subnet sn in network by deleting its
+// entry from the registry.
+func (m *LocalManager) RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error {
+	err := m.registry.deleteSubnet(ctx, network, sn)
+	return err
+}
+
 func (m *LocalManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
 	// TODO(eyakubovich): propagate ctx into registry
 	exp, err := m.registry.updateSubnet(ctx, network, lease.Subnet, &lease.Attrs, subnetTTL, 0)
@@ -199,9 +203,49 @@ func getNextIndex(cursor interface{}) (uint64, error) {
 	return nextIndex, nil
 }
 
+// leaseWatchReset fetches the current state of a single lease from the
+// registry and returns it as a one-element snapshot, together with a cursor
+// built from the index the registry reported.
+func (m *LocalManager) leaseWatchReset(ctx context.Context, network string, sn ip.IP4Net) (LeaseWatchResult, error) {
+	lease, asof, err := m.registry.getSubnet(ctx, network, sn)
+	if err != nil {
+		return LeaseWatchResult{}, err
+	}
+
+	wr := LeaseWatchResult{Cursor: watchCursor{asof}}
+	wr.Snapshot = []Lease{*lease}
+	return wr, nil
+}
+
+// WatchLease watches a single lease. With a nil cursor it returns a snapshot
+// of the lease; otherwise it waits on the registry for the next event after
+// the cursor's index. If the registry reports that the index is too old, it
+// falls back to a fresh snapshot.
+func (m *LocalManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error) {
+	if cursor == nil {
+		return m.leaseWatchReset(ctx, network, sn)
+	}
+
+	nextIndex, err := getNextIndex(cursor)
+	if err != nil {
+		return LeaseWatchResult{}, err
+	}
+
+	evt, index, err := m.registry.watchSubnet(ctx, network, nextIndex, sn)
+	if err == nil {
+		return LeaseWatchResult{
+			Events: []Event{evt},
+			Cursor: watchCursor{index},
+		}, nil
+	}
+
+	if isIndexTooSmall(err) {
+		log.Warning("Watch of subnet leases failed because etcd index outside history window")
+		return m.leaseWatchReset(ctx, network, sn)
+	}
+
+	return LeaseWatchResult{}, err
+}
+
 func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) {
 	if cursor == nil {
-		return m.leaseWatchReset(ctx, network)
+		return m.leasesWatchReset(ctx, network)
 	}
 
 	nextIndex, err := getNextIndex(cursor)
@@ -220,7 +264,7 @@ func (m *LocalManager) WatchLeases(ctx context.Context, network string, cursor i
 
 	case isIndexTooSmall(err):
 		log.Warning("Watch of subnet leases failed because etcd index outside history window")
-		return m.leaseWatchReset(ctx, network)
+		return m.leasesWatchReset(ctx, network)
 
 	default:
 		return LeaseWatchResult{}, err
@@ -265,8 +309,8 @@ func isIndexTooSmall(err error) bool {
 	return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared
 }
 
-// leaseWatchReset is called when incremental lease watch failed and we need to grab a snapshot
-func (m *LocalManager) leaseWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) {
+// leasesWatchReset is called when incremental lease watch failed and we need to grab a snapshot
+func (m *LocalManager) leasesWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) {
 	wr := LeaseWatchResult{}
 
 	leases, index, err := m.registry.getSubnets(ctx, network)

+ 94 - 18
subnet/mock_registry.go

@@ -17,6 +17,7 @@ package subnet
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
@@ -26,9 +27,36 @@ import (
 )
 
 type netwk struct {
-	config       string
-	subnets      []Lease
-	subnetEvents chan event
+	config        string
+	subnets       []Lease
+	subnetsEvents chan event
+
+	mux          sync.Mutex
+	subnetEvents map[ip.IP4Net]chan event
+}
+
+// sendSubnetEvent publishes e twice: first on the network-wide subnetsEvents
+// channel, then on the channel dedicated to subnet sn (created on first use
+// by subnetEventsChan). Both sends block when the target buffer is full.
+func (n *netwk) sendSubnetEvent(sn ip.IP4Net, e event) {
+	n.subnetsEvents <- e
+	n.subnetEventsChan(sn) <- e
+}
+
+// subnetEventsChan returns the event channel for subnet sn, creating a
+// channel with a buffer of 10 and recording it in the map if none exists.
+// The map access is guarded by n.mux.
+func (n *netwk) subnetEventsChan(sn ip.IP4Net) chan event {
+	n.mux.Lock()
+	defer n.mux.Unlock()
+
+	if c, ok := n.subnetEvents[sn]; ok {
+		return c
+	}
+
+	c := make(chan event, 10)
+	n.subnetEvents[sn] = c
+	return c
 }
 
 type event struct {
@@ -50,9 +78,10 @@ func NewMockRegistry(network, config string, initialSubnets []Lease) *MockSubnet
 	}
 
 	msr.networks[network] = &netwk{
-		config:       config,
-		subnets:      initialSubnets,
-		subnetEvents: make(chan event, 1000),
+		config:        config,
+		subnets:       initialSubnets,
+		subnetsEvents: make(chan event, 1000),
+		subnetEvents:  make(map[ip.IP4Net]chan event),
 	}
 	return msr
 }
@@ -82,6 +111,19 @@ func (msr *MockSubnetRegistry) getSubnets(ctx context.Context, network string) (
 	return n.subnets, msr.index, nil
 }
 
+// getSubnet looks up the lease for subnet sn in the named mock network and
+// returns a pointer to a copy of it along with the registry's current index.
+// It fails when the network or the subnet is unknown.
+func (msr *MockSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
+	n, found := msr.networks[network]
+	if !found {
+		return nil, 0, fmt.Errorf("Network %s not found", network)
+	}
+
+	for i := range n.subnets {
+		if !n.subnets[i].Subnet.Equal(sn) {
+			continue
+		}
+		// Hand out a copy so the caller cannot alias the registry's slice.
+		l := n.subnets[i]
+		return &l, msr.index, nil
+	}
+
+	return nil, msr.index, fmt.Errorf("subnet %s not found", sn)
+}
+
 func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
 	n, ok := msr.networks[network]
 	if !ok {
@@ -113,7 +155,8 @@ func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string,
 		Lease:   l,
 		Network: network,
 	}
-	n.subnetEvents <- event{evt, msr.index}
+
+	n.sendSubnetEvent(sn, event{evt, msr.index})
 
 	return exp, nil
 }
@@ -128,20 +171,22 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string,
 
 	exp := clock.Now().Add(ttl)
 
-	sub, _, err := n.findSubnet(sn)
+	sub, i, err := n.findSubnet(sn)
 	if err != nil {
 		return time.Time{}, err
 	}
 
 	sub.Attrs = *attrs
-	sub.Expiration = exp
 	sub.asof = msr.index
-	n.subnetEvents <- event{
+	sub.Expiration = exp
+	n.subnets[i] = sub
+	n.sendSubnetEvent(sn, event{
 		Event{
 			Type:    EventAdded,
 			Lease:   sub,
 			Network: network,
-		}, msr.index}
+		}, msr.index,
+	})
 
 	return sub.Expiration, nil
 }
@@ -162,12 +207,13 @@ func (msr *MockSubnetRegistry) deleteSubnet(ctx context.Context, network string,
 	n.subnets[i] = n.subnets[len(n.subnets)-1]
 	n.subnets = n.subnets[:len(n.subnets)-1]
 	sub.asof = msr.index
-	n.subnetEvents <- event{
+	n.sendSubnetEvent(sn, event{
 		Event{
 			Type:    EventRemoved,
 			Lease:   sub,
 			Network: network,
-		}, msr.index}
+		}, msr.index,
+	})
 
 	return nil
 }
@@ -192,7 +238,36 @@ func (msr *MockSubnetRegistry) watchSubnets(ctx context.Context, network string,
 		case <-ctx.Done():
 			return Event{}, msr.index, ctx.Err()
 
-		case e := <-n.subnetEvents:
+		case e := <-n.subnetsEvents:
+			if e.index <= since {
+				continue
+			}
+			return e.evt, msr.index, nil
+		}
+	}
+}
+
+func (msr *MockSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
+	n, ok := msr.networks[network]
+	if !ok {
+		return Event{}, msr.index, fmt.Errorf("Network %s not found", network)
+	}
+
+	for {
+		if since < msr.index {
+			return Event{}, msr.index, etcd.Error{
+				Code:    etcd.ErrorCodeEventIndexCleared,
+				Cause:   "out of date",
+				Message: "cursor is out of date",
+				Index:   msr.index,
+			}
+		}
+
+		select {
+		case <-ctx.Done():
+			return Event{}, msr.index, ctx.Err()
+
+		case e := <-n.subnetEventsChan(sn):
 			if e.index <= since {
 				continue
 			}
@@ -212,12 +287,12 @@ func (msr *MockSubnetRegistry) expireSubnet(network string, sn ip.IP4Net) {
 		n.subnets[i] = n.subnets[len(n.subnets)-1]
 		n.subnets = n.subnets[:len(n.subnets)-1]
 		sub.asof = msr.index
-		n.subnetEvents <- event{
+		n.sendSubnetEvent(sn, event{
 			Event{
 				Type:  EventRemoved,
 				Lease: sub,
 			}, msr.index,
-		}
+		})
 	}
 }
 
@@ -280,8 +355,9 @@ func (msr *MockSubnetRegistry) CreateNetwork(ctx context.Context, network, confi
 	msr.index += 1
 
 	n := &netwk{
-		config:       network,
-		subnetEvents: make(chan event, 1000),
+		config:        network,
+		subnetsEvents: make(chan event, 1000),
+		subnetEvents:  make(map[ip.IP4Net]chan event),
 	}
 
 	msr.networks[network] = n

+ 60 - 29
subnet/registry.go

@@ -18,10 +18,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"net"
 	"path"
 	"regexp"
-	"strconv"
 	"sync"
 	"time"
 
@@ -41,10 +39,12 @@ var (
 type Registry interface {
 	getNetworkConfig(ctx context.Context, network string) (string, error)
 	getSubnets(ctx context.Context, network string) ([]Lease, uint64, error)
+	getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error)
 	createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error)
 	updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error)
 	deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error
 	watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error)
+	watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error)
 	getNetworks(ctx context.Context) ([]string, uint64, error)
 	watchNetworks(ctx context.Context, since uint64) (Event, uint64, error)
 }
@@ -127,29 +127,31 @@ func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) (
 
 	leases := []Lease{}
 	for _, node := range resp.Node.Nodes {
-		if sn := parseSubnetKey(node.Key); sn != nil {
-			attrs := &LeaseAttrs{}
-			if err = json.Unmarshal([]byte(node.Value), attrs); err == nil {
-				exp := time.Time{}
-				if node.Expiration != nil {
-					exp = *node.Expiration
-				}
-
-				lease := Lease{
-					Subnet:     *sn,
-					Attrs:      *attrs,
-					Expiration: exp,
-				}
-				leases = append(leases, lease)
-			}
+		l, err := nodeToLease(node)
+		if err != nil {
+			log.Warningf("Ignoring bad subnet node: %v", err)
+			continue
 		}
+
+		leases = append(leases, *l)
 	}
 
 	return leases, resp.Index, nil
 }
 
+// getSubnet reads the etcd node of subnet sn in network and converts it to
+// a Lease. The returned index is the one carried by the etcd response; it is
+// returned even when the node cannot be converted.
+func (esr *etcdSubnetRegistry) getSubnet(ctx context.Context, network string, sn ip.IP4Net) (*Lease, uint64, error) {
+	subnetKey := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+
+	resp, err := esr.client().Get(ctx, subnetKey, nil)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	lease, err := nodeToLease(resp.Node)
+	return lease, resp.Index, err
+}
+
 func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-"))
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
 	value, err := json.Marshal(attrs)
 	if err != nil {
 		return time.Time{}, err
@@ -170,7 +172,7 @@ func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string,
 }
 
 func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-"))
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
 	value, err := json.Marshal(attrs)
 	if err != nil {
 		return time.Time{}, err
@@ -189,7 +191,7 @@ func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string,
 }
 
 func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
-	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-"))
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
 	_, err := esr.client().Delete(ctx, key, nil)
 	return err
 }
@@ -209,6 +211,21 @@ func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string,
 	return evt, e.Node.ModifiedIndex, err
 }
 
+// watchSubnet waits for the next etcd change to the key of subnet sn in
+// network that happened after index since; the wait is bound to ctx.
+// It returns the decoded event and the modified index of the changed node,
+// which the caller uses as the starting point of the next watch.
+func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string, since uint64, sn ip.IP4Net) (Event, uint64, error) {
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", MakeSubnetKey(sn))
+	opts := &etcd.WatcherOptions{
+		AfterIndex: since,
+	}
+
+	e, err := esr.client().Watcher(key, opts).Next(ctx)
+	if err != nil {
+		return Event{}, 0, err
+	}
+
+	evt, err := parseSubnetWatchResponse(e)
+	// Return the node's modified index, as watchSubnets does. The response
+	// index is not tied to the delivered event, so using it as the cursor
+	// could cause the same event to be delivered again.
+	return evt, e.Node.ModifiedIndex, err
+}
+
 // getNetworks queries etcd to get a list of network names.  It returns the
 // networks along with the 'as-of' etcd-index that can be used as the starting
 // point for etcd watch.
@@ -280,7 +297,7 @@ func ensureExpiration(resp *etcd.Response, ttl time.Duration) {
 }
 
 func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
-	sn := parseSubnetKey(resp.Node.Key)
+	sn := ParseSubnetKey(resp.Node.Key)
 	if sn == nil {
 		return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key)
 	}
@@ -366,14 +383,28 @@ func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) {
 	return "", false
 }
 
-func parseSubnetKey(s string) *ip.IP4Net {
-	if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 {
-		snIp := net.ParseIP(parts[1]).To4()
-		prefixLen, err := strconv.ParseUint(parts[2], 10, 5)
-		if snIp != nil && err == nil {
-			return &ip.IP4Net{IP: ip.FromIP(snIp), PrefixLen: uint(prefixLen)}
-		}
+// nodeToLease converts an etcd node into a Lease. The subnet is parsed from
+// the node key, the attributes are decoded from the JSON node value, the
+// expiration is copied when the node has one, and asof is set to the node's
+// modified index. It returns an error when the key is not a subnet key or
+// the value is not valid JSON for LeaseAttrs.
+func nodeToLease(node *etcd.Node) (*Lease, error) {
+	sn := ParseSubnetKey(node.Key)
+	if sn == nil {
+		// sn is nil here, so report the raw key rather than dereferencing it.
+		return nil, fmt.Errorf("failed to parse subnet key %q", node.Key)
+	}
+
+	attrs := &LeaseAttrs{}
+	if err := json.Unmarshal([]byte(node.Value), attrs); err != nil {
+		return nil, err
+	}
+
+	exp := time.Time{}
+	if node.Expiration != nil {
+		exp = *node.Expiration
+	}
+
+	lease := Lease{
+		Subnet:     *sn,
+		Attrs:      *attrs,
+		Expiration: exp,
+		asof:       node.ModifiedIndex,
 	}
 
-	return nil
+	return &lease, nil
 }

+ 0 - 48
subnet/renew.go

@@ -1,48 +0,0 @@
-// Copyright 2015 flannel authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package subnet
-
-import (
-	"time"
-
-	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
-	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
-)
-
-const (
-	renewMargin = time.Hour
-)
-
-func LeaseRenewer(ctx context.Context, m Manager, network string, lease *Lease) {
-	dur := lease.Expiration.Sub(clock.Now()) - renewMargin
-
-	for {
-		select {
-		case <-time.After(dur):
-			err := m.RenewLease(ctx, network, lease)
-			if err != nil {
-				log.Error("Error renewing lease (trying again in 1 min): ", err)
-				dur = time.Minute
-				continue
-			}
-
-			log.Info("Lease renewed, new expiration: ", lease.Expiration)
-			dur = lease.Expiration.Sub(clock.Now()) - renewMargin
-
-		case <-ctx.Done():
-			return
-		}
-	}
-}

+ 21 - 1
subnet/subnet.go

@@ -18,6 +18,8 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"net"
+	"strconv"
 	"time"
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
@@ -39,7 +41,7 @@ type Lease struct {
 }
 
+// Key returns the subnet key string for this lease's subnet, as built by
+// MakeSubnetKey.
 func (l *Lease) Key() string {
-	return l.Subnet.StringSep(".", "-")
+	return MakeSubnetKey(l.Subnet)
 }
 
 type (
@@ -103,10 +105,28 @@ func (et *EventType) UnmarshalJSON(data []byte) error {
 	return nil
 }
 
+// ParseSubnetKey extracts an IPv4 subnet from s using subnetRegex.
+//
+// It returns nil when s does not match the expression, when the captured
+// address is not an IPv4 address, or when the captured prefix length does not
+// parse as an unsigned decimal that fits in 5 bits.
+func ParseSubnetKey(s string) *ip.IP4Net {
+	parts := subnetRegex.FindStringSubmatch(s)
+	if len(parts) != 3 {
+		return nil
+	}
+
+	addr := net.ParseIP(parts[1]).To4()
+	if addr == nil {
+		return nil
+	}
+
+	bits, err := strconv.ParseUint(parts[2], 10, 5)
+	if err != nil {
+		return nil
+	}
+
+	return &ip.IP4Net{IP: ip.FromIP(addr), PrefixLen: uint(bits)}
+}
+
+// MakeSubnetKey returns the key string for subnet sn: the subnet formatted by
+// StringSep with "." and "-" as separators.
+// NOTE(review): presumably the inverse of ParseSubnetKey; confirm against
+// subnetRegex.
+func MakeSubnetKey(sn ip.IP4Net) string {
+	return sn.StringSep(".", "-")
+}
+
+// Manager is the interface for reading network configuration and for
+// acquiring, renewing, revoking and watching subnet leases.
 type Manager interface {
 	GetNetworkConfig(ctx context.Context, network string) (*Config, error)
 	AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error)
 	RenewLease(ctx context.Context, network string, lease *Lease) error
+	// RevokeLease gives up the lease on subnet sn in the given network.
+	// TestLeaseRevoked expects the subnet to be absent from the registry
+	// afterwards.
+	RevokeLease(ctx context.Context, network string, sn ip.IP4Net) error
+	// WatchLease watches the single lease on subnet sn. The WatchLease helper
+	// passes a nil cursor on the first call and the Cursor of the previous
+	// result on later calls.
+	WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error)
 	WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error)
 	WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error)
 }

+ 31 - 3
subnet/subnet_test.go

@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/jonboulle/clockwork"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 
@@ -248,10 +249,14 @@ func TestRenewLease(t *testing.T) {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	go LeaseRenewer(ctx, sm, "_", l)
+	now = now.Add(subnetTTL)
 
 	fakeClock.Advance(24 * time.Hour)
 
+	if err := sm.RenewLease(ctx, "_", l); err != nil {
+		t.Fatal("RenewLease failed: ", err)
+	}
+
 	// check that it's still good
 	n, err := msr.getNetwork(ctx, "_")
 	if err != nil {
@@ -259,8 +264,9 @@ func TestRenewLease(t *testing.T) {
 	}
 	for _, sn := range n.subnets {
 		if sn.Subnet.Equal(l.Subnet) {
-			if !sn.Expiration.Equal(now.Add(subnetTTL)) {
-				t.Errorf("Failed to renew lease: bad expiration; expected %v, got %v", now.Add(subnetTTL), sn.Expiration)
+			expected := now.Add(subnetTTL)
+			if !sn.Expiration.Equal(expected) {
+				t.Errorf("Failed to renew lease: bad expiration; expected %v, got %v", expected, sn.Expiration)
 			}
 			if !reflect.DeepEqual(sn.Attrs, attrs) {
 				t.Errorf("LeaseAttrs changed: was %#v, now %#v", attrs, sn.Attrs)
@@ -272,6 +278,28 @@ func TestRenewLease(t *testing.T) {
 	t.Fatalf("Failed to find acquired lease")
 }
 
+// TestLeaseRevoked acquires a lease, revokes it through the manager and then
+// checks that looking the subnet up in the registry fails.
+func TestLeaseRevoked(t *testing.T) {
+	msr := newDummyRegistry()
+	sm := NewMockManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	l := acquireLease(ctx, t, sm)
+
+	if err := sm.RevokeLease(ctx, "_", l.Subnet); err != nil {
+		t.Fatalf("RevokeLease failed: %v", err)
+	}
+
+	// The lookup must fail now that the lease has been revoked.
+	_, _, err := msr.getSubnet(ctx, "_", l.Subnet)
+	if err == nil {
+		t.Fatalf("Revoked lease still exists")
+	}
+	// Only an etcd.Error whose code is not key-not-found is rejected; an error
+	// of any other type is accepted as "lease gone".
+	if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code != etcd.ErrorCodeKeyNotFound {
+		t.Fatalf("getSubnets after revoked lease returned unexpected error: %v", err)
+	}
+}
+
 func TestWatchGetNetworks(t *testing.T) {
 	msr := newDummyRegistry()
 	sm := NewMockManager(msr)

+ 35 - 0
subnet/watch.go

@@ -19,6 +19,8 @@ import (
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
+	"github.com/coreos/flannel/pkg/ip"
 )
 
 // WatchLeases performs a long term watch of the given network's subnet leases
@@ -42,6 +44,7 @@ func WatchLeases(ctx context.Context, sm Manager, network string, ownLease *Leas
 			time.Sleep(time.Second)
 			continue
 		}
+
 		cursor = res.Cursor
 
 		batch := []Event{}
@@ -245,3 +248,35 @@ func (nw *netWatcher) remove(network string) Event {
 
 	return Event{EventRemoved, Lease{}, network}
 }
+
+// WatchLease performs a long term watch of a single subnet lease in the given
+// network and reports what it observes on the receiver channel.
+//
+// When the manager returns a snapshot, its first lease is reported as an
+// EventAdded event; otherwise the first incremental event is forwarded as-is.
+// A result that carries neither a snapshot nor events is skipped.
+//
+// The function returns when the watch fails with context.Canceled or
+// context.DeadlineExceeded, or when ctx is done while an event is waiting to
+// be delivered. Any other watch error is logged and the watch is retried
+// after one second.
+func WatchLease(ctx context.Context, sm Manager, network string, sn ip.IP4Net, receiver chan Event) {
+	var cursor interface{}
+
+	for {
+		wr, err := sm.WatchLease(ctx, network, sn, cursor)
+		if err != nil {
+			if err == context.Canceled || err == context.DeadlineExceeded {
+				return
+			}
+
+			log.Errorf("Subnet watch failed: %v", err)
+			time.Sleep(time.Second)
+			continue
+		}
+
+		// The cursor is only read by the next iteration, so it can be advanced
+		// before the event is delivered.
+		cursor = wr.Cursor
+
+		var evt Event
+		switch {
+		case len(wr.Snapshot) > 0:
+			evt = Event{
+				Type:  EventAdded,
+				Lease: wr.Snapshot[0],
+			}
+		case len(wr.Events) > 0:
+			evt = wr.Events[0]
+		default:
+			// Nothing to report; indexing wr.Events[0] here would panic.
+			continue
+		}
+
+		select {
+		case receiver <- evt:
+		case <-ctx.Done():
+			// ctx ended before the event could be delivered; stop instead of
+			// blocking on the send.
+			return
+		}
+	}
+}