123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- // Copyright 2015 CoreOS, Inc.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package subnet
- import (
- "encoding/json"
- "errors"
- "fmt"
- "net"
- "path"
- "regexp"
- "strconv"
- "time"
- etcd "github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/etcd/client"
- log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
- "github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
- "github.com/coreos/flannel/pkg/ip"
- )
- const (
- registerRetries = 10
- subnetTTL = 24 * time.Hour
- )
- type EtcdManager struct {
- registry Registry
- networkRegex *regexp.Regexp
- }
- var (
- subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
- )
- type watchCursor struct {
- index uint64
- }
- func (c watchCursor) String() string {
- return strconv.FormatUint(c.index, 10)
- }
- func NewEtcdManager(config *EtcdConfig) (Manager, error) {
- r, err := newEtcdSubnetRegistry(config)
- if err != nil {
- return nil, err
- }
- return &EtcdManager{
- registry: r,
- networkRegex: regexp.MustCompile(config.Prefix + `/([^/]*)/config`),
- }, nil
- }
- func newEtcdManager(r Registry) Manager {
- return &EtcdManager{
- registry: r,
- networkRegex: regexp.MustCompile(`/coreos.com/network/([^/]*)/config`),
- }
- }
- func (m *EtcdManager) GetNetworkConfig(ctx context.Context, network string) (*Config, error) {
- cfgResp, err := m.registry.getNetworkConfig(ctx, network)
- if err != nil {
- return nil, err
- }
- return ParseConfig(cfgResp.Node.Value)
- }
- func (m *EtcdManager) AcquireLease(ctx context.Context, network string, attrs *LeaseAttrs) (*Lease, error) {
- config, err := m.GetNetworkConfig(ctx, network)
- if err != nil {
- return nil, err
- }
- for {
- l, err := m.acquireLeaseOnce(ctx, network, config, attrs)
- switch {
- case err == nil:
- log.Info("Subnet lease acquired: ", l.Subnet)
- return l, nil
- case err == context.Canceled, err == context.DeadlineExceeded:
- return nil, err
- default:
- log.Error("Failed to acquire subnet: ", err)
- }
- select {
- case <-time.After(time.Second):
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- }
- func findLeaseByIP(leases []Lease, pubIP ip.IP4) *Lease {
- for _, l := range leases {
- if pubIP == l.Attrs.PublicIP {
- return &l
- }
- }
- return nil
- }
- func (m *EtcdManager) tryAcquireLease(ctx context.Context, network string, config *Config, extIaddr ip.IP4, attrs *LeaseAttrs) (*Lease, error) {
- var err error
- leases, _, err := m.getLeases(ctx, network)
- if err != nil {
- return nil, err
- }
- attrBytes, err := json.Marshal(attrs)
- if err != nil {
- return nil, err
- }
- // try to reuse a subnet if there's one that matches our IP
- if l := findLeaseByIP(leases, extIaddr); l != nil {
- // make sure the existing subnet is still within the configured network
- if isSubnetConfigCompat(config, l.Subnet) {
- log.Infof("Found lease (%v) for current IP (%v), reusing", l.Subnet, extIaddr)
- resp, err := m.registry.updateSubnet(ctx, network, l.Key(), string(attrBytes), subnetTTL)
- if err != nil {
- return nil, err
- }
- l.Attrs = attrs
- l.Expiration = *resp.Node.Expiration
- return l, nil
- } else {
- log.Infof("Found lease (%v) for current IP (%v) but not compatible with current config, deleting", l.Subnet, extIaddr)
- if _, err := m.registry.deleteSubnet(ctx, network, l.Key()); err != nil {
- return nil, err
- }
- }
- }
- // no existing match, grab a new one
- sn, err := m.allocateSubnet(config, leases)
- if err != nil {
- return nil, err
- }
- resp, err := m.registry.createSubnet(ctx, network, sn.StringSep(".", "-"), string(attrBytes), subnetTTL)
- if err == nil {
- return &Lease{
- Subnet: sn,
- Attrs: attrs,
- Expiration: *resp.Node.Expiration,
- }, nil
- }
- if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeNodeExist {
- // if etcd returned Key Already Exists, try again.
- return nil, nil
- }
- return nil, err
- }
- func (m *EtcdManager) acquireLeaseOnce(ctx context.Context, network string, config *Config, attrs *LeaseAttrs) (*Lease, error) {
- for i := 0; i < registerRetries; i++ {
- l, err := m.tryAcquireLease(ctx, network, config, attrs.PublicIP, attrs)
- switch {
- case err != nil:
- return nil, err
- case l != nil:
- return l, nil
- }
- // before moving on, check for cancel
- // TODO(eyakubovich): propogate ctx deeper into registry
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- default:
- }
- }
- return nil, errors.New("Max retries reached trying to acquire a subnet")
- }
- func parseSubnetKey(s string) (ip.IP4Net, error) {
- if parts := subnetRegex.FindStringSubmatch(s); len(parts) == 3 {
- snIp := net.ParseIP(parts[1]).To4()
- prefixLen, err := strconv.ParseUint(parts[2], 10, 5)
- if snIp != nil && err == nil {
- return ip.IP4Net{IP: ip.FromIP(snIp), PrefixLen: uint(prefixLen)}, nil
- }
- }
- return ip.IP4Net{}, errors.New("Error parsing IP Subnet")
- }
- func (m *EtcdManager) allocateSubnet(config *Config, leases []Lease) (ip.IP4Net, error) {
- log.Infof("Picking subnet in range %s ... %s", config.SubnetMin, config.SubnetMax)
- var bag []ip.IP4
- sn := ip.IP4Net{IP: config.SubnetMin, PrefixLen: config.SubnetLen}
- OuterLoop:
- for ; sn.IP <= config.SubnetMax && len(bag) < 100; sn = sn.Next() {
- for _, l := range leases {
- if sn.Overlaps(l.Subnet) {
- continue OuterLoop
- }
- }
- bag = append(bag, sn.IP)
- }
- if len(bag) == 0 {
- return ip.IP4Net{}, errors.New("out of subnets")
- } else {
- i := randInt(0, len(bag))
- return ip.IP4Net{IP: bag[i], PrefixLen: config.SubnetLen}, nil
- }
- }
- // getLeases queries etcd to get a list of currently allocated leases for a given network.
- // It returns the leases along with the "as-of" etcd-index that can be used as the starting
- // point for etcd watch.
- func (m *EtcdManager) getLeases(ctx context.Context, network string) ([]Lease, uint64, error) {
- resp, err := m.registry.getSubnets(ctx, network)
- leases := []Lease{}
- if err == nil {
- for _, node := range resp.Node.Nodes {
- sn, err := parseSubnetKey(node.Key)
- if err == nil {
- attrs := &LeaseAttrs{}
- if err = json.Unmarshal([]byte(node.Value), attrs); err == nil {
- exp := time.Time{}
- if node.Expiration != nil {
- exp = *node.Expiration
- }
- lease := Lease{
- Subnet: sn,
- Attrs: attrs,
- Expiration: exp,
- }
- leases = append(leases, lease)
- }
- }
- }
- return leases, resp.Index, nil
- }
- if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
- // key not found: treat it as empty set
- return leases, etcdErr.Index, nil
- }
- return nil, 0, err
- }
- func (m *EtcdManager) RenewLease(ctx context.Context, network string, lease *Lease) error {
- attrBytes, err := json.Marshal(lease.Attrs)
- if err != nil {
- return err
- }
- // TODO(eyakubovich): propogate ctx into registry
- resp, err := m.registry.updateSubnet(ctx, network, lease.Key(), string(attrBytes), subnetTTL)
- if err != nil {
- return err
- }
- lease.Expiration = *resp.Node.Expiration
- return nil
- }
- func getNextIndex(cursor interface{}) (uint64, error) {
- nextIndex := uint64(0)
- if wc, ok := cursor.(watchCursor); ok {
- nextIndex = wc.index
- } else if s, ok := cursor.(string); ok {
- var err error
- nextIndex, err = strconv.ParseUint(s, 10, 64)
- if err != nil {
- return 0, fmt.Errorf("failed to parse cursor: %v", err)
- }
- } else {
- return 0, fmt.Errorf("internal error: watch cursor is of unknown type")
- }
- return nextIndex, nil
- }
- func (m *EtcdManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (LeaseWatchResult, error) {
- if cursor == nil {
- return m.leaseWatchReset(ctx, network)
- }
- nextIndex, err := getNextIndex(cursor)
- if err != nil {
- return LeaseWatchResult{}, err
- }
- resp, err := m.registry.watch(ctx, path.Join(network, "subnets"), nextIndex)
- switch {
- case err == nil:
- return parseSubnetWatchResponse(resp)
- case isIndexTooSmall(err):
- log.Warning("Watch of subnet leases failed because etcd index outside history window")
- return m.leaseWatchReset(ctx, network)
- default:
- return LeaseWatchResult{}, err
- }
- }
- func (m *EtcdManager) WatchNetworks(ctx context.Context, cursor interface{}) (NetworkWatchResult, error) {
- if cursor == nil {
- return m.networkWatchReset(ctx)
- }
- nextIndex, err := getNextIndex(cursor)
- if err != nil {
- return NetworkWatchResult{}, err
- }
- resp, err := m.registry.watch(ctx, "", nextIndex)
- switch {
- case err == nil:
- return m.parseNetworkWatchResponse(resp)
- case isIndexTooSmall(err):
- log.Warning("Watch of subnet leases failed because etcd index outside history window")
- return m.networkWatchReset(ctx)
- default:
- return NetworkWatchResult{}, err
- }
- }
- func isIndexTooSmall(err error) bool {
- etcdErr, ok := err.(etcd.Error)
- return ok && etcdErr.Code == etcd.ErrorCodeEventIndexCleared
- }
- func parseSubnetWatchResponse(resp *etcd.Response) (LeaseWatchResult, error) {
- sn, err := parseSubnetKey(resp.Node.Key)
- if err != nil {
- return LeaseWatchResult{}, fmt.Errorf("error parsing subnet IP: %s", resp.Node.Key)
- }
- evt := Event{}
- switch resp.Action {
- case "delete", "expire":
- evt = Event{
- EventRemoved,
- Lease{Subnet: sn},
- "",
- }
- default:
- attrs := &LeaseAttrs{}
- err := json.Unmarshal([]byte(resp.Node.Value), attrs)
- if err != nil {
- return LeaseWatchResult{}, err
- }
- exp := time.Time{}
- if resp.Node.Expiration != nil {
- exp = *resp.Node.Expiration
- }
- evt = Event{
- EventAdded,
- Lease{
- Subnet: sn,
- Attrs: attrs,
- Expiration: exp,
- },
- "",
- }
- }
- return LeaseWatchResult{
- Cursor: watchCursor{resp.Node.ModifiedIndex},
- Events: []Event{evt},
- }, nil
- }
- // Returns network name from config key (eg, /coreos.com/network/foobar/config),
- // if the 'config' key isn't present we don't consider the network valid
- func (m *EtcdManager) parseNetworkKey(s string) (string, error) {
- if parts := m.networkRegex.FindStringSubmatch(s); len(parts) == 2 {
- return parts[1], nil
- }
- return "", errors.New("Error parsing Network key")
- }
- func (m *EtcdManager) parseNetworkWatchResponse(resp *etcd.Response) (NetworkWatchResult, error) {
- netname, err := m.parseNetworkKey(resp.Node.Key)
- if err != nil {
- // Ignore non .../<netname>/config keys
- return NetworkWatchResult{}, nil
- }
- evt := Event{}
- switch resp.Action {
- case "delete":
- evt = Event{
- EventRemoved,
- Lease{},
- netname,
- }
- default:
- _, err := ParseConfig(resp.Node.Value)
- if err != nil {
- return NetworkWatchResult{}, err
- }
- evt = Event{
- EventAdded,
- Lease{},
- netname,
- }
- }
- return NetworkWatchResult{
- Cursor: watchCursor{resp.Node.ModifiedIndex},
- Events: []Event{evt},
- }, nil
- }
- // getNetworks queries etcd to get a list of network names. It returns the
- // networks along with the 'as-of' etcd-index that can be used as the starting
- // point for etcd watch.
- func (m *EtcdManager) getNetworks(ctx context.Context) ([]string, uint64, error) {
- resp, err := m.registry.getNetworks(ctx)
- networks := []string{}
- if err == nil {
- for _, node := range resp.Node.Nodes {
- netname, err := m.parseNetworkKey(node.Key)
- if err == nil {
- networks = append(networks, netname)
- }
- }
- return networks, resp.Index, nil
- }
- if etcdErr, ok := err.(etcd.Error); ok && etcdErr.Code == etcd.ErrorCodeKeyNotFound {
- // key not found: treat it as empty set
- return networks, etcdErr.Index, nil
- }
- return nil, 0, err
- }
- // leaseWatchReset is called when incremental lease watch failed and we need to grab a snapshot
- func (m *EtcdManager) leaseWatchReset(ctx context.Context, network string) (LeaseWatchResult, error) {
- wr := LeaseWatchResult{}
- leases, index, err := m.getLeases(ctx, network)
- if err != nil {
- return wr, fmt.Errorf("failed to retrieve subnet leases: %v", err)
- }
- wr.Cursor = watchCursor{index}
- wr.Snapshot = leases
- return wr, nil
- }
- // networkWatchReset is called when incremental network watch failed and we need to grab a snapshot
- func (m *EtcdManager) networkWatchReset(ctx context.Context) (NetworkWatchResult, error) {
- wr := NetworkWatchResult{}
- networks, index, err := m.getNetworks(ctx)
- if err != nil {
- return wr, fmt.Errorf("failed to retrieve networks: %v", err)
- }
- wr.Cursor = watchCursor{index}
- wr.Snapshot = networks
- return wr, nil
- }
- func isSubnetConfigCompat(config *Config, sn ip.IP4Net) bool {
- if sn.IP < config.SubnetMin || sn.IP > config.SubnetMax {
- return false
- }
- return sn.PrefixLen == config.SubnetLen
- }
|