ソースを参照

Merge pull request #34 from eyakubovich/master

Support multiple backends and add alloc backend
Eugene Yakubovich 10 年 前
コミット
b6c86227d5
15 ファイル変更662 行追加539 行削除
  1. 56 0
      backend/alloc/alloc.go
  2. 13 1
      backend/common.go
  3. 80 0
      backend/udp/cproxy.go
  4. 0 0
      backend/udp/proxy.c
  5. 0 0
      backend/udp/proxy.h
  6. 282 0
      backend/udp/udp.go
  7. 114 38
      main.go
  8. 5 0
      pkg/task/errors.go
  9. 1 0
      subnet/config.go
  10. 39 6
      subnet/registry.go
  11. 58 24
      subnet/subnet.go
  12. 14 9
      subnet/subnet_test.go
  13. 0 118
      udp/cproxy.go
  14. 0 168
      udp/proxy.go
  15. 0 175
      udp/run.go

+ 56 - 0
backend/alloc/alloc.go

@@ -0,0 +1,56 @@
+package alloc
+
+import (
+	"fmt"
+	"net"
+
+	"github.com/coreos/rudder/backend"
+	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
+	"github.com/coreos/rudder/subnet"
+)
+
+
+type AllocBackend struct {
+	sm   *subnet.SubnetManager
+	stop chan bool
+}
+
+func New(sm *subnet.SubnetManager) backend.Backend {
+	return &AllocBackend{
+		sm: sm,
+		stop: make(chan bool),
+	}
+}
+
+func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
+	attrs := subnet.BaseAttrs{
+		PublicIP: ip.FromIP(extIP),
+	}
+
+	sn, err := m.sm.AcquireLease(ip.FromIP(extIP), &attrs, m.stop)
+	if err != nil {
+		if err == task.ErrCanceled {
+			return nil, err
+		} else {
+			return nil, fmt.Errorf("failed to acquire lease: %v", err)
+		}
+	}
+
+	return &backend.SubnetDef{
+		Net: sn,
+		MTU: extIface.MTU,
+	}, nil
+}
+
+func (m *AllocBackend) Run() {
+	m.sm.LeaseRenewer(m.stop)
+}
+
+func (m *AllocBackend) Stop() {
+	close(m.stop)
+}
+
+func (m *AllocBackend) Name() string {
+	return "allocation"
+}

+ 13 - 1
backend/common.go

@@ -1,7 +1,19 @@
 package backend
 
 import (
+	"net"
+
 	"github.com/coreos/rudder/pkg/ip"
 )
 
-type ReadyFunc func(sn ip.IP4Net, mtu int)
+type SubnetDef struct {
+	Net ip.IP4Net
+	MTU int
+}
+
+type Backend interface {
+	Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*SubnetDef, error)
+	Run()
+	Stop()
+	Name() string
+}

+ 80 - 0
backend/udp/cproxy.go

@@ -0,0 +1,80 @@
+package udp
+
+//#include "proxy.h"
+import "C"
+
+import (
+	"net"
+	"os"
+	"reflect"
+	"unsafe"
+
+	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
+
+	"github.com/coreos/rudder/pkg/ip"
+)
+
+func runCProxy(tun *os.File, conn *net.UDPConn, ctl *os.File, tunIP ip.IP4, tunMTU int) {
+	var log_errors int
+	if log.V(1) {
+		log_errors = 1
+	}
+
+	c, err := conn.File()
+	if err != nil {
+		log.Error("Converting UDPConn to File failed: ", err)
+		return
+	}
+	defer c.Close()
+
+	C.run_proxy(
+		C.int(tun.Fd()),
+		C.int(c.Fd()),
+		C.int(ctl.Fd()),
+		C.in_addr_t(tunIP.NetworkOrder()),
+		C.size_t(tunMTU),
+		C.int(log_errors),
+	)
+}
+
+func writeCommand(f *os.File, cmd *C.command) {
+	hdr := reflect.SliceHeader{
+		Data: uintptr(unsafe.Pointer(cmd)),
+		Len:  int(unsafe.Sizeof(*cmd)),
+		Cap:  int(unsafe.Sizeof(*cmd)),
+	}
+	buf := *(*[]byte)(unsafe.Pointer(&hdr))
+
+	f.Write(buf)
+}
+
+func setRoute(ctl *os.File, dst ip.IP4Net, nextHopIP ip.IP4, nextHopPort int) {
+	cmd := C.command{
+		cmd:           C.CMD_SET_ROUTE,
+		dest_net:      C.in_addr_t(dst.IP.NetworkOrder()),
+		dest_net_len:  C.int(dst.PrefixLen),
+		next_hop_ip:   C.in_addr_t(nextHopIP.NetworkOrder()),
+		next_hop_port: C.short(nextHopPort),
+	}
+
+	writeCommand(ctl, &cmd)
+}
+
+func removeRoute(ctl *os.File, dst ip.IP4Net) {
+	cmd := C.command{
+		cmd:          C.CMD_DEL_ROUTE,
+		dest_net:     C.in_addr_t(dst.IP.NetworkOrder()),
+		dest_net_len: C.int(dst.PrefixLen),
+	}
+
+	writeCommand(ctl, &cmd)
+}
+
+func stopProxy(ctl *os.File) {
+	cmd := C.command{
+		cmd:          C.CMD_STOP,
+	}
+
+	writeCommand(ctl, &cmd)
+}
+

+ 0 - 0
udp/proxy.c → backend/udp/proxy.c


+ 0 - 0
udp/proxy.h → backend/udp/proxy.h


+ 282 - 0
backend/udp/udp.go

@@ -0,0 +1,282 @@
+package udp
+
+import (
+	"fmt"
+	"encoding/json"
+	"net"
+	"os"
+	"strings"
+	"sync"
+	"syscall"
+
+	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
+	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
+
+	"github.com/coreos/rudder/backend"
+	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
+	"github.com/coreos/rudder/subnet"
+)
+
+const (
+	encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
+	defaultPort   = 8285
+)
+
+type UdpBackend struct {
+	sm     *subnet.SubnetManager
+	rawCfg json.RawMessage
+	cfg    struct {
+		Port int
+	}
+	ctl    *os.File
+	ctl2   *os.File
+	tun    *os.File
+	conn   *net.UDPConn
+	mtu    int
+	tunNet ip.IP4Net
+	stop   chan bool
+	wg     sync.WaitGroup
+}
+
+func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
+	be := UdpBackend{
+		sm:     sm,
+		rawCfg: config,
+		stop:   make(chan bool),
+	}
+	be.cfg.Port = defaultPort
+	return &be
+}
+
+func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
+	// Parse our configuration
+	if len(m.rawCfg) > 0 {
+		if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
+			return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
+		}
+	}
+
+	// Acquire the lease form subnet manager
+	attrs := subnet.BaseAttrs{
+		PublicIP: ip.FromIP(extIP),
+	}
+
+	sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop)
+	if err != nil {
+		if err == task.ErrCanceled {
+			return nil, err
+		} else {
+			return nil, fmt.Errorf("failed to acquire lease: %v", err)
+		}
+	}
+
+	// Tunnel's subnet is that of the whole overlay network (e.g. /16)
+	// and not that of the individual host (e.g. /24)
+	m.tunNet = ip.IP4Net{
+		IP:        sn.IP,
+		PrefixLen: m.sm.GetConfig().Network.PrefixLen,
+	}
+
+	// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
+	m.mtu = extIface.MTU - encapOverhead
+
+	if err = m.initTun(ipMasq); err != nil {
+		return nil, err
+	}
+
+	m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.cfg.Port})
+	if err != nil {
+		return nil, fmt.Errorf("failed to start listening on UDP socket: %v", err)
+	}
+
+
+	m.ctl, m.ctl2, err = newCtlSockets()
+	if err != nil {
+		return nil, fmt.Errorf("failed to create control socket: %v", err)
+	}
+
+	return &backend.SubnetDef{
+		Net: sn,
+		MTU: m.mtu,
+	}, nil
+}
+
+func (m *UdpBackend) Run() {
+	// one for each goroutine below
+	m.wg.Add(2)
+
+	go func() {
+		runCProxy(m.tun, m.conn, m.ctl2, m.tunNet.IP, m.mtu)
+		m.wg.Done()
+	}()
+
+	go func() {
+		m.sm.LeaseRenewer(m.stop)
+		m.wg.Done()
+	}()
+
+	m.monitorEvents()
+
+	m.wg.Wait()
+}
+
+func (m *UdpBackend) Stop() {
+	if m.ctl != nil {
+		stopProxy(m.ctl)
+	}
+
+	close(m.stop)
+}
+
+func (m *UdpBackend) Name() string {
+	return "UDP"
+}
+
+func newCtlSockets() (*os.File, *os.File, error) {
+	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	f1 := os.NewFile(uintptr(fds[0]), "ctl")
+	f2 := os.NewFile(uintptr(fds[1]), "ctl")
+	return f1, f2, nil
+}
+
+func (m *UdpBackend) initTun(ipMasq bool) error {
+	var tunName string
+	var err error
+
+	m.tun, tunName, err = ip.OpenTun("rudder%d")
+	if err != nil {
+		return fmt.Errorf("Failed to open TUN device: %v", err)
+	}
+
+	err = configureIface(tunName, m.tunNet, m.mtu)
+	if err != nil {
+		return err
+	}
+
+	if ipMasq {
+		err = setupIpMasq(m.tunNet.Network(), tunName)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
+	iface, err := net.InterfaceByName(ifname)
+	if err != nil {
+		return fmt.Errorf("failed to lookup interface %v", ifname)
+	}
+
+	n := ipn.ToIPNet()
+	err = netlink.NetworkLinkAddIp(iface, n.IP, n)
+	if err != nil {
+		return fmt.Errorf("failed to add IP address %v to %v: %v", n.IP, ifname, err)
+	}
+
+	err = netlink.NetworkSetMTU(iface, mtu)
+	if err != nil {
+		return fmt.Errorf("failed to set MTU for %v: %v", ifname, err)
+	}
+
+	err = netlink.NetworkLinkUp(iface)
+	if err != nil {
+		return fmt.Errorf("failed to set interface %v to UP state: %v", ifname, err)
+	}
+
+	// explicitly add a route since there might be a route for a subnet already
+	// installed by Docker and then it won't get auto added
+	err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
+	if err != nil && err != syscall.EEXIST {
+		return fmt.Errorf("Failed to add route (%v -> %v): %v", ipn.Network().String(), ifname, err)
+	}
+
+	return nil
+}
+
+func setupIpMasq(ipn ip.IP4Net, iface string) error {
+	ipt, err := ip.NewIPTables()
+	if err != nil {
+		return fmt.Errorf("failed to setup IP Masquerade. iptables was not found")
+	}
+
+	err = ipt.ClearChain("nat", "RUDDER")
+	if err != nil {
+		return fmt.Errorf("Failed to create/clear RUDDER chain in NAT table: %v", err)
+	}
+
+	rules := [][]string{
+		// This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0)
+		[]string{ "RUDDER", "-d", ipn.String(), "-j", "ACCEPT" },
+		// This rule makes sure we don't NAT multicast traffic within overlay network
+		[]string{ "RUDDER", "-d", "224.0.0.0/4", "-j", "ACCEPT" },
+		// This rule will NAT everything originating from our overlay network and 
+		[]string{ "RUDDER", "!", "-o", iface, "-j", "MASQUERADE" },
+		// This rule will take everything coming from overlay and sent it to RUDDER chain
+		[]string{ "POSTROUTING", "-s", ipn.String(), "-j", "RUDDER" },
+	}
+
+	for _, args := range rules {
+		log.Info("Adding iptables rule: ", strings.Join(args, " "))
+
+		err = ipt.AppendUnique("nat", args...)
+		if err != nil {
+			return fmt.Errorf("Failed to insert IP masquerade rule: %v", err)
+		}
+	}
+
+	return nil
+}
+
+func (m *UdpBackend) monitorEvents() {
+	log.Info("Watching for new subnet leases")
+
+	evts := make(chan subnet.EventBatch)
+
+	m.wg.Add(1)
+	go func() {
+		m.sm.WatchLeases(evts, m.stop)
+		m.wg.Done()
+	}()
+
+	for {
+		select {
+		case evtBatch := <-evts:
+			m.processSubnetEvents(evtBatch)
+
+		case <-m.stop:
+			return
+		}
+	}
+}
+
+func (m *UdpBackend) processSubnetEvents(batch subnet.EventBatch) {
+	for _, evt := range batch {
+		switch evt.Type {
+		case subnet.SubnetAdded:
+			log.Info("Subnet added: ", evt.Lease.Network)
+
+			var attrs subnet.BaseAttrs
+			if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
+				log.Error("Error decoding subnet lease JSON: ", err)
+				continue
+			}
+
+			setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port)
+
+		case subnet.SubnetRemoved:
+			log.Info("Subnet removed: ", evt.Lease.Network)
+
+			removeRoute(m.ctl, evt.Lease.Network)
+
+		default:
+			log.Error("Internal error: unknown event type: ", int(evt.Type))
+		}
+	}
+}

+ 114 - 38
main.go

@@ -1,23 +1,26 @@
 package main
 
 import (
+	"encoding/json"
 	"flag"
 	"fmt"
 	"net"
 	"os"
+	"os/signal"
 	"path"
+	"strings"
+	"syscall"
 	"time"
 
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
+	"github.com/coreos/rudder/backend"
 	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
 	"github.com/coreos/rudder/subnet"
-	"github.com/coreos/rudder/udp"
-)
-
-const (
-	defaultPort = 8285
+	"github.com/coreos/rudder/backend/alloc"
+	"github.com/coreos/rudder/backend/udp"
 )
 
 type CmdLineOpts struct {
@@ -26,7 +29,6 @@ type CmdLineOpts struct {
 	help         bool
 	version      bool
 	ipMasq       bool
-	port         int
 	subnetFile   string
 	iface        string
 }
@@ -36,7 +38,6 @@ var opts CmdLineOpts
 func init() {
 	flag.StringVar(&opts.etcdEndpoint, "etcd-endpoint", "http://127.0.0.1:4001", "etcd endpoint")
 	flag.StringVar(&opts.etcdPrefix, "etcd-prefix", "/coreos.com/network", "etcd prefix")
-	flag.IntVar(&opts.port, "port", defaultPort, "port to use for inter-node communications")
 	flag.StringVar(&opts.subnetFile, "subnet-file", "/run/rudder/subnet.env", "filename where env variables (subnet and MTU values) will be written to")
 	flag.StringVar(&opts.iface, "iface", "", "interface to use (IP or name) for inter-host communication")
 	flag.BoolVar(&opts.ipMasq, "ip-masq", false, "setup IP masquerade rule for traffic destined outside of overlay network")
@@ -44,10 +45,10 @@ func init() {
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
 }
 
-func writeSubnet(sn ip.IP4Net, mtu int) error {
+func writeSubnetFile(sn *backend.SubnetDef) error {
 	// Write out the first usable IP by incrementing
 	// sn.IP by one
-	sn.IP += 1
+	sn.Net.IP += 1
 
 	dir, _ := path.Split(opts.subnetFile)
 	os.MkdirAll(dir, 0755)
@@ -58,49 +59,47 @@ func writeSubnet(sn ip.IP4Net, mtu int) error {
 	}
 	defer f.Close()
 
-	fmt.Fprintf(f, "RUDDER_SUBNET=%s\n", sn)
-	fmt.Fprintf(f, "RUDDER_MTU=%d\n", mtu)
+	if _, err = fmt.Fprintf(f, "RUDDER_SUBNET=%s\n", sn.Net); err != nil {
+		return err
+	}
+	if _, err = fmt.Fprintf(f, "RUDDER_MTU=%d\n", sn.MTU); err != nil {
+		return err
+	}
 	return nil
 }
 
-func lookupIface() (*net.Interface, net.IP) {
+func lookupIface() (*net.Interface, net.IP, error) {
 	var iface *net.Interface
-	var tep net.IP
+	var ipaddr net.IP
 	var err error
 
 	if len(opts.iface) > 0 {
-		if tep = net.ParseIP(opts.iface); tep != nil {
-			iface, err = ip.GetInterfaceByIP(tep)
+		if ipaddr = net.ParseIP(opts.iface); ipaddr != nil {
+			iface, err = ip.GetInterfaceByIP(ipaddr)
 			if err != nil {
-				log.Errorf("Error looking up interface %s: %s", opts.iface, err)
-				return nil, nil
+				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", opts.iface, err)
 			}
 		} else {
 			iface, err = net.InterfaceByName(opts.iface)
 			if err != nil {
-				log.Errorf("Error looking up interface %s: %s", opts.iface, err)
-				return nil, nil
+				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", opts.iface, err)
 			}
 		}
 	} else {
 		log.Info("Determining IP address of default interface")
-		for {
-			if iface, err = ip.GetDefaultGatewayIface(); err == nil {
-				break
-			}
-			log.Error("Failed to get default interface: ", err)
-			time.Sleep(time.Second)
+		if iface, err = ip.GetDefaultGatewayIface(); err != nil {
+			return nil, nil, fmt.Errorf("Failed to get default interface: %s", err)
 		}
 	}
 
-	if tep == nil {
-		tep, err = ip.GetIfaceIP4Addr(iface)
+	if ipaddr == nil {
+		ipaddr, err = ip.GetIfaceIP4Addr(iface)
 		if err != nil {
-			log.Error("Failed to find IPv4 address for interface ", iface.Name)
+			return nil, nil, fmt.Errorf("Failed to find IPv4 address for interface %s", iface.Name)
 		}
 	}
 
-	return iface, tep
+	return iface, ipaddr, nil
 }
 
 func makeSubnetManager() *subnet.SubnetManager {
@@ -115,6 +114,68 @@ func makeSubnetManager() *subnet.SubnetManager {
 	}
 }
 
+func newBackend() (backend.Backend, error) {
+	sm := makeSubnetManager()
+	config := sm.GetConfig()
+
+	var bt struct {
+		Type string
+	}
+
+	if len(config.Backend) == 0 {
+		bt.Type = "udp"
+	} else {
+		if err := json.Unmarshal(config.Backend, &bt); err != nil {
+			return nil, fmt.Errorf("Error decoding Backend property of config: %v", err)
+		}
+	}
+
+	switch strings.ToLower(bt.Type) {
+	case "udp":
+		return udp.New(sm, config.Backend), nil
+	case "alloc":
+		return alloc.New(sm), nil
+	default:
+		return nil, fmt.Errorf("'%v': unknown backend type", bt.Type)
+	}
+}
+
+func run(be backend.Backend, exit chan int) {
+	var err error
+	defer func() {
+		if err == nil || err == task.ErrCanceled {
+			exit <- 0
+		} else {
+			log.Error(err)
+			exit <- 1
+		}
+	}()
+
+	iface, ipaddr, err := lookupIface()
+	if err != nil {
+		return
+	}
+
+	if iface.MTU == 0 {
+		err = fmt.Errorf("Failed to determine MTU for %s interface", ipaddr)
+		return
+	}
+
+	log.Infof("Using %s as external interface", ipaddr)
+
+	sn, err := be.Init(iface, ipaddr, opts.ipMasq)
+	if err != nil {
+		log.Error("Could not init %v backend: %v", be.Name(), err)
+		return
+	}
+
+	writeSubnetFile(sn)
+	daemon.SdNotify("READY=1")
+
+	log.Infof("%s mode initialized", be.Name())
+	be.Run()
+}
+
 func main() {
 	// glog will log to tmp files by default. override so all entries
 	// can flow into journald (if running under systemd)
@@ -134,17 +195,32 @@ func main() {
 		os.Exit(0)
 	}
 
-	iface, tep := lookupIface()
-	if iface == nil || tep == nil {
-		return
+	be, err := newBackend()
+	if err != nil {
+		log.Info(err)
+		os.Exit(1)
 	}
 
-	log.Infof("Using %s to tunnel", tep)
+	// Register for SIGINT and SIGTERM and wait for one of them to arrive
+	log.Info("Installing signal handlers")
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
-	sm := makeSubnetManager()
+	exit := make(chan int)
+	go run(be, exit)
 
-	udp.Run(sm, iface, tep, opts.port, opts.ipMasq, func(sn ip.IP4Net, mtu int) {
-		writeSubnet(sn, mtu)
-		daemon.SdNotify("READY=1")
-	})
+	for {
+		select {
+		case <-sigs:
+			// unregister to get default OS nuke behaviour in case we don't exit cleanly
+			signal.Stop(sigs)
+
+			log.Info("Exiting...")
+			be.Stop()
+
+		case code := <-exit:
+			log.Infof("%s mode exited", be.Name())
+			os.Exit(code)
+		}
+	}
 }

+ 5 - 0
pkg/task/errors.go

@@ -0,0 +1,5 @@
+package task
+
+import "errors"
+
+var ErrCanceled = errors.New("Task canceled")

+ 1 - 0
subnet/config.go

@@ -12,6 +12,7 @@ type Config struct {
 	SubnetMin ip.IP4
 	SubnetMax ip.IP4
 	SubnetLen uint
+	Backend   json.RawMessage
 }
 
 func ParseConfig(s string) (*Config, error) {

+ 39 - 6
subnet/registry.go

@@ -2,8 +2,11 @@ package subnet
 
 import (
 	"sync"
+	"time"
+	"path"
 
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
+	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 )
 
 type subnetRegistry interface {
@@ -30,7 +33,8 @@ func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
 }
 
 func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
-	resp, err := esr.client().Get(esr.prefix+"/config", false, false)
+	key := path.Join(esr.prefix, "config")
+	resp, err := esr.client().Get(key, false, false)
 	if err != nil {
 		return nil, err
 	}
@@ -38,23 +42,43 @@ func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
 }
 
 func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
-	return esr.client().Get(esr.prefix+"/subnets", false, true)
+	key := path.Join(esr.prefix, "subnets")
+	return esr.client().Get(key, false, true)
 }
 
 func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	return esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
+	key := path.Join(esr.prefix, "subnets", sn)
+	resp, err := esr.client().Create(key, data, ttl)
+	if err != nil {
+		return nil, err
+	}
+
+	ensureExpiration(resp, ttl)
+	return resp, nil
 }
 
 func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	return esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
+	key := path.Join(esr.prefix, "subnets", sn)
+	resp, err := esr.client().Set(key, data, ttl)
+	if err != nil {
+		return nil, err
+	}
+
+	ensureExpiration(resp, ttl)
+	return resp, nil
 }
 
 func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
 	for {
-		resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
+		key := path.Join(esr.prefix, "subnets")
+		resp, err := esr.client().RawWatch(key, since, true, nil, stop)
 
 		if err != nil {
-			return nil, err
+			if err == etcd.ErrWatchStoppedByUser {
+				return nil, nil
+			} else {
+				return nil, err
+			}
 		}
 
 		if len(resp.Body) == 0 {
@@ -79,3 +103,12 @@ func (esr *etcdSubnetRegistry) resetClient() {
 	defer esr.mux.Unlock()
 	esr.cli = etcd.NewClient([]string{esr.endpoint})
 }
+
+func ensureExpiration(resp *etcd.Response, ttl uint64) {
+	if resp.Node.Expiration == nil {
+		// should not be but calc it ourselves in this case
+		log.Info("Expiration field missing on etcd response, calculating locally")
+		exp := time.Now().Add(time.Duration(ttl) * time.Second)
+		resp.Node.Expiration = &exp
+	}
+}

+ 58 - 24
subnet/subnet.go

@@ -13,6 +13,7 @@ import (
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
 	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/pkg/task"
 )
 
 const (
@@ -49,7 +50,6 @@ type SubnetManager struct {
 	leaseExp  time.Time
 	lastIndex uint64
 	leases    []SubnetLease
-	stop      chan bool
 }
 
 type EventType int
@@ -66,7 +66,37 @@ func NewSubnetManager(etcdEndpoint, prefix string) (*SubnetManager, error) {
 	return newSubnetManager(esr)
 }
 
-func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error) {
+func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel chan bool) (ip.IP4Net, error) {
+	dataBytes, err := json.Marshal(data)
+	if err != nil {
+		return ip.IP4Net{}, err
+	}
+
+	var sn ip.IP4Net
+	for {
+		sn, err = sm.acquireLeaseOnce(extIP, string(dataBytes), cancel)
+		switch {
+		case err == nil:
+			log.Info("Subnet lease acquired: ", sn)
+			return sn, nil
+
+		case err == task.ErrCanceled:
+			return ip.IP4Net{}, err
+
+		default:
+			log.Error("Failed to acquire subnet: ", err)
+		}
+
+		select {
+		case <-time.After(time.Second):
+
+		case <-cancel:
+			return ip.IP4Net{}, task.ErrCanceled
+		}
+	}
+}
+
+func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) {
 	for i := 0; i < registerRetries; i++ {
 		var err error
 		sm.leases, err = sm.getLeases()
@@ -74,14 +104,14 @@ func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error
 			return ip.IP4Net{}, err
 		}
 
-		// try to reuse a subnet if there's one that match our IP
+		// try to reuse a subnet if there's one that matches our IP
 		for _, l := range sm.leases {
 			var ba BaseAttrs
 			err = json.Unmarshal([]byte(l.Data), &ba)
 			if err != nil {
 				log.Error("Error parsing subnet lease JSON: ", err)
 			} else {
-				if tep == ba.PublicIP {
+				if extIP == ba.PublicIP {
 					resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), data, subnetTTL)
 					if err != nil {
 						return ip.IP4Net{}, err
@@ -109,11 +139,16 @@ func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error
 
 		// if etcd returned Key Already Exists, try again.
 		case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
-			continue
+			break
 
 		default:
 			return ip.IP4Net{}, err
 		}
+
+		// before moving on, check for cancel
+		if interrupted(cancel) {
+			return ip.IP4Net{}, task.ErrCanceled
+		}
 	}
 
 	return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet")
@@ -125,17 +160,6 @@ func (sm *SubnetManager) UpdateSubnet(data string) error {
 	return err
 }
 
-func (sm *SubnetManager) Start(receiver chan EventBatch) {
-	go sm.watchLeases(receiver)
-	go sm.leaseRenewer()
-}
-
-func (sm *SubnetManager) Stop() {
-	// once for each goroutine
-	sm.stop <- true
-	sm.stop <- true
-}
-
 func (sm *SubnetManager) GetConfig() *Config {
 	return sm.config
 }
@@ -164,11 +188,12 @@ func newSubnetManager(r subnetRegistry) (*SubnetManager, error) {
 		return nil, err
 	}
 
-	return &SubnetManager{
+	sm := SubnetManager{
 		registry: r,
 		config:   cfg,
-		stop:     make(chan bool, 2),
-	}, nil
+	}
+
+	return &sm, nil
 }
 
 func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
@@ -293,7 +318,7 @@ OuterLoop:
 	}
 }
 
-func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
+func (sm *SubnetManager) WatchLeases(receiver chan EventBatch, cancel chan bool) {
 	// "catch up" by replaying all the leases we discovered during
 	// AcquireLease
 	var batch EventBatch
@@ -307,9 +332,9 @@ func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
 	}
 
 	for {
-		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, sm.stop)
+		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, cancel)
 
-		// watchSubnets exited by stop chan being signaled
+		// watchSubnets exited by cancel chan being signaled
 		if err == nil && resp == nil {
 			return
 		}
@@ -373,7 +398,7 @@ func (sm *SubnetManager) parseSubnetWatchError(err error) (batch *EventBatch, ou
 	return
 }
 
-func (sm *SubnetManager) leaseRenewer() {
+func (sm *SubnetManager) LeaseRenewer(cancel chan bool) {
 	dur := sm.leaseExp.Sub(time.Now()) - renewMargin
 
 	for {
@@ -390,8 +415,17 @@ func (sm *SubnetManager) leaseRenewer() {
 			log.Info("Lease renewed, new expiration: ", sm.leaseExp)
 			dur = sm.leaseExp.Sub(time.Now()) - renewMargin
 
-		case <-sm.stop:
+		case <-cancel:
 			return
 		}
 	}
 }
+
+func interrupted(cancel chan bool) bool {
+	select {
+	case <-cancel:
+		return true
+	default:
+		return false
+	}
+}

+ 14 - 9
subnet/subnet_test.go

@@ -154,7 +154,8 @@ func TestAcquireLease(t *testing.T) {
 	ip, _ := ip.ParseIP4("1.2.3.4")
 	data := `{ "PublicIP": "1.2.3.4" }`
 
-	sn, err := sm.AcquireLease(ip, data)
+	cancel := make(chan bool)
+	sn, err := sm.AcquireLease(ip, data, cancel)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -164,7 +165,7 @@ func TestAcquireLease(t *testing.T) {
 	}
 
 	// Acquire again, should reuse
-	if sn, err = sm.AcquireLease(ip, data); err != nil {
+	if sn, err = sm.AcquireLease(ip, data, cancel); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
@@ -181,7 +182,8 @@ func TestWatchLeaseAdded(t *testing.T) {
 	}
 
 	events := make(chan EventBatch)
-	sm.Start(events)
+	cancel := make(chan bool)
+	go sm.WatchLeases(events, cancel)
 
 	expected := "10.3.3.0-24"
 	msr.addCh <- expected
@@ -206,7 +208,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 	}
 
-	sm.Stop()
+	close(cancel)
 }
 
 func TestWatchLeaseRemoved(t *testing.T) {
@@ -217,7 +219,8 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	}
 
 	events := make(chan EventBatch)
-	sm.Start(events)
+	cancel := make(chan bool)
+	go sm.WatchLeases(events, cancel)
 
 	expected := "10.3.4.0-24"
 	msr.delCh <- expected
@@ -242,7 +245,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 	}
 
-	sm.Stop()
+	close(cancel)
 }
 
 func TestRenewLease(t *testing.T) {
@@ -255,13 +258,15 @@ func TestRenewLease(t *testing.T) {
 	ip, _ := ip.ParseIP4("1.2.3.4")
 	data := `{ "PublicIP": "1.2.3.4" }`
 
-	sn, err := sm.AcquireLease(ip, data)
+	cancel := make(chan bool)
+	defer close(cancel)
+
+	sn, err := sm.AcquireLease(ip, data, cancel)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
-	events := make(chan EventBatch)
-	sm.Start(events)
+	go sm.LeaseRenewer(cancel)
 
 	fmt.Println("Waiting for lease to pass original expiration")
 	time.Sleep(2 * time.Second)

+ 0 - 118
udp/cproxy.go

@@ -1,118 +0,0 @@
-package udp
-
-//#include "proxy.h"
-import "C"
-
-import (
-	"encoding/json"
-	"net"
-	"os"
-	"reflect"
-	"syscall"
-	"unsafe"
-
-	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos/rudder/pkg/ip"
-	"github.com/coreos/rudder/subnet"
-)
-
-func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP ip.IP4, tunMTU uint) {
-	var log_errors int
-	if log.V(1) {
-		log_errors = 1
-	}
-
-	C.run_proxy(
-		C.int(tun.Fd()),
-		C.int(conn.Fd()),
-		C.int(ctl.Fd()),
-		C.in_addr_t(tunIP.NetworkOrder()),
-		C.size_t(tunMTU),
-		C.int(log_errors),
-	)
-}
-
-func writeCommand(f *os.File, cmd *C.command) {
-	hdr := reflect.SliceHeader{
-		Data: uintptr(unsafe.Pointer(cmd)),
-		Len:  int(unsafe.Sizeof(*cmd)),
-		Cap:  int(unsafe.Sizeof(*cmd)),
-	}
-	buf := *(*[]byte)(unsafe.Pointer(&hdr))
-
-	f.Write(buf)
-}
-
-func newCtlSockets() (*os.File, *os.File, error) {
-	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	f1 := os.NewFile(uintptr(fds[0]), "ctl")
-	f2 := os.NewFile(uintptr(fds[1]), "ctl")
-	return f1, f2, nil
-}
-
-func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP ip.IP4, tunMTU uint, port int) {
-	log.Info("Running fast proxy loop")
-
-	c, err := conn.File()
-	if err != nil {
-		log.Error("Converting UDPConn to File failed: ", err)
-		return
-	}
-	defer c.Close()
-
-	ctl, peerCtl, err := newCtlSockets()
-	if err != nil {
-		log.Error("Failed to create control socket: ", err)
-		return
-	}
-	defer ctl.Close()
-	defer peerCtl.Close()
-
-	go runCProxy(tun, c, peerCtl, tunIP, tunMTU)
-
-	log.Info("Watching for new subnet leases")
-	evts := make(chan subnet.EventBatch)
-	sm.Start(evts)
-
-	for evtBatch := range evts {
-		for _, evt := range evtBatch {
-			if evt.Type == subnet.SubnetAdded {
-				log.Info("Subnet added: ", evt.Lease.Network)
-				var attrs subnet.BaseAttrs
-				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-					log.Error("Error decoding subnet lease JSON: ", err)
-					continue
-				}
-
-				cmd := C.command{
-					cmd:           C.CMD_SET_ROUTE,
-					dest_net:      C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
-					dest_net_len:  C.int(evt.Lease.Network.PrefixLen),
-					next_hop_ip:   C.in_addr_t(attrs.PublicIP.NetworkOrder()),
-					next_hop_port: C.short(port),
-				}
-
-				writeCommand(ctl, &cmd)
-
-			} else if evt.Type == subnet.SubnetRemoved {
-				log.Info("Subnet removed: ", evt.Lease.Network)
-
-				cmd := C.command{
-					cmd:          C.CMD_DEL_ROUTE,
-					dest_net:     C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
-					dest_net_len: C.int(evt.Lease.Network.PrefixLen),
-				}
-
-				writeCommand(ctl, &cmd)
-
-			} else {
-				log.Error("Internal error: unknown event type: ", int(evt.Type))
-			}
-		}
-	}
-}

+ 0 - 168
udp/proxy.go

@@ -1,168 +0,0 @@
-package udp
-
-import (
-	"encoding/json"
-	"net"
-	"os"
-	"sync"
-
-	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos/rudder/pkg/ip"
-	"github.com/coreos/rudder/subnet"
-)
-
-const (
-	minIP4HdrSize = 20
-)
-
-type routeEntry struct {
-	sn   ip.IP4Net
-	addr *net.UDPAddr
-}
-
-type Router struct {
-	mux    sync.Mutex
-	port   int
-	routes []routeEntry
-}
-
-func NewRouter(port int) *Router {
-	return &Router{
-		port: port,
-	}
-}
-
-func (r *Router) SetRoute(sn ip.IP4Net, dst ip.IP4) {
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	for _, re := range r.routes {
-		if re.sn.Equal(sn) {
-			re.addr = &net.UDPAddr{
-				IP:   dst.ToIP(),
-				Port: r.port,
-			}
-			return
-		}
-	}
-
-	re := routeEntry{
-		sn: sn,
-		addr: &net.UDPAddr{
-			IP:   dst.ToIP(),
-			Port: r.port,
-		},
-	}
-
-	r.routes = append(r.routes, re)
-}
-
-func (r *Router) DelRoute(sn ip.IP4Net) {
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	for i, re := range r.routes {
-		if re.sn.Equal(sn) {
-			r.routes[i] = r.routes[len(r.routes)-1]
-			r.routes = r.routes[:len(r.routes)-1]
-			return
-		}
-	}
-}
-
-func (r *Router) routePacket(pkt []byte, conn *net.UDPConn) {
-	if len(pkt) < minIP4HdrSize {
-		log.V(1).Infof("Packet too small (%d bytes), unable to route", len(pkt))
-		return
-	}
-
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	dstIP := ip.FromBytes(pkt[16:20])
-
-	for i, re := range r.routes {
-		if re.sn.Contains(dstIP) {
-			nbytes, err := conn.WriteToUDP(pkt, re.addr)
-			switch {
-			case err != nil:
-				log.V(1).Info("UDP send failed with: ", err)
-			case nbytes != len(pkt):
-				log.V(1).Infof("Was only able to UDP send %d out of %d bytes to %s: ", nbytes, len(pkt), re.addr.IP)
-			}
-
-			// packets for same dest tend to come in burst. swap to front make it faster for subsequent ones
-			if i != 0 {
-				r.routes[0], r.routes[i] = r.routes[i], r.routes[0]
-			}
-			return
-		}
-	}
-
-	log.V(1).Info("No route found for ", dstIP)
-}
-
-func proxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunMTU uint, port int) {
-	log.Info("Running slow proxy loop")
-
-	rtr := NewRouter(port)
-
-	go proxyTunToUdp(rtr, tun, conn, tunMTU)
-	go proxyUdpToTun(conn, tun, tunMTU)
-
-	log.Info("Watching for new subnet leases")
-	evts := make(chan subnet.EventBatch)
-	sm.Start(evts)
-
-	for evtBatch := range evts {
-		for _, evt := range evtBatch {
-			if evt.Type == subnet.SubnetAdded {
-				log.Info("Subnet added: ", evt.Lease.Network)
-				var attrs subnet.BaseAttrs
-				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-					log.Error("Error decoding subnet lease JSON: ", err)
-					continue
-				}
-				rtr.SetRoute(evt.Lease.Network, attrs.PublicIP)
-
-			} else if evt.Type == subnet.SubnetRemoved {
-				log.Info("Subnet removed: ", evt.Lease.Network)
-				rtr.DelRoute(evt.Lease.Network)
-
-			} else {
-				log.Error("Internal error: unknown event type: ", int(evt.Type))
-			}
-		}
-	}
-}
-
-func proxyTunToUdp(r *Router, tun *os.File, conn *net.UDPConn, tunMTU uint) {
-	pkt := make([]byte, tunMTU)
-	for {
-		nbytes, err := tun.Read(pkt)
-		if err != nil {
-			log.V(1).Info("Error reading from TUN device: ", err)
-		} else {
-			r.routePacket(pkt[:nbytes], conn)
-		}
-	}
-}
-
-func proxyUdpToTun(conn *net.UDPConn, tun *os.File, tunMTU uint) {
-	pkt := make([]byte, tunMTU)
-	for {
-		nrecv, err := conn.Read(pkt)
-		if err != nil {
-			log.V(1).Info("Error reading from socket: ", err)
-		} else {
-			nsent, err := tun.Write(pkt[:nrecv])
-			switch {
-			case err != nil:
-				log.V(1).Info("Error writing to TUN device: ", err)
-			case nsent != nrecv:
-				log.V(1).Infof("Was only able to write %d out of %d bytes to TUN device: ", nsent, nrecv)
-			}
-		}
-	}
-}

+ 0 - 175
udp/run.go

@@ -1,175 +0,0 @@
-package udp
-
-import (
-	"encoding/json"
-	"net"
-	"strings"
-	"syscall"
-	"time"
-
-	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
-	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos/rudder/backend"
-	"github.com/coreos/rudder/pkg/ip"
-	"github.com/coreos/rudder/subnet"
-)
-
-const (
-	encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
-)
-
-func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
-	iface, err := net.InterfaceByName(ifname)
-	if err != nil {
-		log.Error("Failed to lookup interface ", ifname)
-		return err
-	}
-
-	n := ipn.ToIPNet()
-	err = netlink.NetworkLinkAddIp(iface, n.IP, n)
-	if err != nil {
-		log.Errorf("Failed to add IP address %s to %s: %s", n.IP, ifname, err)
-		return err
-	}
-
-	err = netlink.NetworkSetMTU(iface, mtu)
-	if err != nil {
-		log.Errorf("Failed to set MTU for %s: %v", ifname, err)
-		return err
-	}
-
-	err = netlink.NetworkLinkUp(iface)
-	if err != nil {
-		log.Errorf("Failed to set interface %s to UP state: %s", ifname, err)
-		return err
-	}
-
-	// explicitly add a route since there might be a route for a subnet already
-	// installed by Docker and then it won't get auto added
-	err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
-	if err != nil && err != syscall.EEXIST {
-		log.Errorf("Failed to add route (%s -> %s): %v", ipn.Network().String(), ifname, err)
-		return err
-	}
-
-	return nil
-}
-
-func setupIpMasq(ipn ip.IP4Net, iface string) error {
-	ipt, err := ip.NewIPTables()
-	if err != nil {
-		log.Error("Failed to setup IP Masquerade. iptables was not found")
-		return err
-	}
-
-	err = ipt.ClearChain("nat", "RUDDER")
-	if err != nil {
-		log.Error("Failed to create/clear RUDDER chain in NAT table: ", err)
-		return err
-	}
-
-	rules := [][]string{
-		// This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0)
-		[]string{"RUDDER", "-d", ipn.String(), "-j", "ACCEPT"},
-		// This rule makes sure we don't NAT multicast traffic within overlay network
-		[]string{"RUDDER", "-d", "224.0.0.0/4", "-j", "ACCEPT"},
-		// This rule will NAT everything originating from our overlay network and
-		[]string{"RUDDER", "!", "-o", iface, "-j", "MASQUERADE"},
-		// This rule will take everything coming from overlay and sent it to RUDDER chain
-		[]string{"POSTROUTING", "-s", ipn.String(), "-j", "RUDDER"},
-	}
-
-	for _, args := range rules {
-		log.Info("Adding iptables rule: ", strings.Join(args, " "))
-
-		err = ipt.AppendUnique("nat", args...)
-		if err != nil {
-			log.Error("Failed to insert IP masquerade rule: ", err)
-			return err
-		}
-	}
-
-	return nil
-}
-
-func acquireLease(sm *subnet.SubnetManager, pubIP net.IP) (ip.IP4Net, error) {
-	attrs := subnet.BaseAttrs{
-		PublicIP: ip.FromIP(pubIP),
-	}
-	data, err := json.Marshal(&attrs)
-	if err != nil {
-		return ip.IP4Net{}, err
-	}
-
-	var sn ip.IP4Net
-	for {
-		sn, err = sm.AcquireLease(attrs.PublicIP, string(data))
-		if err == nil {
-			log.Info("Subnet lease acquired: ", sn)
-			break
-		}
-		log.Error("Failed to acquire subnet: ", err)
-		time.Sleep(time.Second)
-	}
-
-	return sn, nil
-}
-
-func Run(sm *subnet.SubnetManager, tepIface *net.Interface, tepIP net.IP, port int, ipMasq bool, ready backend.ReadyFunc) {
-	sn, err := acquireLease(sm, tepIP)
-	if err != nil {
-		log.Error("Failed to acquire lease: ", err)
-		return
-	}
-
-	tun, tunName, err := ip.OpenTun("rudder%d")
-	if err != nil {
-		log.Error("Failed to open TUN device: ", err)
-		return
-	}
-
-	localAddr := net.UDPAddr{
-		Port: port,
-	}
-
-	conn, err := net.ListenUDP("udp4", &localAddr)
-	if err != nil {
-		log.Error("Failed to start listening on UDP socket: ", err)
-		return
-	}
-
-	// Interface's subnet is that of the whole overlay network (e.g. /16)
-	// and not that of the individual host (e.g. /24)
-	tunNet := ip.IP4Net{
-		IP:        sn.IP,
-		PrefixLen: sm.GetConfig().Network.PrefixLen,
-	}
-
-	// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
-	var mtu int
-	if tepIface.MTU > 0 {
-		mtu = tepIface.MTU - encapOverhead
-	} else {
-		log.Errorf("Failed to determine MTU for %s interface", tepIP)
-		return
-	}
-
-	err = configureIface(tunName, tunNet, mtu)
-	if err != nil {
-		return
-	}
-
-	if ipMasq {
-		err = setupIpMasq(tunNet.Network(), tunName)
-		if err != nil {
-			return
-		}
-	}
-
-	// all initialized and ready for business
-	log.Info("UDP encapsulation initialized")
-	ready(sn, mtu)
-
-	fastProxy(sm, tun, conn, tunNet.IP, uint(mtu), port)
-}