Browse Source

Switch to using etcd/client

go-etcd is dated and has logging that is either
too silent or too verbose.

Fixes #174
Eugene Yakubovich 9 years ago
parent
commit
8330c95eb6
5 changed files with 85 additions and 114 deletions
  1. 20 30
      subnet/etcd.go
  2. 18 20
      subnet/mock_registry.go
  3. 3 1
      subnet/mock_subnet.go
  4. 42 61
      subnet/registry.go
  5. 2 2
      subnet/subnet_test.go

+ 20 - 30
subnet/etcd.go

@@ -23,7 +23,7 @@ import (
 	"strconv"
 	"time"
 
-	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/pkg/ip"
@@ -31,14 +31,7 @@ import (
 
 const (
 	registerRetries = 10
-	subnetTTL       = 24 * 3600
-)
-
-// etcd error codes
-const (
-	etcdKeyNotFound       = 100
-	etcdKeyAlreadyExists  = 105
-	etcdEventIndexCleared = 401
+	subnetTTL       = 24 * time.Hour
 )
 
 type EtcdManager struct {
@@ -157,21 +150,20 @@ func (m *EtcdManager) tryAcquireLease(ctx context.Context, network string, confi
 	}
 
 	resp, err := m.registry.createSubnet(ctx, network, sn.StringSep(".", "-"), string(attrBytes), subnetTTL)
-	switch {
-	case err == nil:
+	if err == nil {
 		return &Lease{
 			Subnet:     sn,
 			Attrs:      attrs,
 			Expiration: *resp.Node.Expiration,
 		}, nil
+	}
 
-	// if etcd returned Key Already Exists, try again.
-	case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
+	if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeNodeExist {
+		// if etcd returned Key Already Exists, try again.
 		return nil, nil
-
-	default:
-		return nil, err
 	}
+
+	return nil, err
 }
 
 func (m *EtcdManager) acquireLeaseOnce(ctx context.Context, network string, config *Config, attrs *LeaseAttrs) (*Lease, error) {
@@ -239,10 +231,8 @@ func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, u
 	resp, err := m.registry.getSubnets(ctx, network)
 
 	leases := []Lease{}
-	index := uint64(0)
 
-	switch {
-	case err == nil:
+	if err == nil {
 		for _, node := range resp.Node.Nodes {
 			sn, err := parseSubnetKey(node.Key)
 			if err == nil {
@@ -262,17 +252,16 @@ func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, u
 				}
 			}
 		}
-		index = resp.EtcdIndex
 
-	case err.(*etcd.EtcdError).ErrorCode == etcdKeyNotFound:
-		// key not found: treat it as empty set
-		index = err.(*etcd.EtcdError).Index
+		return leases, resp.Index, nil
+	}
 
-	default:
-		return nil, 0, err
+	if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
+		// key not found: treat it as empty set
+		return leases, etcdErr.Index, nil
 	}
 
-	return leases, index, nil
+	return nil, 0, err
 }
 
 func (m *EtcdManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
@@ -326,8 +315,8 @@ func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor in
 }
 
 func isIndexTooSmall(err error) bool {
-	etcdErr, ok := err.(*etcd.EtcdError)
-	return ok && etcdErr.ErrorCode == etcdEventIndexCleared
+	etcdErr, ok := err.(etcd.Error)
+	return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared
 }
 
 func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
@@ -367,7 +356,7 @@ func parseSubnetWatchResponse(resp *etcd.Response) (WatchResult, error) {
 		}
 	}
 
-	cursor := watchCursor{resp.Node.ModifiedIndex + 1}
+	cursor := watchCursor{resp.Node.ModifiedIndex}
 
 	return WatchResult{
 		Cursor: cursor,
@@ -380,11 +369,12 @@ func (m *EtcdManager) watchReset(ctx context.Context, network string) (WatchResu
 	wr := WatchResult{}
 
 	leases, index, err := m.getLeases(ctx, network)
+
 	if err != nil {
 		return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
 	}
 
-	cursor := watchCursor{index + 1}
+	cursor := watchCursor{index}
 	wr.Snapshot = leases
 	wr.Cursor = cursor
 	return wr, nil

+ 18 - 20
subnet/mock_registry.go

@@ -18,7 +18,7 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 )
 
@@ -27,10 +27,10 @@ type mockSubnetRegistry struct {
 	subnets *etcd.Node
 	events  chan *etcd.Response
 	index   uint64
-	ttl     uint64
+	ttl     time.Duration
 }
 
-func newMockRegistry(ttlOverride uint64, config string, initialSubnets []*etcd.Node) *mockSubnetRegistry {
+func newMockRegistry(ttlOverride time.Duration, config string, initialSubnets []*etcd.Node) *mockSubnetRegistry {
 	index := uint64(0)
 	for _, n := range initialSubnets {
 		if n.ModifiedIndex > index {
@@ -53,8 +53,8 @@ func newMockRegistry(ttlOverride uint64, config string, initialSubnets []*etcd.N
 
 func (msr *mockSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
 	return &etcd.Response{
-		EtcdIndex: msr.index,
-		Node:      msr.config,
+		Index: msr.index,
+		Node:  msr.config,
 	}, nil
 }
 
@@ -67,20 +67,19 @@ func (msr *mockSubnetRegistry) setConfig(config string) {
 
 func (msr *mockSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
 	return &etcd.Response{
-		Node:      msr.subnets,
-		EtcdIndex: msr.index,
+		Node:  msr.subnets,
+		Index: msr.index,
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
 	msr.index += 1
 
 	if msr.ttl > 0 {
 		ttl = msr.ttl
 	}
 
-	// add squared durations :)
-	exp := time.Now().Add(time.Duration(ttl) * time.Second)
+	exp := time.Now().Add(ttl)
 
 	node := &etcd.Node{
 		Key:           sn,
@@ -96,16 +95,15 @@ func (msr *mockSubnetRegistry) createSubnet(ctx context.Context, network, sn, da
 	}
 
 	return &etcd.Response{
-		Node:      node,
-		EtcdIndex: msr.index,
+		Node:  node,
+		Index: msr.index,
 	}, nil
 }
 
-func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
 	msr.index += 1
 
-	// add squared durations :)
-	exp := time.Now().Add(time.Duration(ttl) * time.Second)
+	exp := time.Now().Add(ttl)
 
 	for _, n := range msr.subnets.Nodes {
 		if n.Key == sn {
@@ -118,8 +116,8 @@ func (msr *mockSubnetRegistry) updateSubnet(ctx context.Context, network, sn, da
 			}
 
 			return &etcd.Response{
-				Node:      n,
-				EtcdIndex: msr.index,
+				Node:  n,
+				Index: msr.index,
 			}, nil
 		}
 	}
@@ -140,8 +138,8 @@ func (msr *mockSubnetRegistry) deleteSubnet(ctx context.Context, network, sn str
 			}
 
 			return &etcd.Response{
-				Node:      n,
-				EtcdIndex: msr.index,
+				Node:  n,
+				Index: msr.index,
 			}, nil
 		}
 	}
@@ -157,7 +155,7 @@ func (msr *mockSubnetRegistry) watchSubnets(ctx context.Context, network string,
 			return nil, ctx.Err()
 
 		case r := <-msr.events:
-			if r.Node.ModifiedIndex < since {
+			if r.Node.ModifiedIndex <= since {
 				continue
 			}
 			return r, nil

+ 3 - 1
subnet/mock_subnet.go

@@ -14,7 +14,9 @@
 
 package subnet
 
-func NewMockManager(ttlOverride uint64, config string) Manager {
+import "time"
+
+func NewMockManager(ttlOverride time.Duration, config string) Manager {
 	r := newMockRegistry(ttlOverride, config, nil)
 	return newEtcdManager(r)
 }

+ 42 - 61
subnet/registry.go

@@ -16,13 +16,12 @@ package subnet
 
 import (
 	"fmt"
-	golog "log"
-	"os"
 	"path"
 	"sync"
 	"time"
 
-	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 )
@@ -30,8 +29,8 @@ import (
 type Registry interface {
 	getConfig(ctx context.Context, network string) (*etcd.Response, error)
 	getSubnets(ctx context.Context, network string) (*etcd.Response, error)
-	createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error)
-	updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error)
+	createSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error)
+	updateSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error)
 	deleteSubnet(ctx context.Context, network, sn string) (*etcd.Response, error)
 	watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error)
 }
@@ -46,20 +45,31 @@ type EtcdConfig struct {
 
 type etcdSubnetRegistry struct {
 	mux     sync.Mutex
-	cli     *etcd.Client
+	cli     etcd.KeysAPI
 	etcdCfg *EtcdConfig
 }
 
-func init() {
-	etcd.SetLogger(golog.New(os.Stderr, "go-etcd", golog.LstdFlags))
-}
+func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) {
+	tlsInfo := transport.TLSInfo{
+		CertFile: c.Certfile,
+		KeyFile:  c.Keyfile,
+		CAFile:   c.CAFile,
+	}
 
-func newEtcdClient(c *EtcdConfig) (*etcd.Client, error) {
-	if c.Keyfile != "" || c.Certfile != "" || c.CAFile != "" {
-		return etcd.NewTLSClient(c.Endpoints, c.Certfile, c.Keyfile, c.CAFile)
-	} else {
-		return etcd.NewClient(c.Endpoints), nil
+	t, err := transport.NewTransport(tlsInfo)
+	if err != nil {
+		return nil, err
 	}
+
+	cli, err := etcd.New(etcd.Config{
+		Endpoints: c.Endpoints,
+		Transport: t,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	return etcd.NewKeysAPI(cli), nil
 }
 
 func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
@@ -78,7 +88,7 @@ func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) {
 
 func (esr *etcdSubnetRegistry) getConfig(ctx context.Context, network string) (*etcd.Response, error) {
 	key := path.Join(esr.etcdCfg.Prefix, network, "config")
-	resp, err := esr.client().Get(key, false, false)
+	resp, err := esr.client().Get(ctx, key, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -87,12 +97,16 @@ func (esr *etcdSubnetRegistry) getConfig(ctx context.Context, network string) (*
 
 func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) (*etcd.Response, error) {
 	key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
-	return esr.client().Get(key, false, true)
+	return esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true})
 }
 
-func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
 	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn)
-	resp, err := esr.client().Create(key, data, ttl)
+	opts := &etcd.SetOptions{
+		PrevExist: etcd.PrevNoExist,
+		TTL:       ttl,
+	}
+	resp, err := esr.client().Set(ctx, key, data, opts)
 	if err != nil {
 		return nil, err
 	}
@@ -101,9 +115,9 @@ func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network, sn, da
 	return resp, nil
 }
 
-func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl uint64) (*etcd.Response, error) {
+func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network, sn, data string, ttl time.Duration) (*etcd.Response, error) {
 	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn)
-	resp, err := esr.client().Set(key, data, ttl)
+	resp, err := esr.client().Set(ctx, key, data, &etcd.SetOptions{TTL: ttl})
 	if err != nil {
 		return nil, err
 	}
@@ -114,51 +128,19 @@ func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network, sn, da
 
 func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network, sn string) (*etcd.Response, error) {
 	key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn)
-	return esr.client().Delete(key, false)
-}
-
-type watchResp struct {
-	resp *etcd.Response
-	err  error
+	return esr.client().Delete(ctx, key, nil)
 }
 
 func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (*etcd.Response, error) {
-	stop := make(chan bool)
-	respCh := make(chan watchResp)
-
-	go func() {
-		for {
-			key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
-			rresp, err := esr.client().RawWatch(key, since, true, nil, stop)
-
-			if err != nil {
-				respCh <- watchResp{nil, err}
-				return
-			}
-
-			if len(rresp.Body) == 0 {
-				// etcd timed out, go back but recreate the client as the underlying
-				// http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
-				esr.resetClient()
-				continue
-			}
-
-			resp, err := rresp.Unmarshal()
-			respCh <- watchResp{resp, err}
-		}
-	}()
-
-	select {
-	case <-ctx.Done():
-		close(stop)
-		<-respCh // Wait for f to return.
-		return nil, ctx.Err()
-	case wr := <-respCh:
-		return wr.resp, wr.err
+	key := path.Join(esr.etcdCfg.Prefix, network, "subnets")
+	opts := &etcd.WatcherOptions{
+		AfterIndex: since,
+		Recursive:  true,
 	}
+	return esr.client().Watcher(key, opts).Next(ctx)
 }
 
-func (esr *etcdSubnetRegistry) client() *etcd.Client {
+func (esr *etcdSubnetRegistry) client() etcd.KeysAPI {
 	esr.mux.Lock()
 	defer esr.mux.Unlock()
 	return esr.cli
@@ -169,14 +151,13 @@ func (esr *etcdSubnetRegistry) resetClient() {
 	defer esr.mux.Unlock()
 
 	var err error
-	esr.cli.Close()
 	esr.cli, err = newEtcdClient(esr.etcdCfg)
 	if err != nil {
 		panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
 	}
 }
 
-func ensureExpiration(resp *etcd.Response, ttl uint64) {
+func ensureExpiration(resp *etcd.Response, ttl time.Duration) {
 	if resp.Node.Expiration == nil {
 		// should not be but calc it ourselves in this case
 		log.Info("Expiration field missing on etcd response, calculating locally")

+ 2 - 2
subnet/subnet_test.go

@@ -21,13 +21,13 @@ import (
 	"testing"
 	"time"
 
-	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 
 	"github.com/coreos/flannel/pkg/ip"
 )
 
-func newDummyRegistry(ttlOverride uint64) *mockSubnetRegistry {
+func newDummyRegistry(ttlOverride time.Duration) *mockSubnetRegistry {
 	subnets := []*etcd.Node{
 		&etcd.Node{Key: "10.3.1.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 10},
 		&etcd.Node{Key: "10.3.2.0-24", Value: `{ "PublicIP": "1.1.1.1" }`, ModifiedIndex: 11},