|
@@ -1,6 +1,7 @@
|
|
|
package subnet
|
|
|
|
|
|
import (
|
|
|
+ "fmt"
|
|
|
"path"
|
|
|
"sync"
|
|
|
"time"
|
|
@@ -17,23 +18,44 @@ type subnetRegistry interface {
|
|
|
watchSubnets(since uint64, stop chan bool) (*etcd.Response, error)
|
|
|
}
|
|
|
|
|
|
+type EtcdConfig struct {
|
|
|
+ Endpoints []string
|
|
|
+ Keyfile string
|
|
|
+ Certfile string
|
|
|
+ CAFile string
|
|
|
+ Prefix string
|
|
|
+}
|
|
|
+
|
|
|
type etcdSubnetRegistry struct {
|
|
|
- mux sync.Mutex
|
|
|
- cli *etcd.Client
|
|
|
- endpoint []string
|
|
|
- prefix string
|
|
|
+ mux sync.Mutex
|
|
|
+ cli *etcd.Client
|
|
|
+ etcdCfg *EtcdConfig
|
|
|
}
|
|
|
|
|
|
-func newEtcdSubnetRegistry(endpoint []string, prefix string) subnetRegistry {
|
|
|
- return &etcdSubnetRegistry{
|
|
|
- cli: etcd.NewClient(endpoint),
|
|
|
- endpoint: endpoint,
|
|
|
- prefix: prefix,
|
|
|
+func newEtcdClient(c *EtcdConfig) (*etcd.Client, error) {
|
|
|
+ if c.Keyfile != "" || c.Certfile != "" || c.CAFile != "" {
|
|
|
+ return etcd.NewTLSClient(c.Endpoints, c.Certfile, c.Keyfile, c.CAFile)
|
|
|
+ } else {
|
|
|
+ return etcd.NewClient(c.Endpoints), nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func newEtcdSubnetRegistry(config *EtcdConfig) (subnetRegistry, error) {
|
|
|
+ r := &etcdSubnetRegistry{
|
|
|
+ etcdCfg: config,
|
|
|
+ }
|
|
|
+
|
|
|
+ var err error
|
|
|
+ r.cli, err = newEtcdClient(config)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return r, nil
|
|
|
+}
|
|
|
+
|
|
|
func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
|
|
|
- key := path.Join(esr.prefix, "config")
|
|
|
+ key := path.Join(esr.etcdCfg.Prefix, "config")
|
|
|
resp, err := esr.client().Get(key, false, false)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -42,12 +64,12 @@ func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
|
|
|
}
|
|
|
|
|
|
func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
|
|
|
- key := path.Join(esr.prefix, "subnets")
|
|
|
+ key := path.Join(esr.etcdCfg.Prefix, "subnets")
|
|
|
return esr.client().Get(key, false, true)
|
|
|
}
|
|
|
|
|
|
func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
|
|
|
- key := path.Join(esr.prefix, "subnets", sn)
|
|
|
+ key := path.Join(esr.etcdCfg.Prefix, "subnets", sn)
|
|
|
resp, err := esr.client().Create(key, data, ttl)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -58,7 +80,7 @@ func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.
|
|
|
}
|
|
|
|
|
|
func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
|
|
|
- key := path.Join(esr.prefix, "subnets", sn)
|
|
|
+ key := path.Join(esr.etcdCfg.Prefix, "subnets", sn)
|
|
|
resp, err := esr.client().Set(key, data, ttl)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
@@ -70,7 +92,7 @@ func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.
|
|
|
|
|
|
func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
|
|
|
for {
|
|
|
- key := path.Join(esr.prefix, "subnets")
|
|
|
+ key := path.Join(esr.etcdCfg.Prefix, "subnets")
|
|
|
resp, err := esr.client().RawWatch(key, since, true, nil, stop)
|
|
|
|
|
|
if err != nil {
|
|
@@ -101,7 +123,12 @@ func (esr *etcdSubnetRegistry) client() *etcd.Client {
|
|
|
func (esr *etcdSubnetRegistry) resetClient() {
|
|
|
esr.mux.Lock()
|
|
|
defer esr.mux.Unlock()
|
|
|
- esr.cli = etcd.NewClient(esr.endpoint)
|
|
|
+
|
|
|
+ var err error
|
|
|
+ esr.cli, err = newEtcdClient(esr.etcdCfg)
|
|
|
+ if err != nil {
|
|
|
+ panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err))
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
func ensureExpiration(resp *etcd.Response, ttl uint64) {
|