Browse Source

make backend selectable and configurable via etcd config key

This allows the user to change the backend configuration in one place.
For example, changing a port used for encap in etcd is less error prone
than passing it to every instance.
Eugene Yakubovich 10 years ago
parent
commit
5f6f8467f6
6 changed files with 119 additions and 75 deletions
  1. 0 3
      backend/common.go
  2. 33 43
      backend/udp/udp.go
  3. 46 24
      main.go
  4. 5 0
      pkg/task/errors.go
  5. 1 0
      subnet/config.go
  6. 34 5
      subnet/subnet.go

+ 0 - 3
backend/common.go

@@ -1,14 +1,11 @@
 package backend
 
 import (
-	"errors"
 	"net"
 
 	"github.com/coreos/rudder/pkg/ip"
 )
 
-var ErrInterrupted = errors.New("Interrupted by user")
-
 type Backend interface {
 	Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error)
 	Run()

+ 33 - 43
backend/udp/udp.go

@@ -8,45 +8,67 @@ import (
 	"strings"
 	"sync"
 	"syscall"
-	"time"
 
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
 	"github.com/coreos/rudder/backend"
 	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
 	"github.com/coreos/rudder/subnet"
 )
 
 const (
 	encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
+	defaultPort   = 8285
 )
 
 type UdpBackend struct {
 	sm     *subnet.SubnetManager
+	rawCfg json.RawMessage
+	cfg    struct {
+		Port int
+	}
 	ctl    *os.File
 	ctl2   *os.File
 	tun    *os.File
 	conn   *net.UDPConn
-	port   int
 	mtu    int
 	tunNet ip.IP4Net
 	stop   chan bool
 	wg     sync.WaitGroup
 }
 
-func New(sm *subnet.SubnetManager, port int) backend.Backend {
-	return &UdpBackend{
-		sm: sm,
-		port: port,
-		stop: make(chan bool),
+func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
+	be := UdpBackend{
+		sm:     sm,
+		rawCfg: config,
+		stop:   make(chan bool),
 	}
+	be.cfg.Port = defaultPort
+	return &be
 }
 
 func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error) {
-	sn, err := m.acquireLease(extIP)
+	// Parse our configuration
+	if len(m.rawCfg) > 0 {
+		if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
+			return ip.IP4Net{}, 0, fmt.Errorf("Error decoding UDP backend config: %v", err)
+		}
+	}
+
+	// Acquire the lease form subnet manager
+	attrs := subnet.BaseAttrs{
+		PublicIP: ip.FromIP(extIP),
+	}
+
+	sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop)
 	if err != nil {
-		return ip.IP4Net{}, 0, fmt.Errorf("Failed to acquire lease: %s", err)
+		if err == task.ErrCanceled {
+			return ip.IP4Net{}, 0, err
+		} else {
+			return ip.IP4Net{}, 0, fmt.Errorf("Failed to acquire lease: %s", err)
+		}
 	}
 
 	// Tunnel's subnet is that of the whole overlay network (e.g. /16)
@@ -63,7 +85,7 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (i
 		return ip.IP4Net{}, 0, err
 	}
 
-	m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.port})
+	m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.cfg.Port})
 	if err != nil {
 		return ip.IP4Net{}, 0, fmt.Errorf("Failed to start listening on UDP socket: %s", err)
 	}
@@ -108,38 +130,6 @@ func (m *UdpBackend) Name() string {
 	return "UDP"
 }
 
-func (m *UdpBackend) acquireLease(extIP net.IP) (ip.IP4Net, error) {
-	attrs := subnet.BaseAttrs{
-		PublicIP: ip.FromIP(extIP),
-	}
-
-	data, err := json.Marshal(&attrs)
-	if err != nil {
-		return ip.IP4Net{}, err
-	}
-
-	var sn ip.IP4Net
-	for {
-		sn, err = m.sm.AcquireLease(attrs.PublicIP, string(data), m.stop)
-		if err == nil {
-			log.Info("Subnet lease acquired: ", sn)
-			break
-		}
-
-		log.Error("Failed to acquire subnet: ", err)
-
-		select {
-		case <-time.After(time.Second):
-			break
-
-		case <-m.stop:
-			return ip.IP4Net{}, backend.ErrInterrupted
-		}
-	}
-
-	return sn, nil
-}
-
 func newCtlSockets() (*os.File, *os.File, error) {
 	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
 	if err != nil {
@@ -274,7 +264,7 @@ func (m *UdpBackend) monitorEvents() {
 						continue
 					}
 
-					setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.port)
+					setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port)
 
 				case subnet.SubnetRemoved:
 					log.Info("Subnet removed: ", evt.Lease.Network)

+ 46 - 24
main.go

@@ -1,12 +1,14 @@
 package main
 
 import (
+	"encoding/json"
 	"flag"
 	"fmt"
 	"net"
 	"os"
 	"os/signal"
 	"path"
+	"strings"
 	"syscall"
 	"time"
 
@@ -15,21 +17,17 @@ import (
 
 	"github.com/coreos/rudder/backend"
 	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
 	"github.com/coreos/rudder/subnet"
 	"github.com/coreos/rudder/backend/udp"
 )
 
-const (
-	defaultPort = 8285
-)
-
 type CmdLineOpts struct {
 	etcdEndpoint string
 	etcdPrefix   string
 	help         bool
 	version      bool
 	ipMasq       bool
-	port         int
 	subnetFile   string
 	iface        string
 }
@@ -39,7 +37,6 @@ var opts CmdLineOpts
 func init() {
 	flag.StringVar(&opts.etcdEndpoint, "etcd-endpoint", "http://127.0.0.1:4001", "etcd endpoint")
 	flag.StringVar(&opts.etcdPrefix, "etcd-prefix", "/coreos.com/network", "etcd prefix")
-	flag.IntVar(&opts.port, "port", defaultPort, "port to use for inter-node communications")
 	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/rudder/subnet.env", "filename where env variables (subnet and MTU values) will be written to")
 	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
 	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
@@ -85,12 +82,8 @@ func lookupIface() (*net.Interface, net.IP, error) {
 		}
 	} else {
 		log.Info("Determining IP address of default interface")
-		for {
-			if iface, err = ip.GetDefaultGatewayIface(); err == nil {
-				break
-			}
-			log.Error("Failed to get default interface: ", err)
-			time.Sleep(time.Second)
+		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
+			return nil, nil, fmt.Errorf("Failed to get default interface: %s", err)
 		}
 	}
 
@@ -116,22 +109,48 @@ func makeSubnetManager() *subnet.SubnetManager {
 	}
 }
 
-func newBackend() backend.Backend {
+func newBackend() (backend.Backend, error) {
 	sm := makeSubnetManager()
-	return udp.New(sm, opts.port)
+	config := sm.GetConfig()
+
+	var bt struct {
+		Type string
+	}
+
+	if len(config.Backend) == 0 {
+		bt.Type = "udp"
+	} else {
+		if err := json.Unmarshal(config.Backend, &bt); err != nil {
+			return nil, fmt.Errorf("Error decoding Backend property of config: %v", err)
+		}
+	}
+
+	switch strings.ToLower(bt.Type) {
+	case "udp":
+		return udp.New(sm, config.Backend), nil
+	default:
+		return nil, fmt.Errorf("'%v': unknown backend type", bt.Type)
+	}
 }
 
-func run(be backend.Backend, quit chan bool) {
-	defer close(quit)
+func run(be backend.Backend, exit chan int) {
+	var err error
+	defer func() {
+		if err == nil || err == task.ErrCanceled {
+			exit <- 0
+		} else {
+			log.Error(err)
+			exit <- 1
+		}
+	}()
 
 	iface, ipaddr, err := lookupIface()
 	if err != nil {
-		log.Error(err)
 		return
 	}
 
 	if iface.MTU == 0 {
-		log.Errorf("Failed to determine MTU for %s interface", ipaddr)
+		err = fmt.Errorf("Failed to determine MTU for %s interface", ipaddr)
 		return
 	}
 
@@ -139,7 +158,6 @@ func run(be backend.Backend, quit chan bool) {
 
 	sn, mtu, err := be.Init(iface, ipaddr, opts.ipMasq)
 	if err != nil {
-		log.Error(err)
 		return
 	}
 
@@ -169,15 +187,19 @@ func main() {
 		os.Exit(0)
 	}
 
-	be := newBackend()
+	be, err := newBackend()
+	if err != nil {
+		log.Info(err)
+		os.Exit(1)
+	}
 
 	// Register for SIGINT and SIGTERM and wait for one of them to arrive
 	log.Info("Installing signal handlers")
 	sigs := make(chan os.Signal, 5)
 	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
-	quit := make(chan bool)
-	go run(be, quit)
+	exit := make(chan int)
+	go run(be, exit)
 
 	for {
 		select {
@@ -188,9 +210,9 @@ func main() {
 			log.Info("Exiting...")
 			be.Stop()
 
-		case <-quit:
+		case code := <-exit:
 			log.Infof("%s mode exited", be.Name())
-			os.Exit(0)
+			os.Exit(code)
 		}
 	}
 }

+ 5 - 0
pkg/task/errors.go

@@ -0,0 +1,5 @@
+package task
+
+import "errors"
+
+var ErrCanceled = errors.New("Task canceled")

+ 1 - 0
subnet/config.go

@@ -12,6 +12,7 @@ type Config struct {
 	SubnetMin ip.IP4
 	SubnetMax ip.IP4
 	SubnetLen uint
+	Backend   json.RawMessage
 }
 
 func ParseConfig(s string) (*Config, error) {

+ 34 - 5
subnet/subnet.go

@@ -13,6 +13,7 @@ import (
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
 	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
 )
 
 const (
@@ -35,8 +36,6 @@ const (
 
 var (
 	subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
-
-	ErrCanceled = errors.New("Canceled by user")
 )
 
 type SubnetLease struct {
@@ -67,7 +66,37 @@ func NewSubnetManager(etcdEndpoint, prefix string) (*SubnetManager, error) {
 	return newSubnetManager(esr)
 }
 
-func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) {
+func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel chan bool) (ip.IP4Net, error) {
+	dataBytes, err := json.Marshal(data)
+	if err != nil {
+		return ip.IP4Net{}, err
+	}
+
+	var sn ip.IP4Net
+	for {
+		sn, err = sm.acquireLeaseOnce(extIP, string(dataBytes), cancel)
+		switch {
+		case err == nil:
+			log.Info("Subnet lease acquired: ", sn)
+			return sn, nil
+
+		case err == task.ErrCanceled:
+			return ip.IP4Net{}, err
+
+		default:
+			log.Error("Failed to acquire subnet: ", err)
+		}
+
+		select {
+		case <-time.After(time.Second):
+
+		case <-cancel:
+			return ip.IP4Net{}, task.ErrCanceled
+		}
+	}
+}
+
+func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) {
 	for i := 0; i < registerRetries; i++ {
 		var err error
 		sm.leases, err = sm.getLeases()
@@ -82,7 +111,7 @@ func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string, cancel chan bool)
 			if err != nil {
 				log.Error("Error parsing subnet lease JSON: ", err)
 			} else {
-				if tep == ba.PublicIP {
+				if extIP == ba.PublicIP {
 					resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), data, subnetTTL)
 					if err != nil {
 						return ip.IP4Net{}, err
@@ -118,7 +147,7 @@ func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string, cancel chan bool)
 
 		// before moving on, check for cancel
 		if interrupted(cancel) {
-			return ip.IP4Net{}, ErrCanceled
+			return ip.IP4Net{}, task.ErrCanceled
 		}
 	}