Ver Fonte

Reduce number of errors printed on watch timeout. Still need changes in go-etcd

Eugene Yakubovich há 10 anos atrás
pai
commit
ffe0d6fa64
3 ficheiros alterados com 86 adições e 46 exclusões
  1. 1 4
      main.go
  2. 81 0
      subnet/registry.go
  3. 4 42
      subnet/subnet.go

+ 1 - 4
main.go

@@ -8,7 +8,6 @@ import (
 	"path"
 	"time"
 
-	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
@@ -105,10 +104,8 @@ func lookupIface() (*net.Interface, net.IP) {
 }
 
 func makeSubnetManager() *subnet.SubnetManager {
-	etcdCli := etcd.NewClient([]string{opts.etcdEndpoint})
-
 	for {
-		sm, err := subnet.NewSubnetManager(etcdCli, opts.etcdPrefix)
+		sm, err := subnet.NewSubnetManager(opts.etcdEndpoint, opts.etcdPrefix)
 		if err == nil {
 			return sm
 		}

+ 81 - 0
subnet/registry.go

@@ -0,0 +1,81 @@
+package subnet
+
+import (
+	"sync"
+
+	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+)
+
+type subnetRegistry interface {
+	getConfig() (*etcd.Response, error)
+	getSubnets() (*etcd.Response, error)
+	createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
+	updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
+	watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
+}
+
+type etcdSubnetRegistry struct {
+	mux      sync.Mutex
+	cli      *etcd.Client
+	endpoint string
+	prefix   string
+}
+
+func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
+	return &etcdSubnetRegistry{
+		cli:      etcd.NewClient([]string{endpoint}),
+		endpoint: endpoint,
+		prefix:   prefix,
+	}
+}
+
+func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
+	resp, err := esr.client().Get(esr.prefix+"/config", false, false)
+	if err != nil {
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
+	return esr.client().Get(esr.prefix+"/subnets", false, true)
+}
+
+func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
+	return esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
+}
+
+func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
+	return esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
+}
+
+func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
+	for {
+		resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
+
+		if err != nil {
+			return nil, err
+		}
+
+		if len(resp.Body) == 0 {
+			// etcd timed out, go back but recreate the client as the underlying
+			// http transport gets hosed (http://code.google.com/p/go/issues/detail?id=8648)
+			esr.resetClient()
+			continue
+		}
+
+		return resp.Unmarshal()
+	}
+}
+
+func (esr *etcdSubnetRegistry) client() *etcd.Client {
+	esr.mux.Lock()
+	defer esr.mux.Unlock()
+	return esr.cli
+}
+
+func (esr *etcdSubnetRegistry) resetClient() {
+	esr.mux.Lock()
+	defer esr.mux.Unlock()
+	esr.cli = etcd.NewClient([]string{esr.endpoint})
+}

+ 4 - 42
subnet/subnet.go

@@ -61,9 +61,9 @@ type Event struct {
 
 type EventBatch []Event
 
-func NewSubnetManager(etcdCli *etcd.Client, prefix string) (*SubnetManager, error) {
-	esr := etcdSubnetRegistry{etcdCli, prefix}
-	return newSubnetManager(&esr)
+func NewSubnetManager(etcdEndpoint, prefix string) (*SubnetManager, error) {
+	esr := newEtcdSubnetRegistry(etcdEndpoint, prefix)
+	return newSubnetManager(esr)
 }
 
 func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error) {
@@ -141,7 +141,6 @@ func (sm *SubnetManager) GetConfig() *Config {
 }
 
 /// Implementation
-
 func parseSubnetKey(s string) (ip.IP4Net, error) {
 	if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 {
 		snIp := net.ParseIP(parts[1]).To4()
@@ -154,43 +153,6 @@ func parseSubnetKey(s string) (ip.IP4Net, error) {
 	return ip.IP4Net{}, errors.New("Error parsing IP Subnet")
 }
 
-type subnetRegistry interface {
-	getConfig() (*etcd.Response, error)
-	getSubnets() (*etcd.Response, error)
-	createSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
-	updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error)
-	watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
-}
-
-type etcdSubnetRegistry struct {
-	cli    *etcd.Client
-	prefix string
-}
-
-func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
-	resp, err := esr.cli.Get(esr.prefix+"/config", false, false)
-	if err != nil {
-		return nil, err
-	}
-	return resp, nil
-}
-
-func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
-	return esr.cli.Get(esr.prefix+"/subnets", false, true)
-}
-
-func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	return esr.cli.Create(esr.prefix+"/subnets/"+sn, data, ttl)
-}
-
-func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	return esr.cli.Set(esr.prefix+"/subnets/"+sn, data, ttl)
-}
-
-func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
-	return esr.cli.Watch(esr.prefix+"/subnets", since, true, nil, stop)
-}
-
 func newSubnetManager(r subnetRegistry) (*SubnetManager, error) {
 	cfgResp, err := r.getConfig()
 	if err != nil {
@@ -424,7 +386,7 @@ func (sm *SubnetManager) leaseRenewer() {
 				continue
 			}
 
-			sm.leaseExp = *(resp.Node.Expiration)
+			sm.leaseExp = *resp.Node.Expiration
 			log.Info("Lease renewed, new expiration: ", sm.leaseExp)
 			dur = sm.leaseExp.Sub(time.Now()) - renewMargin