瀏覽代碼

Add reservations to admin control subnet allocs

This adds ability to add and remove reservations
for subnet allocations. A subnet is reserved with
the host IP (PublicIP). flannel will then use this
subnet for a host requesting a subnet.

Reservations are denoted in etcd as subnets with no
expiration (TTL is zero). When a reserved subnet is
allocated, it's TTL continues to be not set. If a
reservation is removed, the TTL is set to the usual
24 hours.

Also adds corresponding client/server APIs for
reservation management.

Fixes #280
Eugene Yakubovich 9 年之前
父節點
當前提交
92ce594596
共有 8 個文件被更改,包括 491 次插入82 次删除
  1. 6 3
      network/network.go
  2. 57 0
      remote/client.go
  3. 66 16
      remote/server.go
  4. 186 34
      subnet/local_manager.go
  5. 8 2
      subnet/mock_registry.go
  6. 16 17
      subnet/registry.go
  7. 14 0
      subnet/subnet.go
  8. 138 10
      subnet/subnet_test.go

+ 6 - 3
network/network.go

@@ -149,7 +149,6 @@ func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn ba
 	defer wg.Wait()
 
 	dur := n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
-
 	for {
 		select {
 		case <-time.After(dur):
@@ -164,12 +163,16 @@ func (n *Network) runOnce(extIface *backend.ExternalInterface, inited func(bn ba
 			dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
 
 		case e := <-evts:
-			if e.Type == subnet.EventRemoved {
+			switch e.Type {
+			case subnet.EventAdded:
+				n.bn.Lease().Expiration = e.Lease.Expiration
+				dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
+
+			case subnet.EventRemoved:
 				log.Warning("Lease has been revoked")
 				interruptFunc()
 				return errInterrupted
 			}
-			dur = n.bn.Lease().Expiration.Sub(time.Now()) - renewMargin
 
 		case <-n.ctx.Done():
 			return errCanceled

+ 57 - 0
remote/client.go

@@ -258,6 +258,63 @@ func (m *RemoteManager) WatchNetworks(ctx context.Context, cursor interface{}) (
 	return wr, nil
 }
 
+func (m *RemoteManager) AddReservation(ctx context.Context, network string, r *subnet.Reservation) error {
+	url := m.mkurl(network, "reservations")
+
+	body, err := json.Marshal(r)
+	if err != nil {
+		return err
+	}
+
+	resp, err := m.httpVerb(ctx, "POST", url, "application/json", body)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return httpError(resp)
+	}
+	return nil
+}
+
+func (m *RemoteManager) RemoveReservation(ctx context.Context, network string, sn ip.IP4Net) error {
+	url := m.mkurl(network, "reservations", subnet.MakeSubnetKey(sn))
+
+	resp, err := m.httpVerb(ctx, "DELETE", url, "", nil)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return httpError(resp)
+	}
+
+	return nil
+}
+
+func (m *RemoteManager) ListReservations(ctx context.Context, network string) ([]subnet.Reservation, error) {
+	url := m.mkurl(network, "reservations")
+
+	resp, err := m.httpVerb(ctx, "GET", url, "", nil)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, httpError(resp)
+	}
+
+	rs := []subnet.Reservation{}
+	if err := json.NewDecoder(resp.Body).Decode(&rs); err != nil {
+		return nil, err
+	}
+
+	return rs, nil
+}
+
 func httpError(resp *http.Response) error {
 	b, err := ioutil.ReadAll(resp.Body)
 	if err != nil {

+ 66 - 16
remote/server.go

@@ -46,8 +46,6 @@ func jsonResponse(w http.ResponseWriter, code int, v interface{}) {
 
 // GET /{network}/config
 func handleGetNetworkConfig(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	network := mux.Vars(r)["network"]
 	if network == "_" {
 		network = ""
@@ -65,8 +63,6 @@ func handleGetNetworkConfig(ctx context.Context, sm subnet.Manager, w http.Respo
 
 // POST /{network}/leases
 func handleAcquireLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	network := mux.Vars(r)["network"]
 	if network == "_" {
 		network = ""
@@ -91,8 +87,6 @@ func handleAcquireLease(ctx context.Context, sm subnet.Manager, w http.ResponseW
 
 // PUT /{network}/{lease.network}
 func handleRenewLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	network := mux.Vars(r)["network"]
 	if network == "_" {
 		network = ""
@@ -115,8 +109,6 @@ func handleRenewLease(ctx context.Context, sm subnet.Manager, w http.ResponseWri
 }
 
 func handleRevokeLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	network := mux.Vars(r)["network"]
 	if network == "_" {
 		network = ""
@@ -134,8 +126,6 @@ func handleRevokeLease(ctx context.Context, sm subnet.Manager, w http.ResponseWr
 		fmt.Fprint(w, err)
 		return
 	}
-
-	w.WriteHeader(http.StatusOK)
 }
 
 func getCursor(u *url.URL) interface{} {
@@ -148,8 +138,6 @@ func getCursor(u *url.URL) interface{} {
 
 // GET /{network}/leases/subnet?next=cursor
 func handleWatchLease(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	network := mux.Vars(r)["network"]
 	if network == "_" {
 		network = ""
@@ -186,8 +174,6 @@ func handleWatchLease(ctx context.Context, sm subnet.Manager, w http.ResponseWri
 
 // GET /{network}/leases?next=cursor
 func handleWatchLeases(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	network := mux.Vars(r)["network"]
 	if network == "_" {
 		network = ""
@@ -218,8 +204,6 @@ func handleWatchLeases(ctx context.Context, sm subnet.Manager, w http.ResponseWr
 // GET /?next=cursor watches
 // GET / retrieves all networks
 func handleNetworks(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
-	defer r.Body.Close()
-
 	cursor := getCursor(r.URL)
 	wr, err := sm.WatchNetworks(ctx, cursor)
 	if err != nil {
@@ -241,6 +225,67 @@ func handleNetworks(ctx context.Context, sm subnet.Manager, w http.ResponseWrite
 	jsonResponse(w, http.StatusOK, wr)
 }
 
+// POST /{network}/reservations
+func handleAddReservation(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	rsv := &subnet.Reservation{}
+	if err := json.NewDecoder(r.Body).Decode(rsv); err != nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "JSON decoding error: ", err)
+		return
+	}
+
+	if err := sm.AddReservation(ctx, network, rsv); err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, fmt.Errorf("internal error: %v", err))
+		return
+	}
+}
+
+// DELETE /{network}/reservations/{subnet}
+func handleRemoveReservation(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	sn := subnet.ParseSubnetKey(mux.Vars(r)["subnet"])
+	if sn == nil {
+		w.WriteHeader(http.StatusBadRequest)
+		fmt.Fprint(w, "bad subnet")
+		return
+	}
+
+	if err := sm.RemoveReservation(ctx, network, *sn); err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}
+
+// GET /{network}/reservations
+func handleListReservations(ctx context.Context, sm subnet.Manager, w http.ResponseWriter, r *http.Request) {
+	network := mux.Vars(r)["network"]
+	if network == "_" {
+		network = ""
+	}
+
+	leases, err := sm.ListReservations(ctx, network)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		fmt.Fprint(w, err)
+		return
+	}
+
+	jsonResponse(w, http.StatusOK, leases)
+}
+
 func bindHandler(h handler, ctx context.Context, sm subnet.Manager) http.HandlerFunc {
 	return func(resp http.ResponseWriter, req *http.Request) {
 		h(ctx, sm, resp, req)
@@ -323,6 +368,7 @@ func RunServer(ctx context.Context, sm subnet.Manager, listenAddr, cafile, certf
 
 	r := mux.NewRouter()
 	r.HandleFunc("/v1/{network}/config", bindHandler(handleGetNetworkConfig, ctx, sm)).Methods("GET")
+
 	r.HandleFunc("/v1/{network}/leases", bindHandler(handleAcquireLease, ctx, sm)).Methods("POST")
 	r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleWatchLease, ctx, sm)).Methods("GET")
 	r.HandleFunc("/v1/{network}/leases/{subnet}", bindHandler(handleRenewLease, ctx, sm)).Methods("PUT")
@@ -330,6 +376,10 @@ func RunServer(ctx context.Context, sm subnet.Manager, listenAddr, cafile, certf
 	r.HandleFunc("/v1/{network}/leases", bindHandler(handleWatchLeases, ctx, sm)).Methods("GET")
 	r.HandleFunc("/v1/", bindHandler(handleNetworks, ctx, sm)).Methods("GET")
 
+	r.HandleFunc("/v1/{network}/reservations", bindHandler(handleListReservations, ctx, sm)).Methods("GET")
+	r.HandleFunc("/v1/{network}/reservations", bindHandler(handleAddReservation, ctx, sm)).Methods("POST")
+	r.HandleFunc("/v1/{network}/reservations/{subnet}", bindHandler(handleRemoveReservation, ctx, sm)).Methods("DELETE")
+
 	l, err := listener(listenAddr, cafile, certfile, keyfile)
 	if err != nil {
 		log.Errorf("Error listening on %v: %v", listenAddr, err)

+ 186 - 34
subnet/local_manager.go

@@ -27,8 +27,8 @@ import (
 )
 
 const (
-	registerRetries = 10
-	subnetTTL       = 24 * time.Hour
+	raceRetries = 10
+	subnetTTL   = 24 * time.Hour
 )
 
 type LocalManager struct {
@@ -39,6 +39,30 @@ type watchCursor struct {
 	index uint64
 }
 
+func isErrEtcdTestFailed(e error) bool {
+	if e == nil {
+		return false
+	}
+	etcdErr, ok := e.(etcd.Error)
+	return ok && etcdErr.Code == etcd.ErrorCodeTestFailed
+}
+
+func isErrEtcdNodeExist(e error) bool {
+	if e == nil {
+		return false
+	}
+	etcdErr, ok := e.(etcd.Error)
+	return ok || etcdErr.Code == etcd.ErrorCodeNodeExist
+}
+
+func isErrEtcdKeyNotFound(e error) bool {
+	if e == nil {
+		return false
+	}
+	etcdErr, ok := e.(etcd.Error)
+	return ok || etcdErr.Code == etcd.ErrorCodeKeyNotFound
+}
+
 func (c watchCursor) String() string {
 	return strconv.FormatUint(c.index, 10)
 }
@@ -72,13 +96,15 @@ func (m *LocalManager) AcquireLease(ctx context.Context, network string, attrs *
 		return nil, err
 	}
 
-	for i := 0; i < registerRetries; i++ {
+	for i := 0; i < raceRetries; i++ {
 		l, err := m.tryAcquireLease(ctx, network, config, attrs.PublicIP, attrs)
-		switch {
-		case err != nil:
-			return nil, err
-		case l != nil:
+		switch err {
+		case nil:
 			return l, nil
+		case errTryAgain:
+			continue
+		default:
+			return nil, err
 		}
 	}
 
@@ -96,7 +122,6 @@ func findLeaseByIP(leases []Lease, pubIP ip.IP4) *Lease {
 }
 
 func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, config *Config, extIaddr ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
-	var err error
 	leases, _, err := m.registry.getSubnets(ctx, network)
 	if err != nil {
 		return nil, err
@@ -107,7 +132,13 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, conf
 		// make sure the existing subnet is still within the configured network
 		if isSubnetConfigCompat(config, l.Subnet) {
 			log.Infof("Found lease (%v) for current IP (%v), reusing", l.Subnet, extIaddr)
-			exp, err := m.registry.updateSubnet(ctx, network, l.Subnet, attrs, subnetTTL, 0)
+
+			ttl := time.Duration(0)
+			if !l.Expiration.IsZero() {
+				// Not a reservation
+				ttl = subnetTTL
+			}
+			exp, err := m.registry.updateSubnet(ctx, network, l.Subnet, attrs, ttl, 0)
 			if err != nil {
 				return nil, err
 			}
@@ -130,20 +161,18 @@ func (m *LocalManager) tryAcquireLease(ctx context.Context, network string, conf
 	}
 
 	exp, err := m.registry.createSubnet(ctx, network, sn, attrs, subnetTTL)
-	if err == nil {
+	switch {
+	case err == nil:
 		return &Lease{
 			Subnet:     sn,
 			Attrs:      *attrs,
 			Expiration: exp,
 		}, nil
+	case isErrEtcdNodeExist(err):
+		return nil, errTryAgain
+	default:
+		return nil, err
 	}
-
-	if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeNodeExist {
-		// if etcd returned Key Already Exists, try again.
-		return nil, nil
-	}
-
-	return nil, err
 }
 
 func (m *LocalManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, error) {
@@ -175,7 +204,6 @@ func (m *LocalManager) RevokeLease(ctx context.Context, network string, sn ip.IP
 }
 
 func (m *LocalManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
-	// TODO(eyakubovich): propogate ctx into registry
 	exp, err := m.registry.updateSubnet(ctx, network, lease.Subnet, &lease.Attrs, subnetTTL, 0)
 	if err != nil {
 		return err
@@ -281,26 +309,26 @@ func (m *LocalManager) WatchNetworks(ctx context.Context, cursor interface{}) (N
 		return NetworkWatchResult{}, err
 	}
 
-DoWatch:
-	evt, index, err := m.registry.watchNetworks(ctx, nextIndex)
+	for {
+		evt, index, err := m.registry.watchNetworks(ctx, nextIndex)
 
-	switch {
-	case err == nil:
-		return NetworkWatchResult{
-			Events: []Event{evt},
-			Cursor: watchCursor{index},
-		}, nil
+		switch {
+		case err == nil:
+			return NetworkWatchResult{
+				Events: []Event{evt},
+				Cursor: watchCursor{index},
+			}, nil
 
-	case err == ErrTryAgain:
-		nextIndex = index
-		goto DoWatch
+		case err == errTryAgain:
+			nextIndex = index
 
-	case isIndexTooSmall(err):
-		log.Warning("Watch of networks failed because etcd index outside history window")
-		return m.networkWatchReset(ctx)
+		case isIndexTooSmall(err):
+			log.Warning("Watch of networks failed because etcd index outside history window")
+			return m.networkWatchReset(ctx)
 
-	default:
-		return NetworkWatchResult{}, err
+		default:
+			return NetworkWatchResult{}, err
+		}
 	}
 }
 
@@ -344,3 +372,127 @@ func isSubnetConfigCompat(config *Config, sn ip.IP4Net) bool {
 
 	return sn.PrefixLen == config.SubnetLen
 }
+
+func (m *LocalManager) tryAddReservation(ctx context.Context, network string, r *Reservation) error {
+	attrs := &LeaseAttrs{
+		PublicIP: r.PublicIP,
+	}
+
+	_, err := m.registry.createSubnet(ctx, network, r.Subnet, attrs, 0)
+	switch {
+	case err == nil:
+		return nil
+
+	case !isErrEtcdNodeExist(err):
+		return err
+	}
+
+	// This subnet or its reservation already exists.
+	// Get what's there and
+	// - if PublicIP matches, remove the TTL make it a reservation
+	// - otherwise, error out
+	sub, asof, err := m.registry.getSubnet(ctx, network, r.Subnet)
+	switch {
+	case err == nil:
+	case isErrEtcdKeyNotFound(err):
+		// Subnet just got expired or was deleted
+		return errTryAgain
+	default:
+		return err
+	}
+
+	if sub.Attrs.PublicIP != r.PublicIP {
+		// Subnet already taken
+		return ErrLeaseTaken
+	}
+
+	// remove TTL
+	_, err = m.registry.updateSubnet(ctx, network, r.Subnet, &sub.Attrs, 0, asof)
+	if isErrEtcdTestFailed(err) {
+		return errTryAgain
+	}
+	return err
+}
+
+func (m *LocalManager) AddReservation(ctx context.Context, network string, r *Reservation) error {
+	config, err := m.GetNetworkConfig(ctx, network)
+	if err != nil {
+		return err
+	}
+
+	if config.SubnetLen != r.Subnet.PrefixLen {
+		return fmt.Errorf("reservation subnet has mask incompatible with network config")
+	}
+
+	if !config.Network.Overlaps(r.Subnet) {
+		return fmt.Errorf("reservation subnet is outside of flannel network")
+	}
+
+	for i := 0; i < raceRetries; i++ {
+		err := m.tryAddReservation(ctx, network, r)
+		switch {
+		case err == nil:
+			return nil
+		case err == errTryAgain:
+			continue
+		default:
+			return err
+		}
+	}
+
+	return ErrNoMoreTries
+}
+
+func (m *LocalManager) tryRemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
+	sub, asof, err := m.registry.getSubnet(ctx, network, subnet)
+	if err != nil {
+		return err
+	}
+
+	// add back the TTL
+	_, err = m.registry.updateSubnet(ctx, network, subnet, &sub.Attrs, subnetTTL, asof)
+	if isErrEtcdTestFailed(err) {
+		return errTryAgain
+	}
+	return err
+}
+
+//RemoveReservation removes the subnet by setting TTL back to subnetTTL (24hours)
+func (m *LocalManager) RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error {
+	for i := 0; i < raceRetries; i++ {
+		err := m.tryRemoveReservation(ctx, network, subnet)
+		switch {
+		case err == nil:
+			return nil
+		case err == errTryAgain:
+			continue
+		default:
+			return err
+		}
+	}
+
+	return ErrNoMoreTries
+}
+
+func (m *LocalManager) ListReservations(ctx context.Context, network string) ([]Reservation, error) {
+	subnets, _, err := m.registry.getSubnets(ctx, network)
+	if err != nil {
+		return nil, err
+	}
+
+	rsvs := []Reservation{}
+	for _, sub := range subnets {
+		// Reservations don't have TTL and so no expiration
+		if !sub.Expiration.IsZero() {
+			continue
+		}
+
+		r := Reservation{
+			Subnet:   sub.Subnet,
+			PublicIP: sub.Attrs.PublicIP,
+		}
+		rsvs = append(rsvs, r)
+	}
+
+	return rsvs, nil
+}

+ 8 - 2
subnet/mock_registry.go

@@ -140,7 +140,10 @@ func (msr *MockSubnetRegistry) createSubnet(ctx context.Context, network string,
 
 	msr.index += 1
 
-	exp := clock.Now().Add(ttl)
+	exp := time.Time{}
+	if ttl != 0 {
+		exp = clock.Now().Add(ttl)
+	}
 
 	l := Lease{
 		Subnet:     sn,
@@ -169,7 +172,10 @@ func (msr *MockSubnetRegistry) updateSubnet(ctx context.Context, network string,
 
 	msr.index += 1
 
-	exp := clock.Now().Add(ttl)
+	exp := time.Time{}
+	if ttl != 0 {
+		exp = clock.Now().Add(ttl)
+	}
 
 	sub, i, err := n.findSubnet(sn)
 	if err != nil {

+ 16 - 17
subnet/registry.go

@@ -33,7 +33,7 @@ import (
 
 var (
 	subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
-	ErrTryAgain                = errors.New("try again")
+	errTryAgain                = errors.New("try again")
 )
 
 type Registry interface {
@@ -167,8 +167,12 @@ func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string,
 		return time.Time{}, err
 	}
 
-	ensureExpiration(resp, ttl)
-	return *resp.Node.Expiration, nil
+	exp := time.Time{}
+	if resp.Node.Expiration != nil {
+		exp = *resp.Node.Expiration
+	}
+
+	return exp, nil
 }
 
 func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) {
@@ -186,8 +190,12 @@ func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string,
 		return time.Time{}, err
 	}
 
-	ensureExpiration(resp, ttl)
-	return *resp.Node.Expiration, nil
+	exp := time.Time{}
+	if resp.Node.Expiration != nil {
+		exp = *resp.Node.Expiration
+	}
+
+	return exp, nil
 }
 
 func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error {
@@ -223,7 +231,7 @@ func (esr *etcdSubnetRegistry) watchSubnet(ctx context.Context, network string,
 	}
 
 	evt, err := parseSubnetWatchResponse(e)
-	return evt, e.Index, err
+	return evt, e.Node.ModifiedIndex, err
 }
 
 // getNetworks queries etcd to get a list of network names.  It returns the
@@ -287,15 +295,6 @@ func (esr *etcdSubnetRegistry) resetClient() {
 	}
 }
 
-func ensureExpiration(resp *etcd.Response, ttl time.Duration) {
-	if resp.Node.Expiration == nil {
-		// should not be but calc it ourselves in this case
-		log.Info("Expiration field missing on etcd response, calculating locally")
-		exp := clock.Now().Add(time.Duration(ttl) * time.Second)
-		resp.Node.Expiration = &exp
-	}
-}
-
 func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) {
 	sn := ParseSubnetKey(resp.Node.Key)
 	if sn == nil {
@@ -339,7 +338,7 @@ func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (E
 	index := resp.Node.ModifiedIndex
 	netname, isConfig := esr.parseNetworkKey(resp.Node.Key)
 	if netname == "" {
-		return Event{}, index, ErrTryAgain
+		return Event{}, index, errTryAgain
 	}
 
 	evt := Event{}
@@ -355,7 +354,7 @@ func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (E
 	default:
 		if !isConfig {
 			// Ignore non .../<netname>/config keys; tell caller to try again
-			return Event{}, index, ErrTryAgain
+			return Event{}, index, errTryAgain
 		}
 
 		_, err := ParseConfig(resp.Node.Value)

+ 14 - 0
subnet/subnet.go

@@ -26,6 +26,11 @@ import (
 	"github.com/coreos/flannel/pkg/ip"
 )
 
+var (
+	ErrLeaseTaken  = errors.New("subnet: lease already taken")
+	ErrNoMoreTries = errors.New("subnet: no more tries")
+)
+
 type LeaseAttrs struct {
 	PublicIP    ip.IP4
 	BackendType string          `json:",omitempty"`
@@ -44,6 +49,11 @@ func (l *Lease) Key() string {
 	return MakeSubnetKey(l.Subnet)
 }
 
+type Reservation struct {
+	Subnet   ip.IP4Net
+	PublicIP ip.IP4
+}
+
 type (
 	EventType int
 
@@ -129,4 +139,8 @@ type Manager interface {
 	WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (LeaseWatchResult, error)
 	WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error)
 	WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error)
+
+	AddReservation(ctx context.Context, network string, r *Reservation) error
+	RemoveReservation(ctx context.Context, network string, subnet ip.IP4Net) error
+	ListReservations(ctx context.Context, network string) ([]Reservation, error)
 }

+ 138 - 10
subnet/subnet_test.go

@@ -41,7 +41,7 @@ func newDummyRegistry() *MockSubnetRegistry {
 		{ip.IP4Net{ip.MustParseIP4("10.3.5.0"), 24}, attrs, exp, 13},
 	}
 
-	config := `{ "Network": "10.3.0.0/16", "SubnetMin": "10.3.1.0", "SubnetMax": "10.3.5.0" }`
+	config := `{ "Network": "10.3.0.0/16", "SubnetMin": "10.3.1.0", "SubnetMax": "10.3.25.0" }`
 	return NewMockRegistry("_", config, subnets)
 }
 
@@ -59,17 +59,18 @@ func TestAcquireLease(t *testing.T) {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	if l.Subnet.String() != "10.3.3.0/24" {
+	if !inAllocatableRange(context.Background(), sm, l.Subnet) {
 		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", l.Subnet)
 	}
 
 	// Acquire again, should reuse
-	if l, err = sm.AcquireLease(context.Background(), "_", &attrs); err != nil {
+	l2, err := sm.AcquireLease(context.Background(), "_", &attrs)
+	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	if l.Subnet.String() != "10.3.3.0/24" {
-		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", l.Subnet)
+	if !l.Subnet.Equal(l2.Subnet) {
+		t.Fatalf("AcquireLease did not reuse subnet; expected %v, got %v", l.Subnet, l2.Subnet)
 	}
 }
 
@@ -87,8 +88,8 @@ func TestConfigChanged(t *testing.T) {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	if l.Subnet.String() != "10.3.3.0/24" {
-		t.Fatal("Subnet mismatch: expected 10.3.3.0/24, got: ", l.Subnet)
+	if !inAllocatableRange(context.Background(), sm, l.Subnet) {
+		t.Fatal("Acquired subnet outside of valid range: ", l.Subnet)
 	}
 
 	// Change config
@@ -100,9 +101,8 @@ func TestConfigChanged(t *testing.T) {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	newNet := newIP4Net("10.4.0.0", 16)
-	if !newNet.Contains(l.Subnet.IP) {
-		t.Fatalf("Subnet mismatch: expected within %v, got: %v", newNet, l.Subnet)
+	if !inAllocatableRange(context.Background(), sm, l.Subnet) {
+		t.Fatal("Acquired subnet outside of valid range: ", l.Subnet)
 	}
 }
 
@@ -262,6 +262,7 @@ func TestRenewLease(t *testing.T) {
 	if err != nil {
 		t.Error("Failed to renew lease: could not get networks: %v", err)
 	}
+
 	for _, sn := range n.subnets {
 		if sn.Subnet.Equal(l.Subnet) {
 			expected := now.Add(subnetTTL)
@@ -402,3 +403,130 @@ func TestWatchNetworkRemoved(t *testing.T) {
 		t.Errorf("WatchNetwork produced wrong network: expected %s, got %s", expected, actual)
 	}
 }
+
+func TestAddReservation(t *testing.T) {
+	msr := newDummyRegistry()
+	sm := NewMockManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	r := Reservation{
+		Subnet:   newIP4Net("10.4.3.0", 24),
+		PublicIP: ip.MustParseIP4("52.195.12.13"),
+	}
+	if err := sm.AddReservation(ctx, "_", &r); err == nil {
+		t.Fatalf("unexpectedly added a reservation outside of configured network")
+	}
+
+	r.Subnet = newIP4Net("10.3.10.0", 24)
+	if err := sm.AddReservation(ctx, "_", &r); err != nil {
+		t.Fatalf("failed to add reservation: %v", err)
+	}
+
+	// Add the same reservation -- should succeed
+	if err := sm.AddReservation(ctx, "_", &r); err != nil {
+		t.Fatalf("failed to add reservation: %v", err)
+	}
+
+	// Add a reservation with a different public IP -- should fail
+	r2 := r
+	r2.PublicIP = ip.MustParseIP4("52.195.12.17")
+	if err := sm.AddReservation(ctx, "_", &r2); err != ErrLeaseTaken {
+		t.Fatalf("taken add reservation returned: %v", err)
+	}
+
+	attrs := &LeaseAttrs{
+		PublicIP: r.PublicIP,
+	}
+	l, err := sm.AcquireLease(ctx, "_", attrs)
+	if err != nil {
+		t.Fatalf("failed to acquire subnet: %v", err)
+	}
+	if !l.Subnet.Equal(r.Subnet) {
+		t.Fatalf("acquired subnet is not the reserved one: expected %v, got %v", r.Subnet, l.Subnet)
+	}
+	if !l.Expiration.IsZero() {
+		t.Fatalf("acquired lease (prev reserved) has expiration set")
+	}
+}
+
+func TestRemoveReservation(t *testing.T) {
+	msr := newDummyRegistry()
+	sm := NewMockManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	r := Reservation{
+		Subnet:   newIP4Net("10.3.10.0", 24),
+		PublicIP: ip.MustParseIP4("52.195.12.13"),
+	}
+	if err := sm.AddReservation(ctx, "_", &r); err != nil {
+		t.Fatalf("failed to add reservation: %v", err)
+	}
+
+	if err := sm.RemoveReservation(ctx, "_", r.Subnet); err != nil {
+		t.Fatalf("failed to remove reservation: %v", err)
+	}
+
+	// The node should have a TTL
+	sub, _, err := msr.getSubnet(ctx, "_", r.Subnet)
+	if err != nil {
+		t.Fatalf("getSubnet failed: %v", err)
+	}
+
+	if sub.Expiration.IsZero() {
+		t.Fatalf("removed reservation resulted in no TTL")
+	}
+}
+
+func TestListReservations(t *testing.T) {
+	msr := newDummyRegistry()
+	sm := NewMockManager(msr)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	r1 := Reservation{
+		Subnet:   newIP4Net("10.3.10.0", 24),
+		PublicIP: ip.MustParseIP4("52.195.12.13"),
+	}
+	if err := sm.AddReservation(ctx, "_", &r1); err != nil {
+		t.Fatalf("failed to add reservation: %v", err)
+	}
+
+	r2 := Reservation{
+		Subnet:   newIP4Net("10.3.20.0", 24),
+		PublicIP: ip.MustParseIP4("52.195.12.14"),
+	}
+	if err := sm.AddReservation(ctx, "_", &r2); err != nil {
+		t.Fatalf("failed to add reservation: %v", err)
+	}
+
+	rs, err := sm.ListReservations(ctx, "_")
+	if err != nil {
+		if len(rs) != 2 {
+			t.Fatalf("unexpected number of reservations, expected 2, got %v", len(rs))
+		}
+		if !resvEqual(rs[0], r1) && !resvEqual(rs[1], r1) {
+			t.Fatalf("reservation not found")
+		}
+		if !resvEqual(rs[0], r2) && !resvEqual(rs[1], r2) {
+			t.Fatalf("reservation not found")
+		}
+	}
+}
+
+func inAllocatableRange(ctx context.Context, sm Manager, ipn ip.IP4Net) bool {
+	cfg, err := sm.GetNetworkConfig(ctx, "_")
+	if err != nil {
+		panic(err)
+	}
+
+	return ipn.IP >= cfg.SubnetMin || ipn.IP <= cfg.SubnetMax
+}
+
+func resvEqual(r1, r2 Reservation) bool {
+	return r1.Subnet.Equal(r2.Subnet) && r1.PublicIP == r2.PublicIP
+}