// Copyright 2015 flannel authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package subnet import ( "encoding/json" "errors" "fmt" "net" "path" "regexp" "strconv" "sync" "time" etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client" "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/pkg/transport" log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog" "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/flannel/pkg/ip" ) var ( subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`) ErrTryAgain = errors.New("try again") ) type Registry interface { getNetworkConfig(ctx context.Context, network string) (string, error) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) getNetworks(ctx context.Context) ([]string, uint64, error) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) } type EtcdConfig struct { Endpoints []string Keyfile string Certfile string CAFile string Prefix string } type etcdSubnetRegistry struct { mux sync.Mutex cli etcd.KeysAPI etcdCfg *EtcdConfig networkRegex *regexp.Regexp } func newEtcdClient(c *EtcdConfig) (etcd.KeysAPI, error) { tlsInfo := transport.TLSInfo{ CertFile: c.Certfile, KeyFile: c.Keyfile, CAFile: c.CAFile, } t, err := transport.NewTransport(tlsInfo) if err != nil { return nil, err } cli, err := etcd.New(etcd.Config{ Endpoints: c.Endpoints, Transport: t, }) if err != nil { return nil, err } return etcd.NewKeysAPI(cli), nil } func newEtcdSubnetRegistry(config *EtcdConfig) (Registry, error) { r := &etcdSubnetRegistry{ etcdCfg: config, networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)(/|/config)?$`), } var err error r.cli, err = newEtcdClient(config) if err != nil { return nil, err } return r, nil } func (esr *etcdSubnetRegistry) getNetworkConfig(ctx context.Context, network string) (string, error) { key := path.Join(esr.etcdCfg.Prefix, network, "config") resp, err := esr.client().Get(ctx, key, nil) if err != nil { return "", err } return resp.Node.Value, nil } // getSubnets queries etcd to get a list of currently allocated leases for a given network. // It returns the leases along with the "as-of" etcd-index that can be used as the starting // point for etcd watch. func (esr *etcdSubnetRegistry) getSubnets(ctx context.Context, network string) ([]Lease, uint64, error) { key := path.Join(esr.etcdCfg.Prefix, network, "subnets") resp, err := esr.client().Get(ctx, key, &etcd.GetOptions{Recursive: true}) if err != nil { if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound { // key not found: treat it as empty set return []Lease{}, etcdErr.Index, nil } return nil, 0, err } leases := []Lease{} for _, node := range resp.Node.Nodes { if sn := parseSubnetKey(node.Key); sn != nil { attrs := &LeaseAttrs{} if err = json.Unmarshal([]byte(node.Value), attrs); err == nil { exp := time.Time{} if node.Expiration != nil { exp = *node.Expiration } lease := Lease{ Subnet: *sn, Attrs: *attrs, Expiration: exp, } leases = append(leases, lease) } } } return leases, resp.Index, nil } func (esr *etcdSubnetRegistry) createSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration) (time.Time, error) { key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-")) value, err := json.Marshal(attrs) if err != nil { return time.Time{}, err } opts := &etcd.SetOptions{ PrevExist: etcd.PrevNoExist, TTL: ttl, } resp, err := esr.client().Set(ctx, key, string(value), opts) if err != nil { return time.Time{}, err } ensureExpiration(resp, ttl) return *resp.Node.Expiration, nil } func (esr *etcdSubnetRegistry) updateSubnet(ctx context.Context, network string, sn ip.IP4Net, attrs *LeaseAttrs, ttl time.Duration, asof uint64) (time.Time, error) { key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-")) value, err := json.Marshal(attrs) if err != nil { return time.Time{}, err } resp, err := esr.client().Set(ctx, key, string(value), &etcd.SetOptions{ PrevIndex: asof, TTL: ttl, }) if err != nil { return time.Time{}, err } ensureExpiration(resp, ttl) return *resp.Node.Expiration, nil } func (esr *etcdSubnetRegistry) deleteSubnet(ctx context.Context, network string, sn ip.IP4Net) error { key := path.Join(esr.etcdCfg.Prefix, network, "subnets", sn.StringSep(".", "-")) _, err := esr.client().Delete(ctx, key, nil) return err } func (esr *etcdSubnetRegistry) watchSubnets(ctx context.Context, network string, since uint64) (Event, uint64, error) { key := path.Join(esr.etcdCfg.Prefix, network, "subnets") opts := &etcd.WatcherOptions{ AfterIndex: since, Recursive: true, } e, err := esr.client().Watcher(key, opts).Next(ctx) if err != nil { return Event{}, 0, err } evt, err := parseSubnetWatchResponse(e) return evt, e.Node.ModifiedIndex, err } // getNetworks queries etcd to get a list of network names. It returns the // networks along with the 'as-of' etcd-index that can be used as the starting // point for etcd watch. func (esr *etcdSubnetRegistry) getNetworks(ctx context.Context) ([]string, uint64, error) { resp, err := esr.client().Get(ctx, esr.etcdCfg.Prefix, &etcd.GetOptions{Recursive: true}) networks := []string{} if err == nil { for _, node := range resp.Node.Nodes { // Look for '/config' on the child nodes for _, child := range node.Nodes { netname, isConfig := esr.parseNetworkKey(child.Key) if isConfig { networks = append(networks, netname) } } } return networks, resp.Index, nil } if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound { // key not found: treat it as empty set return networks, etcdErr.Index, nil } return nil, 0, err } func (esr *etcdSubnetRegistry) watchNetworks(ctx context.Context, since uint64) (Event, uint64, error) { key := esr.etcdCfg.Prefix opts := &etcd.WatcherOptions{ AfterIndex: since, Recursive: true, } e, err := esr.client().Watcher(key, opts).Next(ctx) if err != nil { return Event{}, 0, err } return esr.parseNetworkWatchResponse(e) } func (esr *etcdSubnetRegistry) client() etcd.KeysAPI { esr.mux.Lock() defer esr.mux.Unlock() return esr.cli } func (esr *etcdSubnetRegistry) resetClient() { esr.mux.Lock() defer esr.mux.Unlock() var err error esr.cli, err = newEtcdClient(esr.etcdCfg) if err != nil { panic(fmt.Errorf("resetClient: error recreating etcd client: %v", err)) } } func ensureExpiration(resp *etcd.Response, ttl time.Duration) { if resp.Node.Expiration == nil { // should not be but calc it ourselves in this case log.Info("Expiration field missing on etcd response, calculating locally") exp := clock.Now().Add(time.Duration(ttl) * time.Second) resp.Node.Expiration = &exp } } func parseSubnetWatchResponse(resp *etcd.Response) (Event, error) { sn := parseSubnetKey(resp.Node.Key) if sn == nil { return Event{}, fmt.Errorf("%v %q: not a subnet, skipping", resp.Action, resp.Node.Key) } switch resp.Action { case "delete", "expire": return Event{ EventRemoved, Lease{Subnet: *sn}, "", }, nil default: attrs := &LeaseAttrs{} err := json.Unmarshal([]byte(resp.Node.Value), attrs) if err != nil { return Event{}, err } exp := time.Time{} if resp.Node.Expiration != nil { exp = *resp.Node.Expiration } evt := Event{ EventAdded, Lease{ Subnet: *sn, Attrs: *attrs, Expiration: exp, }, "", } return evt, nil } } func (esr *etcdSubnetRegistry) parseNetworkWatchResponse(resp *etcd.Response) (Event, uint64, error) { index := resp.Node.ModifiedIndex netname, isConfig := esr.parseNetworkKey(resp.Node.Key) if netname == "" { return Event{}, index, ErrTryAgain } evt := Event{} switch resp.Action { case "delete": evt = Event{ EventRemoved, Lease{}, netname, } default: if !isConfig { // Ignore non ...//config keys; tell caller to try again return Event{}, index, ErrTryAgain } _, err := ParseConfig(resp.Node.Value) if err != nil { return Event{}, index, err } evt = Event{ EventAdded, Lease{}, netname, } } return evt, index, nil } // Returns network name from config key (eg, /coreos.com/network/foobar/config), // if the 'config' key isn't present we don't consider the network valid func (esr *etcdSubnetRegistry) parseNetworkKey(s string) (string, bool) { if parts := esr.networkRegex.FindStringSubmatch(s); len(parts) == 3 { return parts[1], parts[2] != "" } return "", false } func parseSubnetKey(s string) *ip.IP4Net { if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 { snIp := net.ParseIP(parts[1]).To4() prefixLen, err := strconv.ParseUint(parts[2], 10, 5) if snIp != nil && err == nil { return &ip.IP4Net{IP: ip.FromIP(snIp), PrefixLen: uint(prefixLen)} } } return nil }