Browse Source

Merge pull request #380 from dcbw/etcd-tests

Add mock etcd and etcd-backed registry testcases
Eugene Yakubovich 9 years ago
parent
commit
cf07041984
5 changed files with 886 additions and 3 deletions
  1. 1 1
      subnet/local_manager.go
  2. 426 0
      subnet/mock_etcd.go
  3. 247 0
      subnet/mock_etcd_test.go
  4. 10 2
      subnet/registry.go
  5. 202 0
      subnet/registry_test.go

+ 1 - 1
subnet/local_manager.go

@@ -68,7 +68,7 @@ func (c watchCursor) String() string {
 }
 
 func NewLocalManager(config *EtcdConfig) (Manager, error) {
-	r, err := newEtcdSubnetRegistry(config)
+	r, err := newEtcdSubnetRegistry(config, nil)
 	if err != nil {
 		return nil, err
 	}

+ 426 - 0
subnet/mock_etcd.go

@@ -0,0 +1,426 @@
+// Copyright 2015 flannel authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+const DEFAULT_TTL time.Duration = 8760 * time.Hour // one year
+
+type mockEtcd struct {
+	nodes    map[string]*etcd.Node
+	watchers map[*watcher]*watcher
+	// A given number of past events must be available for watchers, because
+	// flannel always uses a new watcher instead of re-using old ones, and
+	// the new watcher's index may be slightly in the past
+	events []*etcd.Response
+	index  uint64
+}
+
+func newMockEtcd() *mockEtcd {
+	me := &mockEtcd{
+		index:    1000,
+		nodes:    make(map[string]*etcd.Node),
+		watchers: make(map[*watcher]*watcher),
+		events:   make([]*etcd.Response, 0, 50),
+	}
+	me.nodes["/"] = me.newNode("/", "", true)
+
+	return me
+}
+
+func (me *mockEtcd) newNode(key, value string, dir bool) *etcd.Node {
+	exp := time.Now().Add(DEFAULT_TTL)
+	if dir {
+		value = ""
+	}
+	return &etcd.Node{
+		Key:           key,
+		Value:         value,
+		CreatedIndex:  me.index,
+		ModifiedIndex: me.index,
+		Dir:           dir,
+		Expiration:    &exp,
+		Nodes:         make([]*etcd.Node, 0, 20)}
+}
+
+func (me *mockEtcd) newError(code int, format string, args ...interface{}) etcd.Error {
+	msg := fmt.Sprintf(format, args...)
+	return etcd.Error{
+		Code:    code,
+		Message: msg,
+		Cause:   "",
+		Index:   me.index,
+	}
+}
+
+func (me *mockEtcd) getKeyPath(key string) ([]string, error) {
+	if !strings.HasPrefix(key, "/") {
+		return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
+	}
+
+	// Build up a list of each intermediate key's path
+	path := []string{""}
+	for i, p := range strings.Split(strings.Trim(key, "/"), "/") {
+		if p == "" {
+			return []string{}, me.newError(etcd.ErrorCodeKeyNotFound, "Invalid key %s", key)
+		}
+		path = append(path, fmt.Sprintf("%s/%s", path[i], p))
+	}
+
+	return path[1:], nil
+}
+
+// Returns the node and its parent respectively.  Returns a nil node (but not
+// an error) if the requested node doest not exist.
+func (me *mockEtcd) findNode(key string) (*etcd.Node, *etcd.Node, error) {
+	if key == "/" {
+		return me.nodes["/"], nil, nil
+	}
+
+	path, err := me.getKeyPath(key)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	var node *etcd.Node
+	var parent *etcd.Node
+	var ok bool
+
+	for i, part := range path {
+		parent = node
+		node, ok = me.nodes[part]
+		if !ok {
+			return nil, nil, nil
+		}
+
+		// intermediates must be directories
+		if i < len(path)-1 && node.Dir != true {
+			return nil, nil, me.newError(etcd.ErrorCodeNotDir, "Intermediate node %s not a directory", part)
+		}
+	}
+
+	return node, parent, nil
+}
+
+// Returns whether @child is a child of @node, and whether it is an immediate child respsectively
+func isChild(node *etcd.Node, child *etcd.Node) (bool, bool) {
+	if !strings.HasPrefix(child.Key, fmt.Sprintf("%s/", node.Key)) {
+		return false, false
+	}
+
+	nodeParts := strings.Split(node.Key, "/")
+	childParts := strings.Split(child.Key, "/")
+	return true, len(childParts) == len(nodeParts)+1
+}
+
+func (me *mockEtcd) copyNode(node *etcd.Node, recursive bool) *etcd.Node {
+	n := *node
+	n.Nodes = make([]*etcd.Node, 0)
+	if recursive {
+		for _, child := range me.nodes {
+			if _, directChild := isChild(node, child); directChild {
+				n.Nodes = append(n.Nodes, me.copyNode(child, true))
+			}
+		}
+	}
+	return &n
+}
+
+func (me *mockEtcd) Get(ctx context.Context, key string, opts *etcd.GetOptions) (*etcd.Response, error) {
+	node, _, err := me.findNode(key)
+	if err != nil {
+		return nil, err
+	}
+	if node == nil {
+		return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
+	}
+
+	if opts == nil {
+		opts = &etcd.GetOptions{}
+	}
+
+	return &etcd.Response{
+		Action: "get",
+		Node:   me.copyNode(node, opts.Recursive),
+		Index:  me.index,
+	}, nil
+}
+
+func (me *mockEtcd) sendEvent(resp *etcd.Response) {
+	// Add to history log
+	if len(me.events) == cap(me.events) {
+		me.events = me.events[1:]
+	}
+	me.events = append(me.events, resp)
+
+	// and notify watchers
+	for w := range me.watchers {
+		w.notifyEvent(resp)
+	}
+}
+
+// Returns the node created and its creation response
+// Don't need to check for intermediate directories here as that was already done
+// by the thing calling makeNode()
+func (me *mockEtcd) makeNode(path []string, value string, isDir bool, ttl time.Duration) (*etcd.Node, *etcd.Response, error) {
+	var child *etcd.Node
+	var resp *etcd.Response
+	var ok bool
+
+	node := me.nodes["/"]
+	for i, part := range path {
+		node, ok = me.nodes[part]
+		if !ok {
+			me.index += 1
+			if i < len(path)-1 {
+				// intermediate node
+				child = me.newNode(part, "", true)
+			} else {
+				// Final node
+				exp := time.Now().Add(ttl)
+				child = me.newNode(part, value, isDir)
+				child.Expiration = &exp
+
+				resp = &etcd.Response{
+					Action: "create",
+					Node:   me.copyNode(child, false),
+					Index:  child.CreatedIndex,
+				}
+				me.sendEvent(resp)
+			}
+			me.nodes[child.Key] = child
+			node = child
+		}
+	}
+
+	return node, resp, nil
+}
+
+func (me *mockEtcd) set(ctx context.Context, key, value string, opts *etcd.SetOptions, action string) (*etcd.Response, error) {
+	node, _, err := me.findNode(key)
+	if err != nil {
+		return nil, err
+	}
+	if opts.PrevExist == etcd.PrevExist && node == nil {
+		return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
+	} else if opts.PrevExist == etcd.PrevNoExist && node != nil {
+		return nil, me.newError(etcd.ErrorCodeNodeExist, "Key %s already exists", key)
+	}
+
+	if opts.Dir {
+		value = ""
+	}
+
+	var resp *etcd.Response
+
+	if node != nil {
+		if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
+			return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
+		}
+
+		if opts.Dir != node.Dir {
+			if opts.Dir == true {
+				return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
+			} else {
+				return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
+			}
+		}
+
+		if opts.PrevValue != "" && opts.PrevValue != node.Value {
+			return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
+		}
+
+		prevNode := me.copyNode(node, false)
+
+		node.Value = value
+
+		me.index += 1
+		node.ModifiedIndex = me.index
+
+		if opts.TTL > 0 {
+			exp := time.Now().Add(opts.TTL)
+			node.Expiration = &exp
+		}
+
+		resp = &etcd.Response{
+			Action:   action,
+			Node:     me.copyNode(node, false),
+			PrevNode: prevNode,
+			Index:    me.index,
+		}
+		me.sendEvent(resp)
+	} else {
+		// Create the node and its parents
+		path, err := me.getKeyPath(key)
+		if err != nil {
+			return nil, err
+		}
+
+		_, resp, err = me.makeNode(path, value, opts.Dir, opts.TTL)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return resp, nil
+}
+
+func (me *mockEtcd) Set(ctx context.Context, key, value string, opts *etcd.SetOptions) (*etcd.Response, error) {
+	return me.set(ctx, key, value, opts, "set")
+}
+
+// Removes a node and all children
+func (me *mockEtcd) deleteNode(node *etcd.Node, parent *etcd.Node, recursive bool) (*etcd.Response, error) {
+	for _, child := range me.nodes {
+		if isChild, directChild := isChild(node, child); isChild {
+			if recursive == false {
+				return nil, me.newError(etcd.ErrorCodeDirNotEmpty, "Key %s not empty", node.Key)
+			}
+
+			if directChild {
+				me.deleteNode(child, node, true)
+				me.index += 1
+				node.ModifiedIndex = me.index
+			}
+		}
+	}
+
+	me.index += 1
+	resp := &etcd.Response{
+		Action: "delete",
+		Node:   me.copyNode(node, false),
+		Index:  me.index,
+	}
+	me.sendEvent(resp)
+
+	delete(me.nodes, node.Key)
+
+	return resp, nil
+}
+
+func (me *mockEtcd) Delete(ctx context.Context, key string, opts *etcd.DeleteOptions) (*etcd.Response, error) {
+	node, parent, err := me.findNode(key)
+	if err != nil {
+		return nil, err
+	}
+	if node == nil {
+		return nil, me.newError(etcd.ErrorCodeKeyNotFound, "Key %s not found", key)
+	}
+
+	if opts == nil {
+		opts = &etcd.DeleteOptions{}
+	}
+
+	if opts.PrevIndex > 0 && opts.PrevIndex < node.ModifiedIndex {
+		return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevIndex %s less than node ModifiedIndex %d", key, opts.PrevIndex, node.ModifiedIndex)
+	}
+
+	if opts.PrevValue != "" && opts.PrevValue != node.Value {
+		return nil, me.newError(etcd.ErrorCodeTestFailed, "Key %s PrevValue did not match", key)
+	}
+
+	if opts.Dir != node.Dir {
+		if opts.Dir == true {
+			return nil, me.newError(etcd.ErrorCodeNotDir, "Key %s is not a directory", key)
+		} else {
+			return nil, me.newError(etcd.ErrorCodeNotFile, "Key %s is not a file", key)
+		}
+	}
+
+	return me.deleteNode(node, parent, opts.Recursive)
+}
+
+func (me *mockEtcd) Create(ctx context.Context, key, value string) (*etcd.Response, error) {
+	return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevNoExist}, "create")
+}
+
+func (me *mockEtcd) CreateInOrder(ctx context.Context, dir, value string, opts *etcd.CreateInOrderOptions) (*etcd.Response, error) {
+	panic(fmt.Errorf("Not implemented!"))
+}
+
+func (me *mockEtcd) Update(ctx context.Context, key, value string) (*etcd.Response, error) {
+	return me.set(ctx, key, value, &etcd.SetOptions{PrevExist: etcd.PrevExist}, "update")
+}
+
+type watcher struct {
+	parent     *mockEtcd
+	key        string
+	childMatch string
+	events     chan *etcd.Response
+	after      uint64
+	recursive  bool
+}
+
+func (me *mockEtcd) Watcher(key string, opts *etcd.WatcherOptions) etcd.Watcher {
+	watcher := &watcher{
+		parent:     me,
+		key:        key,
+		childMatch: fmt.Sprintf("%s/", key),
+		events:     make(chan *etcd.Response, 25),
+		recursive:  opts.Recursive,
+	}
+	if opts.AfterIndex > 0 {
+		watcher.after = opts.AfterIndex
+	}
+	return watcher
+}
+
+func (w *watcher) shouldGrabEvent(resp *etcd.Response) bool {
+	return (resp.Index > w.after) && ((resp.Node.Key == w.key) || (w.recursive && strings.HasPrefix(resp.Node.Key, w.childMatch)))
+}
+
+func (w *watcher) notifyEvent(resp *etcd.Response) {
+	if w.shouldGrabEvent(resp) {
+		w.events <- resp
+	}
+}
+
+func (w *watcher) Next(ctx context.Context) (*etcd.Response, error) {
+	// If the event is already in the history log return it from there
+	for _, e := range w.parent.events {
+		if e.Index > w.after && w.shouldGrabEvent(e) {
+			w.after = e.Index
+			return e, nil
+		}
+	}
+
+	// Watch must handle adding and removing itself from the parent when
+	// it's done to ensure it can be garbage collected correctly
+	w.parent.watchers[w] = w
+
+	// Otherwise wait for new events
+	for {
+		select {
+		case e := <-w.events:
+			// Might have already been grabbed through the history log
+			if e.Index <= w.after {
+				continue
+			}
+			w.after = e.Index
+			delete(w.parent.watchers, w)
+			return e, nil
+		case <-ctx.Done():
+			delete(w.parent.watchers, w)
+			return nil, context.Canceled
+		}
+	}
+}

+ 247 - 0
subnet/mock_etcd_test.go

@@ -0,0 +1,247 @@
+// Copyright 2015 flannel authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+func expectSuccess(t *testing.T, r *etcd.Response, err error, expected *etcd.Response, expectedValue string) {
+	if err != nil {
+		t.Fatalf("Failed to get etcd keys: %v", err)
+	}
+	if r == nil {
+		t.Fatal("etcd response was nil")
+	}
+	if r.Action != expected.Action {
+		t.Fatalf("Unexpected action %s (expected %s)", r.Action, expected.Action)
+	}
+	if r.Index < expected.Index {
+		t.Fatalf("Unexpected response index %v (expected >= %v)", r.Index, expected.Index)
+	}
+	if expected.Node != nil {
+		if expected.Node.Key != r.Node.Key {
+			t.Fatalf("Unexpected response node %s key %s (expected %s)", r.Node.Key, r.Node.Key, expected.Node.Key)
+		}
+		if expected.Node.Value != r.Node.Value {
+			t.Fatalf("Unexpected response node %s value %s (expected %s)", r.Node.Key, r.Node.Value, expected.Node.Value)
+		}
+		if expected.Node.Dir != r.Node.Dir {
+			t.Fatalf("Unexpected response node %s dir %v (expected %v)", r.Node.Key, r.Node.Dir, expected.Node.Dir)
+		}
+		if expected.Node.CreatedIndex != r.Node.CreatedIndex {
+			t.Fatalf("Unexpected response node %s CreatedIndex %v (expected %v)", r.Node.Key, r.Node.CreatedIndex, expected.Node.CreatedIndex)
+		}
+		if expected.Node.ModifiedIndex > r.Node.ModifiedIndex {
+			t.Fatalf("Unexpected response node %s ModifiedIndex %v (expected %v)", r.Node.Key, r.Node.ModifiedIndex, expected.Node.ModifiedIndex)
+		}
+	}
+	if expectedValue != "" {
+		if r.Node == nil {
+			t.Fatalf("Unexpected empty response node")
+		}
+		if r.Node.Value != expectedValue {
+			t.Fatalf("Unexpected response node %s value %s (expected %s)", r.Node.Key, r.Node.Value, expectedValue)
+		}
+	}
+}
+
+func watchMockEtcd(ctx context.Context, watcher etcd.Watcher, result chan error) {
+	type evt struct {
+		key      string
+		event    string
+		received bool
+	}
+
+	expected := []evt{
+		{"/coreos.com/network/foobar/config", "create", false},
+		{"/coreos.com/network/blah/config", "create", false},
+		{"/coreos.com/network/blah/config", "update", false},
+		{"/coreos.com/network/foobar/config", "delete", false},
+		{"/coreos.com/network/foobar", "delete", false},
+	}
+
+	// Wait for delete events on /coreos.com/network/foobar and its
+	// 'config' child, and for the update event on
+	// /coreos.com/network/foobar (for 'config' delete) and on
+	// /coreos.com/network (for 'foobar' delete)
+	numEvents := 0
+	for {
+		resp, err := watcher.Next(ctx)
+		if err != nil {
+			if err == context.Canceled {
+				break
+			}
+			result <- fmt.Errorf("Unexpected error watching for event: %v", err)
+			break
+		}
+		if resp.Node == nil {
+			result <- fmt.Errorf("Unexpected empty node watching for event")
+			break
+		}
+		found := false
+		for i, e := range expected {
+			if e.key == resp.Node.Key && e.event == resp.Action {
+				if expected[i].received != true {
+					expected[i].received = true
+					found = true
+					numEvents += 1
+				}
+				break
+			}
+		}
+		if found == false {
+			result <- fmt.Errorf("Received unexpected or already received event %v", resp)
+			break
+		}
+
+		if numEvents == len(expected) {
+			result <- nil
+			break
+		}
+	}
+}
+
+func TestMockEtcd(t *testing.T) {
+	m := newMockEtcd()
+
+	ctx, _ := context.WithCancel(context.Background())
+
+	// Sanity tests for our mock etcd
+
+	// Ensure no entries yet exist
+	opts := &etcd.GetOptions{Recursive: true}
+	r, err := m.Get(ctx, "/", opts)
+	e := &etcd.Response{Action: "get", Index: 1000, Node: m.nodes["/"]}
+	expectSuccess(t, r, err, e, "")
+
+	// Create base test keys
+	sopts := &etcd.SetOptions{Dir: true}
+	r, err = m.Set(ctx, "/coreos.com/network", "", sopts)
+	e = &etcd.Response{Action: "create", Index: 1002}
+	expectSuccess(t, r, err, e, "")
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	startWg := sync.WaitGroup{}
+	startWg.Add(1)
+	result := make(chan error, 1)
+	go func() {
+		wopts := &etcd.WatcherOptions{AfterIndex: m.index, Recursive: true}
+		watcher := m.Watcher("/coreos.com/network", wopts)
+		startWg.Done()
+		watchMockEtcd(ctx, watcher, result)
+		wg.Done()
+	}()
+	startWg.Wait()
+
+	// Populate etcd with some keys
+	netKey1 := "/coreos.com/network/foobar/config"
+	netValue := "{ \"Network\": \"10.1.0.0/16\", \"Backend\": { \"Type\": \"host-gw\" } }"
+	r, err = m.Create(ctx, netKey1, netValue)
+	e = &etcd.Response{Action: "create", Index: 1004}
+	expectSuccess(t, r, err, e, netValue)
+
+	netKey2 := "/coreos.com/network/blah/config"
+	netValue = "{ \"Network\": \"10.1.1.0/16\", \"Backend\": { \"Type\": \"host-gw\" } }"
+	r, err = m.Create(ctx, netKey2, netValue)
+	e = &etcd.Response{Action: "create", Index: 1006}
+	expectSuccess(t, r, err, e, netValue)
+
+	// Get it again
+	expectedNode := r.Node
+	opts = &etcd.GetOptions{Recursive: false}
+	r, err = m.Get(ctx, netKey2, opts)
+	e = &etcd.Response{Action: "get", Index: m.index, Node: expectedNode}
+	expectSuccess(t, r, err, e, netValue)
+
+	// Update it
+	netValue = "ReallyCoolValue"
+	r, err = m.Update(ctx, netKey2, netValue)
+	e = &etcd.Response{Action: "update", Index: m.index}
+	expectSuccess(t, r, err, e, netValue)
+
+	// Get it again
+	opts = &etcd.GetOptions{Recursive: false}
+	r, err = m.Get(ctx, netKey2, opts)
+	e = &etcd.Response{Action: "get", Index: m.index}
+	expectSuccess(t, r, err, e, netValue)
+
+	// test directory listing
+	opts = &etcd.GetOptions{Recursive: true}
+	r, err = m.Get(ctx, "/coreos.com/network/", opts)
+	e = &etcd.Response{Action: "get", Index: 1007}
+	expectSuccess(t, r, err, e, "")
+
+	if len(r.Node.Nodes) != 2 {
+		t.Fatalf("Unexpected %d children in response (expected 2)", len(r.Node.Nodes))
+	}
+	node1Found := false
+	node2Found := false
+	for _, child := range r.Node.Nodes {
+		if child.Dir != true {
+			t.Fatalf("Unexpected non-directory child %s", child.Key)
+		}
+		if child.Key == "/coreos.com/network/foobar" {
+			node1Found = true
+		} else if child.Key == "/coreos.com/network/blah" {
+			node2Found = true
+		} else {
+			t.Fatalf("Unexpected child %s found", child.Key)
+		}
+		if len(child.Nodes) != 1 {
+			t.Fatalf("Unexpected %d children in response (expected 2)", len(r.Node.Nodes))
+		}
+	}
+	if node1Found == false || node2Found == false {
+		t.Fatalf("Failed to find expected children")
+	}
+
+	// Delete a key
+	dopts := &etcd.DeleteOptions{Recursive: true, Dir: false}
+	r, err = m.Delete(ctx, "/coreos.com/network/foobar", dopts)
+	if err == nil {
+		t.Fatalf("Unexpected success deleting a directory")
+	}
+
+	// Delete a key
+	dopts = &etcd.DeleteOptions{Recursive: true, Dir: true}
+	r, err = m.Delete(ctx, "/coreos.com/network/foobar", dopts)
+	e = &etcd.Response{Action: "delete", Index: 1010}
+	expectSuccess(t, r, err, e, "")
+
+	// Get it again; should fail
+	opts = &etcd.GetOptions{Recursive: false}
+	r, err = m.Get(ctx, netKey1, opts)
+	if err == nil {
+		t.Fatalf("Get of %s after delete unexpectedly succeeded", netKey1)
+	}
+	if r != nil {
+		t.Fatalf("Unexpected non-nil response to get after delete %v", r)
+	}
+
+	wg.Wait()
+
+	// Check errors from watch goroutine
+	watchResult := <-result
+	if watchResult != nil {
+		t.Fatalf("Error watching keys: %v", watchResult)
+	}
+}

+ 10 - 2
subnet/registry.go

@@ -57,7 +57,10 @@ type EtcdConfig struct {
 	Prefix    string
 }
 
+type etcdNewFunc func(c *EtcdConfig) (etcd.KeysAPI, error)
+
 type etcdSubnetRegistry struct {
+	cliNewFunc   etcdNewFunc
 	mux          sync.Mutex
 	cli          etcd.KeysAPI
 	etcdCfg      *EtcdConfig
@@ -87,14 +90,19 @@ func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
 	return etcd.NewKeysAPI(cli), nil
 }
 
-func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
+func newEtcdSubnetRegistry(config *EtcdConfig, cliNewFunc etcdNewFunc) (Registry, error) {
 	r := &etcdSubnetRegistry{
 		etcdCfg:      config,
 		networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`),
 	}
+	if cliNewFunc != nil {
+		r.cliNewFunc = cliNewFunc
+	} else {
+		r.cliNewFunc = newEtcdClient
+	}
 
 	var err error
-	r.cli, err = newEtcdClient(config)
+	r.cli, err = r.cliNewFunc(config)
 	if err != nil {
 		return nil, err
 	}

+ 202 - 0
subnet/registry_test.go

@@ -0,0 +1,202 @@
+// Copyright 2015 flannel authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
+	"github.com/coreos/flannel/pkg/ip"
+)
+
+func newTestEtcdRegistry(t *testing.T) (Registry, *mockEtcd) {
+	cfg := &EtcdConfig{
+		Endpoints: []string{"http://127.0.0.1:4001", "http://127.0.0.1:2379"},
+		Prefix:    "/coreos.com/network",
+	}
+
+	r, err := newEtcdSubnetRegistry(cfg, func(c *EtcdConfig) (etcd.KeysAPI, error) {
+		return newMockEtcd(), nil
+	})
+	if err != nil {
+		t.Fatal("Failed to create etcd subnet registry")
+	}
+
+	return r, r.(*etcdSubnetRegistry).cli.(*mockEtcd)
+}
+
+func watchSubnets(t *testing.T, r Registry, ctx context.Context, sn ip.IP4Net, nextIndex uint64, result chan error) {
+	type leaseEvent struct {
+		etype  EventType
+		subnet ip.IP4Net
+		found  bool
+	}
+	expectedEvents := []leaseEvent{
+		{EventAdded, sn, false},
+		{EventRemoved, sn, false},
+	}
+
+	numFound := 0
+	for {
+		evt, index, err := r.watchSubnets(ctx, "foobar", nextIndex)
+
+		switch {
+		case err == nil:
+			nextIndex = index
+			for _, exp := range expectedEvents {
+				if evt.Type != exp.etype {
+					continue
+				}
+				if exp.found == true {
+					result <- fmt.Errorf("Subnet event type already found: %v", exp)
+					return
+				}
+				if !evt.Lease.Subnet.Equal(exp.subnet) {
+					result <- fmt.Errorf("Subnet event lease %v mismatch (expected %v)", evt.Lease.Subnet, exp.subnet)
+				}
+				exp.found = true
+				numFound += 1
+			}
+			if numFound == len(expectedEvents) {
+				// All done; success
+				result <- nil
+				return
+			}
+		case isIndexTooSmall(err):
+			nextIndex = err.(etcd.Error).Index
+
+		default:
+			result <- fmt.Errorf("Error watching subnet leases: %v", err)
+			return
+		}
+	}
+
+	result <- fmt.Errorf("Should never get here")
+}
+
+func TestEtcdRegistry(t *testing.T) {
+	r, m := newTestEtcdRegistry(t)
+
+	ctx, _ := context.WithCancel(context.Background())
+
+	networks, _, err := r.getNetworks(ctx)
+	if err != nil {
+		t.Fatal("Failed to get networks")
+	}
+	if len(networks) != 0 {
+		t.Fatal("Networks should be empty")
+	}
+
+	// Populate etcd with a network
+	netKey := "/coreos.com/network/foobar/config"
+	netValue := "{ \"Network\": \"10.1.0.0/16\", \"Backend\": { \"Type\": \"host-gw\" } }"
+	m.Create(ctx, netKey, netValue)
+
+	networks, _, err = r.getNetworks(ctx)
+	if err != nil {
+		t.Fatal("Failed to get networks the second time")
+	}
+	if len(networks) != 1 {
+		t.Fatal("Failed to find expected network foobar")
+	}
+
+	config, err := r.getNetworkConfig(ctx, "foobar")
+	if err != nil {
+		t.Fatal("Failed to get network config")
+	}
+	if config != netValue {
+		t.Fatal("Failed to match network config")
+	}
+
+	sn := ip.IP4Net{
+		IP:        ip.MustParseIP4("10.1.5.0"),
+		PrefixLen: 24,
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	startWg := sync.WaitGroup{}
+	startWg.Add(1)
+	result := make(chan error, 1)
+	go func() {
+		startWg.Done()
+		watchSubnets(t, r, ctx, sn, m.index, result)
+		wg.Done()
+	}()
+
+	startWg.Wait()
+	// Lease a subnet for the network
+	attrs := &LeaseAttrs{
+		PublicIP: ip.MustParseIP4("1.2.3.4"),
+	}
+	exp, err := r.createSubnet(ctx, "foobar", sn, attrs, 24*time.Hour)
+	if err != nil {
+		t.Fatal("Failed to create subnet lease")
+	}
+	if !exp.After(time.Now()) {
+		t.Fatal("Subnet lease duration %v not in the future", exp)
+	}
+
+	// Make sure the lease got created
+	resp, err := m.Get(ctx, "/coreos.com/network/foobar/subnets/10.1.5.0-24", nil)
+	if err != nil {
+		t.Fatal("Failed to verify subnet lease directly in etcd: %v", err)
+	}
+	if resp == nil || resp.Node == nil {
+		t.Fatal("Failed to retrive node in subnet lease")
+	}
+	if resp.Node.Value != "{\"PublicIP\":\"1.2.3.4\"}" {
+		t.Fatal("Unexpected subnet lease node %s value %s", resp.Node.Key, resp.Node.Value)
+	}
+
+	leases, _, err := r.getSubnets(ctx, "foobar")
+	if len(leases) != 1 {
+		t.Fatalf("Unexpected number of leases %d (expected 1)", len(leases))
+	}
+	if !leases[0].Subnet.Equal(sn) {
+		t.Fatalf("Mismatched subnet %v (expected %v)", leases[0].Subnet, sn)
+	}
+
+	lease, _, err := r.getSubnet(ctx, "foobar", sn)
+	if lease == nil {
+		t.Fatal("Missing subnet lease")
+	}
+
+	err = r.deleteSubnet(ctx, "foobar", sn)
+	if err != nil {
+		t.Fatalf("Failed to delete subnet %v: %v", sn, err)
+	}
+
+	// Make sure the lease got deleted
+	resp, err = m.Get(ctx, "/coreos.com/network/foobar/subnets/10.1.5.0-24", nil)
+	if err == nil {
+		t.Fatal("Unexpected success getting deleted subnet")
+	}
+
+	wg.Wait()
+
+	// Check errors from watch goroutine
+	watchResult := <-result
+	if watchResult != nil {
+		t.Fatalf("Error watching keys: %v", watchResult)
+	}
+
+	// TODO: watchSubnet and watchNetworks
+}