Browse Source

Add/remove networks when registry changes

Add a new -watch-networks option to watch for registry (either
remote or local) changes and add/remove networks for those events.
If this option is given along with -networks, only the networks
specified in -networks will be acted upon.  If -networks is
empty, then all network events will be acted upon.
Dan Williams 9 years ago
parent
commit
6b68c43610
3 changed files with 343 additions and 169 deletions
  1. 13 155
      main.go
  2. 306 0
      network/manager.go
  3. 24 14
      network/network.go

+ 13 - 155
main.go

@@ -17,28 +17,22 @@ package main
 import (
 	"flag"
 	"fmt"
-	"net"
 	"os"
 	"os/signal"
-	"path/filepath"
 	"strings"
 	"sync"
 	"syscall"
 
-	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/pkg/flagutil"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 
-	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/network"
-	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/remote"
 	"github.com/coreos/flannel/subnet"
 )
 
 type CmdLineOpts struct {
-	publicIP       string
 	etcdEndpoints  string
 	etcdPrefix     string
 	etcdKeyfile    string
@@ -46,108 +40,30 @@ type CmdLineOpts struct {
 	etcdCAFile     string
 	help           bool
 	version        bool
-	ipMasq         bool
-	subnetFile     string
-	subnetDir      string
-	iface          string
 	listen         string
 	remote         string
 	remoteKeyfile  string
 	remoteCertfile string
 	remoteCAFile   string
-	networks       string
 }
 
 var opts CmdLineOpts
 
 func init() {
-	flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
 	flag.StringVar(&opts.etcdEndpoints, "etcd-endpoints", "http://127.0.0.1:4001,http://127.0.0.1:2379", "a comma-delimited list of etcd endpoints")
 	flag.StringVar(&opts.etcdPrefix, "etcd-prefix", "/coreos.com/network", "etcd prefix")
 	flag.StringVar(&opts.etcdKeyfile, "etcd-keyfile", "", "SSL key file used to secure etcd communication")
 	flag.StringVar(&opts.etcdCertfile, "etcd-certfile", "", "SSL certification file used to secure etcd communication")
 	flag.StringVar(&opts.etcdCAFile, "etcd-cafile", "", "SSL Certificate Authority file used to secure etcd communication")
-	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
-	flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
-	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
 	flag.StringVar(&opts.listen, "listen", "", "run as server and listen on specified address (e.g. ':8080')")
 	flag.StringVar(&opts.remote, "remote", "", "run as client and connect to server on specified address (e.g. '10.1.2.3:8080')")
 	flag.StringVar(&opts.remoteKeyfile, "remote-keyfile", "", "SSL key file used to secure client/server communication")
 	flag.StringVar(&opts.remoteCertfile, "remote-certfile", "", "SSL certification file used to secure client/server communication")
 	flag.StringVar(&opts.remoteCAFile, "remote-cafile", "", "SSL Certificate Authority file used to secure client/server communication")
-	flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
-	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
 	flag.BoolVar(&opts.help, "help", false, "print this message")
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
 }
 
-func writeSubnetFile(path string, nw ip.IP4Net, sd *backend.SubnetDef) error {
-	dir, name := filepath.Split(path)
-	os.MkdirAll(dir, 0755)
-
-	tempFile := filepath.Join(dir, "."+name)
-	f, err := os.Create(tempFile)
-	if err != nil {
-		return err
-	}
-
-	// Write out the first usable IP by incrementing
-	// sn.IP by one
-	sn := sd.Lease.Subnet
-	sn.IP += 1
-
-	fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
-	fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
-	fmt.Fprintf(f, "FLANNEL_MTU=%d\n", sd.MTU)
-	_, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", opts.ipMasq)
-	f.Close()
-	if err != nil {
-		return err
-	}
-
-	// rename(2) the temporary file to the desired location so that it becomes
-	// atomically visible with the contents
-	return os.Rename(tempFile, path)
-}
-
-func lookupIface() (*net.Interface, net.IP, error) {
-	var iface *net.Interface
-	var iaddr net.IP
-	var err error
-
-	if len(opts.iface) > 0 {
-		if iaddr = net.ParseIP(opts.iface); iaddr != nil {
-			iface, err = ip.GetInterfaceByIP(iaddr)
-			if err != nil {
-				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", opts.iface, err)
-			}
-		} else {
-			iface, err = net.InterfaceByName(opts.iface)
-			if err != nil {
-				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", opts.iface, err)
-			}
-		}
-	} else {
-		log.Info("Determining IP address of default interface")
-		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
-			return nil, nil, fmt.Errorf("Failed to get default interface: %s", err)
-		}
-	}
-
-	if iaddr == nil {
-		iaddr, err = ip.GetIfaceIP4Addr(iface)
-		if err != nil {
-			return nil, nil, fmt.Errorf("Failed to find IPv4 address for interface %s", iface.Name)
-		}
-	}
-
-	return iface, iaddr, nil
-}
-
-func isMultiNetwork() bool {
-	return len(opts.networks) > 0
-}
-
 func newSubnetManager() (subnet.Manager, error) {
 	if opts.remote != "" {
 		return remote.NewRemoteManager(opts.remote, opts.remoteCAFile, opts.remoteCertfile, opts.remoteKeyfile)
@@ -164,66 +80,6 @@ func newSubnetManager() (subnet.Manager, error) {
 	return subnet.NewEtcdManager(cfg)
 }
 
-func initAndRun(ctx context.Context, sm subnet.Manager, netnames []string) {
-	iface, iaddr, err := lookupIface()
-	if err != nil {
-		log.Error(err)
-		return
-	}
-
-	if iface.MTU == 0 {
-		log.Errorf("Failed to determine MTU for %s interface", iaddr)
-		return
-	}
-
-	var eaddr net.IP
-
-	if len(opts.publicIP) > 0 {
-		eaddr = net.ParseIP(opts.publicIP)
-	}
-
-	if eaddr == nil {
-		eaddr = iaddr
-	}
-
-	log.Infof("Using %s as external interface", iaddr)
-	log.Infof("Using %s as external endpoint", eaddr)
-
-	nets := []*network.Network{}
-	for _, n := range netnames {
-		nets = append(nets, network.New(sm, n, opts.ipMasq))
-	}
-
-	wg := sync.WaitGroup{}
-
-	for _, n := range nets {
-		go func(n *network.Network) {
-			wg.Add(1)
-			defer wg.Done()
-
-			sn := n.Init(ctx, iface, iaddr, eaddr)
-			if sn != nil {
-				if isMultiNetwork() {
-					path := filepath.Join(opts.subnetDir, n.Name) + ".env"
-					if err := writeSubnetFile(path, n.Config.Network, sn); err != nil {
-						return
-					}
-				} else {
-					if err := writeSubnetFile(opts.subnetFile, n.Config.Network, sn); err != nil {
-						return
-					}
-					daemon.SdNotify("READY=1")
-				}
-
-				n.Run(ctx)
-				log.Infof("%v exited", n.Name)
-			}
-		}(n)
-	}
-
-	wg.Wait()
-}
-
 func main() {
 	// glog will log to tmp files by default. override so all entries
 	// can flow into journald (if running under systemd)
@@ -251,6 +107,13 @@ func main() {
 		os.Exit(1)
 	}
 
+	// Register for SIGINT and SIGTERM
+	log.Info("Installing signal handlers")
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
 	var runFunc func(ctx context.Context)
 
 	if opts.listen != "" {
@@ -263,22 +126,17 @@ func main() {
 			remote.RunServer(ctx, sm, opts.listen, opts.remoteCAFile, opts.remoteCertfile, opts.remoteKeyfile)
 		}
 	} else {
-		networks := strings.Split(opts.networks, ",")
-		if len(networks) == 0 {
-			networks = append(networks, "")
+		nm, err := network.NewNetworkManager(ctx, sm)
+		if err != nil {
+			log.Error("Failed to create NetworkManager: ", err)
+			os.Exit(1)
 		}
+
 		runFunc = func(ctx context.Context) {
-			initAndRun(ctx, sm, networks)
+			nm.Run()
 		}
 	}
 
-	// Register for SIGINT and SIGTERM
-	log.Info("Installing signal handlers")
-	sigs := make(chan os.Signal, 1)
-	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
-
-	ctx, cancel := context.WithCancel(context.Background())
-
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	go func() {

+ 306 - 0
network/manager.go

@@ -0,0 +1,306 @@
+// Copyright 2015 CoreOS, Inc.
+// Copyright 2015 Red Hat, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package network
+
+import (
+	"flag"
+	"fmt"
+	"net"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
+	"github.com/coreos/flannel/backend"
+	"github.com/coreos/flannel/pkg/ip"
+	"github.com/coreos/flannel/subnet"
+)
+
+type CmdLineOpts struct {
+	publicIP      string
+	ipMasq        bool
+	subnetFile    string
+	subnetDir     string
+	iface         string
+	networks      string
+	watchNetworks bool
+}
+
+var opts CmdLineOpts
+
+func init() {
+	flag.StringVar(&opts.publicIP, "public-ip", "", "IP accessible by other nodes for inter-host communication")
+	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/flannel/subnet.env", "filename where env variables (subnet, MTU, ... ) will be written to")
+	flag.StringVar(&opts.subnetDir, "subnet-dir", "/run/flannel/networks", "directory where files with env variables (subnet, MTU, ...) will be written to")
+	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
+	flag.StringVar(&opts.networks, "networks", "", "run in multi-network mode and service the specified networks")
+	flag.BoolVar(&opts.watchNetworks, "watch-networks", false, "run in multi-network mode and watch for networks from 'networks' or all networks")
+	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
+}
+
+type Manager struct {
+	ctx             context.Context
+	sm              subnet.Manager
+	allowedNetworks map[string]bool
+	networks        map[string]*Network
+	watch           bool
+	ipMasq          bool
+	extIface        *net.Interface
+	iaddr           net.IP
+	eaddr           net.IP
+}
+
+func (m *Manager) isNetAllowed(name string) bool {
+	// If allowedNetworks is empty all networks are allowed
+	if len(m.allowedNetworks) > 0 {
+		_, ok := m.allowedNetworks[name]
+		return ok
+	}
+	return true
+}
+
+func (m *Manager) isMultiNetwork() bool {
+	return len(m.allowedNetworks) > 0 || m.watch
+}
+
+func NewNetworkManager(ctx context.Context, sm subnet.Manager) (*Manager, error) {
+	iface, iaddr, err := lookupExtIface(opts.iface)
+	if err != nil {
+		return nil, err
+	}
+
+	if iface.MTU == 0 {
+		return nil, fmt.Errorf("Failed to determine MTU for %s interface", iaddr)
+	}
+
+	var eaddr net.IP
+
+	if len(opts.publicIP) > 0 {
+		eaddr = net.ParseIP(opts.publicIP)
+		if eaddr == nil {
+			return nil, fmt.Errorf("Invalid public IP address", opts.publicIP)
+		}
+	}
+
+	if eaddr == nil {
+		eaddr = iaddr
+	}
+
+	log.Infof("Using %s as external interface", iaddr)
+	log.Infof("Using %s as external endpoint", eaddr)
+
+	manager := &Manager{
+		ctx:             ctx,
+		sm:              sm,
+		allowedNetworks: make(map[string]bool),
+		networks:        make(map[string]*Network),
+		watch:           opts.watchNetworks,
+		ipMasq:          opts.ipMasq,
+		extIface:        iface,
+		iaddr:           iaddr,
+		eaddr:           eaddr,
+	}
+
+	for _, name := range strings.Split(opts.networks, ",") {
+		manager.allowedNetworks[name] = true
+	}
+
+	if manager.isMultiNetwork() {
+		// Get list of existing networks
+		result, err := manager.sm.WatchNetworks(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
+
+		for _, n := range result.Snapshot {
+			if manager.isNetAllowed(n) {
+				manager.networks[n] = NewNetwork(ctx, sm, n, manager.ipMasq)
+			}
+		}
+	} else {
+		manager.networks[""] = NewNetwork(ctx, sm, "", manager.ipMasq)
+	}
+
+	return manager, nil
+}
+
+func lookupExtIface(ifname string) (*net.Interface, net.IP, error) {
+	var iface *net.Interface
+	var iaddr net.IP
+	var err error
+
+	if len(ifname) > 0 {
+		if iaddr = net.ParseIP(ifname); iaddr != nil {
+			iface, err = ip.GetInterfaceByIP(iaddr)
+			if err != nil {
+				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", ifname, err)
+			}
+		} else {
+			iface, err = net.InterfaceByName(ifname)
+			if err != nil {
+				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", ifname, err)
+			}
+		}
+	} else {
+		log.Info("Determining IP address of default interface")
+		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
+			return nil, nil, fmt.Errorf("Failed to get default interface: %s", err)
+		}
+	}
+
+	if iaddr == nil {
+		iaddr, err = ip.GetIfaceIP4Addr(iface)
+		if err != nil {
+			return nil, nil, fmt.Errorf("Failed to find IPv4 address for interface %s", iface.Name)
+		}
+	}
+
+	return iface, iaddr, nil
+}
+
+func writeSubnetFile(path string, nw ip.IP4Net, ipMasq bool, sd *backend.SubnetDef) error {
+	dir, name := filepath.Split(path)
+	os.MkdirAll(dir, 0755)
+
+	tempFile := filepath.Join(dir, "."+name)
+	f, err := os.Create(tempFile)
+	if err != nil {
+		return err
+	}
+
+	// Write out the first usable IP by incrementing
+	// sn.IP by one
+	sn := sd.Lease.Subnet
+	sn.IP += 1
+
+	fmt.Fprintf(f, "FLANNEL_NETWORK=%s\n", nw)
+	fmt.Fprintf(f, "FLANNEL_SUBNET=%s\n", sn)
+	fmt.Fprintf(f, "FLANNEL_MTU=%d\n", sd.MTU)
+	_, err = fmt.Fprintf(f, "FLANNEL_IPMASQ=%v\n", ipMasq)
+	f.Close()
+	if err != nil {
+		return err
+	}
+
+	// rename(2) the temporary file to the desired location so that it becomes
+	// atomically visible with the contents
+	return os.Rename(tempFile, path)
+}
+
+func (m *Manager) RunNetwork(net *Network) {
+	sn := net.Init(m.extIface, m.iaddr, m.eaddr)
+	if sn != nil {
+		if m.isMultiNetwork() {
+			path := filepath.Join(opts.subnetDir, net.Name) + ".env"
+			if err := writeSubnetFile(path, net.Config.Network, m.ipMasq, sn); err != nil {
+				log.Warningf("%v failed to write subnet file: %s", net.Name, err)
+				return
+			}
+		} else {
+			if err := writeSubnetFile(opts.subnetFile, net.Config.Network, m.ipMasq, sn); err != nil {
+				log.Warningf("%v failed to write subnet file: %s", net.Name, err)
+				return
+			}
+			daemon.SdNotify("READY=1")
+		}
+
+		log.Infof("Running network %v", net.Name)
+		net.Run()
+		log.Infof("%v exited", net.Name)
+	}
+}
+
+func (m *Manager) watchNetworks() {
+	wg := sync.WaitGroup{}
+
+	events := make(chan []subnet.Event)
+	wg.Add(1)
+	go func() {
+		subnet.WatchNetworks(m.ctx, m.sm, events)
+		wg.Done()
+	}()
+	// skip over the initial snapshot
+	<-events
+
+	for {
+		select {
+		case <-m.ctx.Done():
+			break
+
+		case evtBatch := <-events:
+			for _, e := range evtBatch {
+				netname := e.Network
+				if !m.isNetAllowed(netname) {
+					continue
+				}
+
+				switch e.Type {
+				case subnet.EventAdded:
+					if _, ok := m.networks[netname]; ok {
+						continue
+					}
+					net := NewNetwork(m.ctx, m.sm, netname, m.ipMasq)
+					m.networks[netname] = net
+					wg.Add(1)
+					go func() {
+						m.RunNetwork(net)
+						wg.Done()
+					}()
+
+				case subnet.EventRemoved:
+					net, ok := m.networks[netname]
+					if !ok {
+						log.Warningf("Network %v unknown; ignoring EventRemoved", netname)
+						continue
+					}
+					net.Cancel()
+					delete(m.networks, netname)
+				}
+			}
+		}
+	}
+
+	wg.Wait()
+}
+
+func (m *Manager) Run() {
+	wg := sync.WaitGroup{}
+
+	// Run existing networks
+	for _, net := range m.networks {
+		wg.Add(1)
+		go func() {
+			m.RunNetwork(net)
+			wg.Done()
+		}()
+	}
+
+	if opts.watchNetworks {
+		wg.Add(1)
+		go func() {
+			m.watchNetworks()
+			wg.Done()
+		}()
+	} else {
+		<-m.ctx.Done()
+	}
+
+	wg.Wait()
+}

+ 24 - 14
network/network.go

@@ -26,8 +26,10 @@ import (
 )
 
 type Network struct {
-	Name   string
-	Config *subnet.Config
+	Name       string
+	Config     *subnet.Config
+	ctx        context.Context
+	cancelFunc context.CancelFunc
 
 	sm     subnet.Manager
 	ipMasq bool
@@ -35,21 +37,25 @@ type Network struct {
 	lease  *subnet.Lease
 }
 
-func New(sm subnet.Manager, name string, ipMasq bool) *Network {
+func NewNetwork(ctx context.Context, sm subnet.Manager, name string, ipMasq bool) *Network {
+	ctx, cancel := context.WithCancel(ctx)
+
 	return &Network{
-		Name:   name,
-		sm:     sm,
-		ipMasq: ipMasq,
+		Name:       name,
+		ctx:        ctx,
+		cancelFunc: cancel,
+		sm:         sm,
+		ipMasq:     ipMasq,
 	}
 }
 
-func (n *Network) Init(ctx context.Context, iface *net.Interface, iaddr net.IP, eaddr net.IP) *backend.SubnetDef {
+func (n *Network) Init(iface *net.Interface, iaddr net.IP, eaddr net.IP) *backend.SubnetDef {
 	var be backend.Backend
 	var sn *backend.SubnetDef
 
 	steps := []func() error{
 		func() (err error) {
-			n.Config, err = n.sm.GetNetworkConfig(ctx, n.Name)
+			n.Config, err = n.sm.GetNetworkConfig(n.ctx, n.Name)
 			if err != nil {
 				log.Error("Failed to retrieve network config: ", err)
 			}
@@ -67,7 +73,7 @@ func (n *Network) Init(ctx context.Context, iface *net.Interface, iaddr net.IP,
 		},
 
 		func() (err error) {
-			sn, err = be.RegisterNetwork(ctx, n.Name, n.Config)
+			sn, err = be.RegisterNetwork(n.ctx, n.Name, n.Config)
 			if err != nil {
 				log.Errorf("Failed register network %v (type %v): %v", n.Name, n.Config.BackendType, err)
 			} else {
@@ -90,7 +96,7 @@ func (n *Network) Init(ctx context.Context, iface *net.Interface, iaddr net.IP,
 	for _, s := range steps {
 		for ; ; time.Sleep(time.Second) {
 			select {
-			case <-ctx.Done():
+			case <-n.ctx.Done():
 				return nil
 			default:
 			}
@@ -105,20 +111,24 @@ func (n *Network) Init(ctx context.Context, iface *net.Interface, iaddr net.IP,
 	return sn
 }
 
-func (n *Network) Run(ctx context.Context) {
+func (n *Network) Run() {
 	wg := sync.WaitGroup{}
 	wg.Add(1)
 	go func() {
-		n.be.Run(ctx)
+		n.be.Run(n.ctx)
 		wg.Done()
 	}()
 
 	wg.Add(1)
 	go func() {
-		subnet.LeaseRenewer(ctx, n.sm, n.Name, n.lease)
+		subnet.LeaseRenewer(n.ctx, n.sm, n.Name, n.lease)
 		wg.Done()
 	}()
 
-	<-ctx.Done()
+	<-n.ctx.Done()
 	wg.Wait()
 }
+
+func (n *Network) Cancel() {
+	n.cancelFunc()
+}