Parcourir la source

add client/server option

This adds an option to run flannel as a client and a server.
The server talks to etcd and exposes an API. The client talks
to the server.

Fixes #148
Eugene Yakubovich il y a 9 ans
Parent
commit
001292b435
10 fichiers modifiés avec 800 ajouts et 141 suppressions
  1. 26 6
      main.go
  2. 197 0
      remote/client.go
  3. 38 0
      remote/http_logger.go
  4. 167 0
      remote/remote_test.go
  5. 183 0
      remote/server.go
  6. 154 0
      subnet/mock_registry.go
  7. 20 0
      subnet/mock_subnet.go
  8. 4 3
      subnet/subnet.go
  9. 10 131
      subnet/subnet_test.go
  10. 1 1
      test

+ 26 - 6
main.go

@@ -31,6 +31,7 @@ import (
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/network"
 	"github.com/coreos/flannel/pkg/ip"
+	"github.com/coreos/flannel/remote"
 	"github.com/coreos/flannel/subnet"
 )
 
@@ -46,6 +47,8 @@ type CmdLineOpts struct {
 	subnetFile    string
 	subnetDir     string
 	iface         string
+	listen        string
+	remote        string
 	networks      string
 }
 
@@ -60,6 +63,8 @@ func init() {
 	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
 	flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
 	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
+	flag.StringVar(&opts.listen, "listen", "", "run as server and listen on specified address (e.g. ':8080')")
+	flag.StringVar(&opts.remote, "remote", "", "run as client and connect to server on specified address (e.g. '10.1.2.3:8080')")
 	flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
 	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
 	flag.BoolVar(&opts.help, "help", false, "print this message")
@@ -154,6 +159,10 @@ func isMultiNetwork() bool {
 }
 
 func newSubnetManager() (subnet.Manager, error) {
+	if opts.remote != "" {
+		return remote.NewRemoteManager(opts.remote), nil
+	}
+
 	cfg := &subnet.EtcdConfig{
 		Endpoints: strings.Split(opts.etcdEndpoints, ","),
 		Keyfile:   opts.etcdKeyfile,
@@ -243,12 +252,23 @@ func main() {
 
 	var runFunc func(ctx context.Context)
 
-	networks := strings.Split(opts.networks, ",")
-	if len(networks) == 0 {
-		networks = append(networks, "")
-	}
-	runFunc = func(ctx context.Context) {
-		initAndRun(ctx, sm, networks)
+	if opts.listen != "" {
+		if opts.remote != "" {
+			log.Error("--listen and --remote are mutually exclusive")
+			os.Exit(1)
+		}
+		log.Info("running as server")
+		runFunc = func(ctx context.Context) {
+			remote.RunServer(ctx, sm, opts.listen)
+		}
+	} else {
+		networks := strings.Split(opts.networks, ",")
+		if len(networks) == 0 {
+			networks = append(networks, "")
+		}
+		runFunc = func(ctx context.Context) {
+			initAndRun(ctx, sm, networks)
+		}
 	}
 
 	// Register for SIGINT and SIGTERM

+ 197 - 0
remote/client.go

@@ -0,0 +1,197 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"path"
+
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
+	"github.com/coreos/flannel/subnet"
+)
+
+// implements subnet.Manager by sending requests to the server
+type RemoteManager struct {
+	host string // includes scheme, host, and port
+}
+
+func NewRemoteManager(listenAddr string) subnet.Manager {
+	return &RemoteManager{host: "http://" + listenAddr}
+}
+
+func (m *RemoteManager) mkurl(network string, parts ...string) string {
+	if network == "" {
+		network = "/_"
+	}
+	if network[0] != '/' {
+		network = "/" + network
+	}
+	return m.host + path.Join(append([]string{network}, parts...)...)
+}
+
+func (m *RemoteManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
+	url := m.mkurl(network, "config")
+
+	resp, err := httpGet(ctx, url)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, httpError(resp)
+	}
+
+	config := &subnet.Config{}
+	if err := json.NewDecoder(resp.Body).Decode(config); err != nil {
+		return nil, err
+	}
+
+	return config, nil
+}
+
+func (m *RemoteManager) AcquireLease(ctx context.Context, network string, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
+	url := m.mkurl(network, "leases/")
+
+	body, err := json.Marshal(attrs)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := httpPutPost(ctx, "POST", url, "application/json", body)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, httpError(resp)
+	}
+
+	newLease := &subnet.Lease{}
+	if err := json.NewDecoder(resp.Body).Decode(newLease); err != nil {
+		return nil, err
+	}
+
+	return newLease, nil
+}
+
+func (m *RemoteManager) RenewLease(ctx context.Context, network string, lease *subnet.Lease) error {
+	url := m.mkurl(network, "leases", lease.Key())
+
+	body, err := json.Marshal(lease)
+	if err != nil {
+		return err
+	}
+
+	resp, err := httpPutPost(ctx, "PUT", url, "application/json", body)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return httpError(resp)
+	}
+
+	newLease := &subnet.Lease{}
+	if err := json.NewDecoder(resp.Body).Decode(newLease); err != nil {
+		return err
+	}
+
+	*lease = *newLease
+	return nil
+}
+
+func (m *RemoteManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.WatchResult, error) {
+	url := m.mkurl(network, "leases")
+
+	if cursor != nil {
+		url = fmt.Sprintf("%v?next=%v", url, cursor)
+	}
+
+	resp, err := httpGet(ctx, url)
+	if err != nil {
+		return subnet.WatchResult{}, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return subnet.WatchResult{}, httpError(resp)
+	}
+
+	wr := subnet.WatchResult{}
+	if err := json.NewDecoder(resp.Body).Decode(&wr); err != nil {
+		return subnet.WatchResult{}, err
+	}
+
+	return wr, nil
+}
+
+func httpError(resp *http.Response) error {
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return err
+	}
+	return fmt.Errorf("%v: %v", resp.Status, string(b))
+}
+
+type httpRespErr struct {
+	resp *http.Response
+	err  error
+}
+
+func httpDo(ctx context.Context, req *http.Request) (*http.Response, error) {
+	// Run the HTTP request in a goroutine (so it can be canceled) and pass
+	// the result via the channel c
+	tr := &http.Transport{}
+	client := &http.Client{Transport: tr}
+	c := make(chan httpRespErr, 1)
+	go func() {
+		resp, err := client.Do(req)
+		c <- httpRespErr{resp, err}
+	}()
+
+	select {
+	case <-ctx.Done():
+		tr.CancelRequest(req)
+		<-c // Wait for f to return.
+		return nil, ctx.Err()
+	case r := <-c:
+		return r.resp, r.err
+	}
+}
+
+func httpGet(ctx context.Context, url string) (*http.Response, error) {
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return httpDo(ctx, req)
+}
+
+func httpPutPost(ctx context.Context, method, url, contentType string, body []byte) (*http.Response, error) {
+	req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", contentType)
+	return httpDo(ctx, req)
+}

+ 38 - 0
remote/http_logger.go

@@ -0,0 +1,38 @@
+package remote
+
+import (
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"net/http"
+)
+
+type httpResp struct {
+	writer http.ResponseWriter
+	status int
+}
+
+func (r *httpResp) Header() http.Header {
+	return r.writer.Header()
+}
+
+func (r *httpResp) Write(d []byte) (int, error) {
+	return r.writer.Write(d)
+}
+
+func (r *httpResp) WriteHeader(status int) {
+	r.status = status
+	r.writer.WriteHeader(status)
+}
+
+type httpLoggerHandler struct {
+	h http.Handler
+}
+
+func (lh httpLoggerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	resp := &httpResp{w, 0}
+	lh.h.ServeHTTP(resp, r)
+	log.Infof("%v %v - %v", r.Method, r.RequestURI, resp.status)
+}
+
+func httpLogger(h http.Handler) http.Handler {
+	return httpLoggerHandler{h}
+}

+ 167 - 0
remote/remote_test.go

@@ -0,0 +1,167 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"fmt"
+	"net"
+	"sync"
+	"testing"
+
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
+	"github.com/coreos/flannel/pkg/ip"
+	"github.com/coreos/flannel/subnet"
+)
+
+const expectedNetwork = "10.1.0.0/16"
+
+func TestRemote(t *testing.T) {
+	config := fmt.Sprintf(`{"Network": %q}`, expectedNetwork)
+	sm := subnet.NewMockManager(1, config)
+
+	addr := "127.0.0.1:9999"
+
+	ctx, cancel := context.WithCancel(context.Background())
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		RunServer(ctx, sm, addr)
+		wg.Done()
+	}()
+
+	doTestRemote(ctx, t, addr)
+
+	cancel()
+	wg.Wait()
+}
+
+func mustParseIP4(s string) ip.IP4 {
+	a, err := ip.ParseIP4(s)
+	if err != nil {
+		panic(err)
+	}
+	return a
+}
+
+func mustParseIP4Net(s string) ip.IP4Net {
+	_, n, err := net.ParseCIDR(s)
+	if err != nil {
+		panic(err)
+	}
+	return ip.FromIPNet(n)
+}
+
+func doTestRemote(ctx context.Context, t *testing.T, remoteAddr string) {
+	sm := NewRemoteManager(remoteAddr)
+
+	cfg, err := sm.GetNetworkConfig(ctx, "_")
+	if err != nil {
+		t.Errorf("GetNetworkConfig failed: %v", err)
+	}
+
+	if cfg.Network.String() != expectedNetwork {
+		t.Errorf("GetNetworkConfig returned bad network: %v vs %v", cfg.Network, expectedNetwork)
+	}
+
+	attrs := &subnet.LeaseAttrs{
+		PublicIP: mustParseIP4("1.1.1.1"),
+	}
+	l, err := sm.AcquireLease(ctx, "_", attrs)
+	if err != nil {
+		t.Errorf("AcquireLease failed: %v", err)
+	}
+
+	if !mustParseIP4Net(expectedNetwork).Contains(l.Subnet.IP) {
+		t.Errorf("AcquireLease returned subnet not in network: %v (in %v)", l.Subnet, expectedNetwork)
+	}
+
+	if err = sm.RenewLease(ctx, "_", l); err != nil {
+		t.Errorf("RenewLease failed: %v", err)
+	}
+
+	doTestWatch(t, sm)
+}
+
+func doTestWatch(t *testing.T, sm subnet.Manager) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	res := make(chan error)
+	barrier := make(chan struct{})
+
+	sm.WatchLeases(ctx, "_", nil)
+
+	var expectedSubnet ip.IP4Net
+
+	go func() {
+		wr, err := sm.WatchLeases(ctx, "_", nil)
+		if err != nil {
+			res <- fmt.Errorf("WatchLeases failed: %v", err)
+			return
+		}
+		if len(wr.Events) > 0 && len(wr.Snapshot) > 0 {
+			res <- fmt.Errorf("WatchLeases returned events and snapshots")
+			return
+		}
+
+		res <- nil
+		<-barrier
+
+		wr, err = sm.WatchLeases(ctx, "_", wr.Cursor)
+		if err != nil {
+			res <- fmt.Errorf("WatchLeases failed: %v", err)
+			return
+		}
+		if len(wr.Events) == 0 {
+			res <- fmt.Errorf("WatchLeases returned empty events")
+			return
+		}
+
+		if wr.Events[0].Type != subnet.SubnetAdded {
+			res <- fmt.Errorf("WatchLeases returned event with wrong EventType: %v vs %v", wr.Events[0].Type, subnet.SubnetAdded)
+			return
+		}
+
+		if !wr.Events[0].Lease.Subnet.Equal(expectedSubnet) {
+			res <- fmt.Errorf("WatchLeases returned unexpected subnet: %v vs %v", wr.Events[0].Lease.Subnet, expectedSubnet)
+		}
+
+		res <- nil
+	}()
+
+	if err := <-res; err != nil {
+		t.Fatal(err.Error())
+	}
+
+	attrs := &subnet.LeaseAttrs{
+		PublicIP: mustParseIP4("1.1.1.2"),
+	}
+	l, err := sm.AcquireLease(ctx, "_", attrs)
+	if err != nil {
+		t.Errorf("AcquireLease failed: %v", err)
+		return
+	}
+	if !mustParseIP4Net(expectedNetwork).Contains(l.Subnet.IP) {
+		t.Errorf("AcquireLease returned subnet not in network: %v (in %v)", l.Subnet, expectedNetwork)
+	}
+
+	expectedSubnet = l.Subnet
+
+	barrier <- struct{}{}
+	if err := <-res; err != nil {
+		t.Fatal(err.Error())
+	}
+}

+ 183 - 0
remote/server.go

@@ -0,0 +1,183 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package remote
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"net/url"
+	"strconv"
+
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/gorilla/mux"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
+	"github.com/coreos/flannel/subnet"
+)
+
+type handler func(context.Context, subnet.Manager, http.ResponseWriter, *http.Request)
+
+func jsonResponse(w http.ResponseWriter, code int, v interface{}) {
+	w.Header().Set("Content-Type", "application/json; charset=utf-8")
+	w.WriteHeader(code)
+	if err := json.NewEncoder(w).Encode(v); err != nil {
+		log.Error("Error JSON encoding response: %v", err)
+	}
+}
+
+// GET /{network}/config
+func handleGetNetworkConfig(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	c, err := sm.GetNetworkConfig(ctx, network)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, c)
+}
+
+// POST /{network}/leases
+func handleAcquireLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	attrs := subnet.LeaseAttrs{}
+	if err := json.NewDecoder(r.Body).Decode(&attrs); err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "JSON decoding error: ", err)
+		return
+	}
+
+	lease, err := sm.AcquireLease(ctx, network, &attrs)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, lease)
+}
+
+// PUT /{network}/{lease.network}
+func handleRenewLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	lease := subnet.Lease{}
+	if err := json.NewDecoder(r.Body).Decode(&lease); err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "JSON decoding error: ", err)
+		return
+	}
+
+	if err := sm.RenewLease(ctx, network, &lease); err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, lease)
+}
+
+func getCursor(u *url.URL) (interface{}, error) {
+	vals, ok := u.Query()["next"]
+	if !ok {
+		return nil, nil
+	}
+	index, err := strconv.ParseUint(vals[0], 10, 64)
+	return index, err
+}
+
+// GET /{network}/leases?next=cursor
+func handleWatchLeases(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	defer r.Body.Close()
+
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	cursor, err := getCursor(r.URL)
+	if err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "invalid 'next' value: ", err)
+		return
+	}
+
+	wr, err := sm.WatchLeases(ctx, network, cursor)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, wr)
+}
+
+func bindHandler(h handler, ctx context.Context, sm subnet.Manager) http.HandlerFunc {
+	return func(resp http.ResponseWriter, req *http.Request) {
+		h(ctx, sm, resp, req)
+	}
+}
+
+func RunServer(ctx context.Context, sm subnet.Manager, listenAddr string) {
+	// {network} is always required a the API level but to
+	// keep backward compat, special "_" network is allowed
+	// that means "no network"
+
+	r := mux.NewRouter()
+	r.HandleFunc("/{network}/config", bindHandler(handleGetNetworkConfig, ctx, sm)).Methods("GET")
+	r.HandleFunc("/{network}/leases", bindHandler(handleAcquireLease, ctx, sm)).Methods("POST")
+	r.HandleFunc("/{network}/leases/{subnet}", bindHandler(handleRenewLease, ctx, sm)).Methods("PUT")
+	r.HandleFunc("/{network}/leases", bindHandler(handleWatchLeases, ctx, sm)).Methods("GET")
+
+	l, err := net.Listen("tcp", listenAddr)
+	if err != nil {
+		log.Errorf("Error listening on %v: %v", listenAddr, err)
+		return
+	}
+
+	c := make(chan error, 1)
+	go func() {
+		c <- http.Serve(l, httpLogger(r))
+	}()
+
+	select {
+	case <-ctx.Done():
+		l.Close()
+		<-c
+
+	case err := <-c:
+		log.Errorf("Error serving on %v: %v", listenAddr, err)
+	}
+}

+ 154 - 0
subnet/mock_registry.go

@@ -0,0 +1,154 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+)
+
+type mockSubnetRegistry struct {
+	config  *etcd.Node
+	subnets *etcd.Node
+	events  chan *etcd.Response
+	index   uint64
+	ttl     uint64
+}
+
+func newMockRegistry(ttlOverride uint64, config string, initialSubnets []*etcd.Node) *mockSubnetRegistry {
+	index := uint64(0)
+	for _, n := range initialSubnets {
+		if n.ModifiedIndex > index {
+			index = n.ModifiedIndex
+		}
+	}
+
+	return &mockSubnetRegistry{
+		config: &etcd.Node{
+			Value: config,
+		},
+		subnets: &etcd.Node{
+			Nodes: initialSubnets,
+		},
+		events: make(chan *etcd.Response, 1000),
+		index:  index + 1,
+		ttl:    ttlOverride,
+	}
+}
+
+func (msr *mockSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
+	return &etcd.Response{
+		EtcdIndex: msr.index,
+		Node:      msr.config,
+	}, nil
+}
+
+func (msr *mockSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
+	return &etcd.Response{
+		Node:      msr.subnets,
+		EtcdIndex: msr.index,
+	}, nil
+}
+
+func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+	msr.index += 1
+
+	if msr.ttl > 0 {
+		ttl = msr.ttl
+	}
+
+	// add squared durations :)
+	exp := time.Now().Add(time.Duration(ttl) * time.Second)
+
+	node := &etcd.Node{
+		Key:           sn,
+		Value:         data,
+		ModifiedIndex: msr.index,
+		Expiration:    &exp,
+	}
+
+	msr.subnets.Nodes = append(msr.subnets.Nodes, node)
+	msr.events <- &etcd.Response{
+		Action: "add",
+		Node:   node,
+	}
+
+	return &etcd.Response{
+		Node:      node,
+		EtcdIndex: msr.index,
+	}, nil
+}
+
+func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+	msr.index += 1
+
+	// add squared durations :)
+	exp := time.Now().Add(time.Duration(ttl) * time.Second)
+
+	for _, n := range msr.subnets.Nodes {
+		if n.Key == sn {
+			n.Value = data
+			n.ModifiedIndex = msr.index
+			n.Expiration = &exp
+			msr.events <- &etcd.Response{
+				Action: "add",
+				Node:   n,
+			}
+
+			return &etcd.Response{
+				Node:      n,
+				EtcdIndex: msr.index,
+			}, nil
+		}
+	}
+
+	return nil, fmt.Errorf("Subnet not found")
+}
+
+func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+
+	case r := <-msr.events:
+		return r, nil
+	}
+}
+
+func (msr *mockSubnetRegistry) hasSubnet(sn string) bool {
+	for _, n := range msr.subnets.Nodes {
+		if n.Key == sn {
+			return true
+		}
+	}
+	return false
+}
+
+func (msr *mockSubnetRegistry) expireSubnet(sn string) {
+	for i, n := range msr.subnets.Nodes {
+		if n.Key == sn {
+			msr.subnets.Nodes[i] = msr.subnets.Nodes[len(msr.subnets.Nodes)-1]
+			msr.subnets.Nodes = msr.subnets.Nodes[:len(msr.subnets.Nodes)-2]
+			msr.events <- &etcd.Response{
+				Action: "expire",
+				Node:   n,
+			}
+			return
+		}
+	}
+}

+ 20 - 0
subnet/mock_subnet.go

@@ -0,0 +1,20 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package subnet
+
+func NewMockManager(ttlOverride uint64, config string) Manager {
+	r := newMockRegistry(ttlOverride, config, nil)
+	return newEtcdManager(r)
+}

+ 4 - 3
subnet/subnet.go

@@ -17,10 +17,10 @@ package subnet
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"time"
 
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
-
 	"github.com/coreos/flannel/pkg/ip"
 )
 
@@ -80,11 +80,12 @@ func (et EventType) MarshalJSON() ([]byte, error) {
 
 func (et *EventType) UnmarshalJSON(data []byte) error {
 	switch string(data) {
-	case "added":
+	case "\"added\"":
 		*et = SubnetAdded
-	case "removed":
+	case "\"removed\"":
 		*et = SubnetRemoved
 	default:
+		fmt.Println(string(data))
 		return errors.New("bad event type")
 	}
 

+ 10 - 131
subnet/subnet_test.go

@@ -27,141 +27,20 @@ import (
 	"github.com/coreos/flannel/pkg/ip"
 )
 
-type mockSubnetRegistry struct {
-	subnets *etcd.Node
-	addCh   chan string
-	delCh   chan string
-	index   uint64
-	ttl     uint64
-}
-
-func newMockSubnetRegistry(ttlOverride uint64) *mockSubnetRegistry {
-	subnodes := []*etcd.Node{
+func newDummyRegistry(ttlOverride uint64) *mockSubnetRegistry {
+	subnets := []*etcd.Node{
 		&etcd.Node{Key: "10.3.1.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 10},
 		&etcd.Node{Key: "10.3.2.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 11},
 		&etcd.Node{Key: "10.3.4.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 12},
 		&etcd.Node{Key: "10.3.5.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 13},
 	}
 
-	return &mockSubnetRegistry{
-		subnets: &etcd.Node{
-			Nodes: subnodes,
-		},
-		addCh: make(chan string),
-		delCh: make(chan string),
-		index: 14,
-		ttl:   ttlOverride,
-	}
-}
-
-func (msr *mockSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
-	return &etcd.Response{
-		EtcdIndex: msr.index,
-		Node: &etcd.Node{
-			Value: `{ "Network": "10.3.0.0/16", "SubnetMin": "10.3.1.0", "SubnetMax": "10.3.5.0" }`,
-		},
-	}, nil
-}
-
-func (msr *mockSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
-	return &etcd.Response{
-		Node:      msr.subnets,
-		EtcdIndex: msr.index,
-	}, nil
-}
-
-func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
-	msr.index += 1
-
-	if msr.ttl > 0 {
-		ttl = msr.ttl
-	}
-
-	// add squared durations :)
-	exp := time.Now().Add(time.Duration(ttl) * time.Second)
-
-	node := &etcd.Node{
-		Key:           sn,
-		Value:         data,
-		ModifiedIndex: msr.index,
-		Expiration:    &exp,
-	}
-
-	msr.subnets.Nodes = append(msr.subnets.Nodes, node)
-
-	return &etcd.Response{
-		Node:      node,
-		EtcdIndex: msr.index,
-	}, nil
-}
-
-func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
-	msr.index += 1
-
-	// add squared durations :)
-	exp := time.Now().Add(time.Duration(ttl) * time.Second)
-
-	for _, n := range msr.subnets.Nodes {
-		if n.Key == sn {
-			n.Value = data
-			n.ModifiedIndex = msr.index
-			n.Expiration = &exp
-
-			return &etcd.Response{
-				Node:      n,
-				EtcdIndex: msr.index,
-			}, nil
-		}
-	}
-
-	return nil, fmt.Errorf("Subnet not found")
-}
-
-func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
-	var sn string
-
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-
-	case sn = <-msr.addCh:
-		n := etcd.Node{
-			Key:           sn,
-			Value:         `{"PublicIP": "1.1.1.1"}`,
-			ModifiedIndex: msr.index,
-		}
-		msr.subnets.Nodes = append(msr.subnets.Nodes, &n)
-		return &etcd.Response{
-			Action: "add",
-			Node:   &n,
-		}, nil
-
-	case sn = <-msr.delCh:
-		for i, n := range msr.subnets.Nodes {
-			if n.Key == sn {
-				msr.subnets.Nodes[i] = msr.subnets.Nodes[len(msr.subnets.Nodes)-1]
-				msr.subnets.Nodes = msr.subnets.Nodes[:len(msr.subnets.Nodes)-2]
-				return &etcd.Response{
-					Action: "expire",
-					Node:   n,
-				}, nil
-			}
-		}
-		return nil, fmt.Errorf("Subnet (%s) to delete was not found: ", sn)
-	}
-}
-
-func (msr *mockSubnetRegistry) hasSubnet(sn string) bool {
-	for _, n := range msr.subnets.Nodes {
-		if n.Key == sn {
-			return true
-		}
-	}
-	return false
+	config := `{ "Network": "10.3.0.0/16", "SubnetMin": "10.3.1.0", "SubnetMax": "10.3.5.0" }`
+	return newMockRegistry(ttlOverride, config, subnets)
 }
 
 func TestAcquireLease(t *testing.T) {
-	msr := newMockSubnetRegistry(0)
+	msr := newDummyRegistry(0)
 	sm := newEtcdManager(msr)
 
 	extIP, _ := ip.ParseIP4("1.2.3.4")
@@ -189,7 +68,7 @@ func TestAcquireLease(t *testing.T) {
 }
 
 func TestWatchLeaseAdded(t *testing.T) {
-	msr := newMockSubnetRegistry(0)
+	msr := newDummyRegistry(0)
 	sm := newEtcdManager(msr)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -199,7 +78,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 	go WatchLeases(ctx, sm, "", events)
 
 	expected := "10.3.3.0-24"
-	msr.addCh <- expected
+	msr.createSubnet(ctx, "_", expected, `{"PublicIP": "1.1.1.1"}`, 0)
 
 	evtBatch, ok := <-events
 	if !ok {
@@ -223,7 +102,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 }
 
 func TestWatchLeaseRemoved(t *testing.T) {
-	msr := newMockSubnetRegistry(0)
+	msr := newDummyRegistry(0)
 	sm := newEtcdManager(msr)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -233,7 +112,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	go WatchLeases(ctx, sm, "", events)
 
 	expected := "10.3.4.0-24"
-	msr.delCh <- expected
+	msr.expireSubnet(expected)
 
 	evtBatch, ok := <-events
 	if !ok {
@@ -261,7 +140,7 @@ type leaseData struct {
 }
 
 func TestRenewLease(t *testing.T) {
-	msr := newMockSubnetRegistry(1)
+	msr := newDummyRegistry(1)
 	sm := newEtcdManager(msr)
 
 	// Create LeaseAttrs

+ 1 - 1
test

@@ -14,7 +14,7 @@ COVER=${COVER:-"-cover"}
 
 source ./build
 
-TESTABLE="pkg/ip subnet"
+TESTABLE="pkg/ip subnet remote"
 FORMATTABLE="$TESTABLE"
 
 # user has not provided PKG override