Browse Source

refactor to support pluggable backends and shut down cleanly

We want to support multiple backends for different types of
encapsulation. This patch defines an interface for initializing,
running and stopping the backend. It also ports the existing encap
into a udp backend.

Fixes #14.
Eugene Yakubovich 10 years ago
parent
commit
ba78a27cfa
12 changed files with 503 additions and 520 deletions
  1. 11 1
      backend/common.go
  2. 80 0
      backend/udp/cproxy.go
  3. 2 1
      backend/udp/proxy.c
  4. 0 0
      backend/udp/proxy.h
  5. 293 0
      backend/udp/udp.go
  6. 70 24
      main.go
  7. 5 1
      subnet/registry.go
  8. 28 23
      subnet/subnet.go
  9. 14 9
      subnet/subnet_test.go
  10. 0 118
      udp/cproxy.go
  11. 0 168
      udp/proxy.go
  12. 0 175
      udp/run.go

+ 11 - 1
backend/common.go

@@ -1,7 +1,17 @@
 package backend
 package backend
 
 
 import (
 import (
+	"errors"
+	"net"
+
 	"github.com/coreos/rudder/pkg/ip"
 	"github.com/coreos/rudder/pkg/ip"
 )
 )
 
 
-type ReadyFunc func(sn ip.IP4Net, mtu int)
+var ErrInterrupted = errors.New("Interrupted by user")
+
+type Backend interface {
+	Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error)
+	Run()
+	Stop()
+	Name() string
+}

+ 80 - 0
backend/udp/cproxy.go

@@ -0,0 +1,80 @@
+package udp
+
+//#include "proxy.h"
+import "C"
+
+import (
+	"net"
+	"os"
+	"reflect"
+	"unsafe"
+
+	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
+
+	"github.com/coreos/rudder/pkg/ip"
+)
+
+func runCProxy(tun *os.File, conn *net.UDPConn, ctl *os.File, tunIP ip.IP4, tunMTU int) {
+	var log_errors int
+	if log.V(1) {
+		log_errors = 1
+	}
+
+	c, err := conn.File()
+	if err != nil {
+		log.Error("Converting UDPConn to File failed: ", err)
+		return
+	}
+	defer c.Close()
+
+	C.run_proxy(
+		C.int(tun.Fd()),
+		C.int(c.Fd()),
+		C.int(ctl.Fd()),
+		C.in_addr_t(tunIP.NetworkOrder()),
+		C.size_t(tunMTU),
+		C.int(log_errors),
+	)
+}
+
+func writeCommand(f *os.File, cmd *C.command) {
+	hdr := reflect.SliceHeader{
+		Data: uintptr(unsafe.Pointer(cmd)),
+		Len:  int(unsafe.Sizeof(*cmd)),
+		Cap:  int(unsafe.Sizeof(*cmd)),
+	}
+	buf := *(*[]byte)(unsafe.Pointer(&hdr))
+
+	f.Write(buf)
+}
+
+func setRoute(ctl *os.File, dst ip.IP4Net, nextHopIP ip.IP4, nextHopPort int) {
+	cmd := C.command{
+		cmd:           C.CMD_SET_ROUTE,
+		dest_net:      C.in_addr_t(dst.IP.NetworkOrder()),
+		dest_net_len:  C.int(dst.PrefixLen),
+		next_hop_ip:   C.in_addr_t(nextHopIP.NetworkOrder()),
+		next_hop_port: C.short(nextHopPort),
+	}
+
+	writeCommand(ctl, &cmd)
+}
+
+func removeRoute(ctl *os.File, dst ip.IP4Net) {
+	cmd := C.command{
+		cmd:          C.CMD_DEL_ROUTE,
+		dest_net:     C.in_addr_t(dst.IP.NetworkOrder()),
+		dest_net_len: C.int(dst.PrefixLen),
+	}
+
+	writeCommand(ctl, &cmd)
+}
+
+func stopProxy(ctl *os.File) {
+	cmd := C.command{
+		cmd:          C.CMD_STOP,
+	}
+
+	writeCommand(ctl, &cmd)
+}
+

+ 2 - 1
udp/proxy.c → backend/udp/proxy.c

@@ -413,8 +413,9 @@ void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, size_t tun_mtu, int
 		if( fds[1].revents & POLLIN )
 		if( fds[1].revents & POLLIN )
 			udp_to_tun(sock, tun, buf, tun_mtu);
 			udp_to_tun(sock, tun, buf, tun_mtu);
 
 
-		if( fds[2].revents & POLLIN )
+		if( fds[2].revents & POLLIN ) {
 			process_cmd(ctl);
 			process_cmd(ctl);
+		}
 	}
 	}
 
 
 	free(buf);
 	free(buf);

+ 0 - 0
udp/proxy.h → backend/udp/proxy.h


+ 293 - 0
backend/udp/udp.go

@@ -0,0 +1,293 @@
+package udp
+
+import (
+	"fmt"
+	"encoding/json"
+	"net"
+	"os"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
+	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
+
+	"github.com/coreos/rudder/backend"
+	"github.com/coreos/rudder/pkg/ip"
+	"github.com/coreos/rudder/subnet"
+)
+
+const (
+	encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
+)
+
+type UdpBackend struct {
+	sm     *subnet.SubnetManager
+	ctl    *os.File
+	ctl2   *os.File
+	tun    *os.File
+	conn   *net.UDPConn
+	port   int
+	mtu    int
+	tunNet ip.IP4Net
+	stop   chan bool
+	wg     sync.WaitGroup
+}
+
+func New(sm *subnet.SubnetManager, port int) backend.Backend {
+	return &UdpBackend{
+		sm: sm,
+		port: port,
+		stop: make(chan bool),
+	}
+}
+
+func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error) {
+	sn, err := m.acquireLease(extIP)
+	if err != nil {
+		return ip.IP4Net{}, 0, fmt.Errorf("Failed to acquire lease: %s", err)
+	}
+
+	// Tunnel's subnet is that of the whole overlay network (e.g. /16)
+	// and not that of the individual host (e.g. /24)
+	m.tunNet = ip.IP4Net{
+		IP:        sn.IP,
+		PrefixLen: m.sm.GetConfig().Network.PrefixLen,
+	}
+
+	// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
+	m.mtu = extIface.MTU - encapOverhead
+
+	if err = m.initTun(ipMasq); err != nil {
+		return ip.IP4Net{}, 0, err
+	}
+
+	m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.port})
+	if err != nil {
+		return ip.IP4Net{}, 0, fmt.Errorf("Failed to start listening on UDP socket: %s", err)
+	}
+
+
+	m.ctl, m.ctl2, err = newCtlSockets()
+	if err != nil {
+		return ip.IP4Net{}, 0, fmt.Errorf("Failed to create control socket: %s", err)
+	}
+
+	return sn, m.mtu, nil
+}
+
+func (m *UdpBackend) Run() {
+	// one for each goroutine below
+	m.wg.Add(2)
+
+	go func() {
+		runCProxy(m.tun, m.conn, m.ctl2, m.tunNet.IP, m.mtu)
+		m.wg.Done()
+	}()
+
+	go func() {
+		m.sm.LeaseRenewer(m.stop)
+		m.wg.Done()
+	}()
+
+	m.monitorEvents()
+
+	m.wg.Wait()
+}
+
+func (m *UdpBackend) Stop() {
+	if m.ctl != nil {
+		stopProxy(m.ctl)
+	}
+
+	close(m.stop)
+}
+
+func (m *UdpBackend) Name() string {
+	return "UDP"
+}
+
+func (m *UdpBackend) acquireLease(extIP net.IP) (ip.IP4Net, error) {
+	attrs := subnet.BaseAttrs{
+		PublicIP: ip.FromIP(extIP),
+	}
+
+	data, err := json.Marshal(&attrs)
+	if err != nil {
+		return ip.IP4Net{}, err
+	}
+
+	var sn ip.IP4Net
+	for {
+		sn, err = m.sm.AcquireLease(attrs.PublicIP, string(data), m.stop)
+		if err == nil {
+			log.Info("Subnet lease acquired: ", sn)
+			break
+		}
+
+		log.Error("Failed to acquire subnet: ", err)
+
+		select {
+		case <-time.After(time.Second):
+			break
+
+		case <-m.stop:
+			return ip.IP4Net{}, backend.ErrInterrupted
+		}
+	}
+
+	return sn, nil
+}
+
+func newCtlSockets() (*os.File, *os.File, error) {
+	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	f1 := os.NewFile(uintptr(fds[0]), "ctl")
+	f2 := os.NewFile(uintptr(fds[1]), "ctl")
+	return f1, f2, nil
+}
+
+func (m *UdpBackend) initTun(ipMasq bool) error {
+	var tunName string
+	var err error
+
+	m.tun, tunName, err = ip.OpenTun("rudder%d")
+	if err != nil {
+		log.Error("Failed to open TUN device: ", err)
+		return err
+	}
+
+	err = configureIface(tunName, m.tunNet, m.mtu)
+	if err != nil {
+		return err
+	}
+
+	if ipMasq {
+		err = setupIpMasq(m.tunNet.Network(), tunName)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
+	iface, err := net.InterfaceByName(ifname)
+	if err != nil {
+		log.Error("Failed to lookup interface ", ifname)
+		return err
+	}
+
+	n := ipn.ToIPNet()
+	err = netlink.NetworkLinkAddIp(iface, n.IP, n)
+	if err != nil {
+		log.Errorf("Failed to add IP address %s to %s: %s", n.IP, ifname, err)
+		return err
+	}
+
+	err = netlink.NetworkSetMTU(iface, mtu)
+	if err != nil {
+		log.Errorf("Failed to set MTU for %s: %v", ifname, err)
+		return err
+	}
+
+	err = netlink.NetworkLinkUp(iface)
+	if err != nil {
+		log.Errorf("Failed to set interface %s to UP state: %s", ifname, err)
+		return err
+	}
+
+	// explicitly add a route since there might be a route for a subnet already
+	// installed by Docker and then it won't get auto added
+	err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
+	if err != nil && err != syscall.EEXIST {
+		log.Errorf("Failed to add route (%s -> %s): %v", ipn.Network().String(), ifname, err)
+		return err
+	}
+
+	return nil
+}
+
+func setupIpMasq(ipn ip.IP4Net, iface string) error {
+	ipt, err := ip.NewIPTables()
+	if err != nil {
+		log.Error("Failed to setup IP Masquerade. iptables was not found")
+		return err
+	}
+
+	err = ipt.ClearChain("nat", "RUDDER")
+	if err != nil {
+		log.Error("Failed to create/clear RUDDER chain in NAT table: ", err)
+		return err
+	}
+
+	rules := [][]string{
+		// This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0)
+		[]string{ "RUDDER", "-d", ipn.String(), "-j", "ACCEPT" },
+		// This rule makes sure we don't NAT multicast traffic within overlay network
+		[]string{ "RUDDER", "-d", "224.0.0.0/4", "-j", "ACCEPT" },
+		// This rule will NAT everything originating from our overlay network and not leaving via the tunnel interface
+		[]string{ "RUDDER", "!", "-o", iface, "-j", "MASQUERADE" },
+		// This rule will take everything coming from overlay and send it to RUDDER chain
+		[]string{ "POSTROUTING", "-s", ipn.String(), "-j", "RUDDER" },
+	}
+
+	for _, args := range rules {
+		log.Info("Adding iptables rule: ", strings.Join(args, " "))
+
+		err = ipt.AppendUnique("nat", args...)
+		if err != nil {
+			log.Error("Failed to insert IP masquerade rule: ", err)
+			return err
+		}
+	}
+
+	return nil
+}
+func (m *UdpBackend) monitorEvents() {
+	log.Info("Watching for new subnet leases")
+
+	evts := make(chan subnet.EventBatch)
+
+	m.wg.Add(1)
+	go func() {
+		m.sm.WatchLeases(evts, m.stop)
+		m.wg.Done()
+	}()
+
+	for {
+		select {
+		case evtBatch := <-evts:
+			for _, evt := range evtBatch {
+				switch evt.Type {
+				case subnet.SubnetAdded:
+					log.Info("Subnet added: ", evt.Lease.Network)
+
+					var attrs subnet.BaseAttrs
+					if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
+						log.Error("Error decoding subnet lease JSON: ", err)
+						continue
+					}
+
+					setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.port)
+
+				case subnet.SubnetRemoved:
+					log.Info("Subnet removed: ", evt.Lease.Network)
+
+					removeRoute(m.ctl, evt.Lease.Network)
+
+				default:
+					log.Error("Internal error: unknown event type: ", int(evt.Type))
+				}
+			}
+
+		case <-m.stop:
+			return
+		}
+	}
+}

+ 70 - 24
main.go

@@ -5,15 +5,18 @@ import (
 	"fmt"
 	"fmt"
 	"net"
 	"net"
 	"os"
 	"os"
+	"os/signal"
 	"path"
 	"path"
+	"syscall"
 	"time"
 	"time"
 
 
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-systemd/daemon"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
 
 
+	"github.com/coreos/rudder/backend"
 	"github.com/coreos/rudder/pkg/ip"
 	"github.com/coreos/rudder/pkg/ip"
 	"github.com/coreos/rudder/subnet"
 	"github.com/coreos/rudder/subnet"
-	"github.com/coreos/rudder/udp"
+	"github.com/coreos/rudder/backend/udp"
 )
 )
 
 
 const (
 const (
@@ -44,7 +47,7 @@ func init() {
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
 }
 }
 
 
-func writeSubnet(sn ip.IP4Net, mtu int) error {
+func writeSubnetFile(sn ip.IP4Net, mtu int) error {
 	// Write out the first usable IP by incrementing
 	// Write out the first usable IP by incrementing
 	// sn.IP by one
 	// sn.IP by one
 	sn.IP += 1
 	sn.IP += 1
@@ -63,23 +66,21 @@ func writeSubnet(sn ip.IP4Net, mtu int) error {
 	return nil
 	return nil
 }
 }
 
 
-func lookupIface() (*net.Interface, net.IP) {
+func lookupIface() (*net.Interface, net.IP, error) {
 	var iface *net.Interface
 	var iface *net.Interface
-	var tep net.IP
+	var ipaddr net.IP
 	var err error
 	var err error
 
 
 	if len(opts.iface) > 0 {
 	if len(opts.iface) > 0 {
-		if tep = net.ParseIP(opts.iface); tep != nil {
-			iface, err = ip.GetInterfaceByIP(tep)
+		if ipaddr = net.ParseIP(opts.iface); ipaddr != nil {
+			iface, err = ip.GetInterfaceByIP(ipaddr)
 			if err != nil {
 			if err != nil {
-				log.Errorf("Error looking up interface %s: %s", opts.iface, err)
-				return nil, nil
+				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", opts.iface, err)
 			}
 			}
 		} else {
 		} else {
 			iface, err = net.InterfaceByName(opts.iface)
 			iface, err = net.InterfaceByName(opts.iface)
 			if err != nil {
 			if err != nil {
-				log.Errorf("Error looking up interface %s: %s", opts.iface, err)
-				return nil, nil
+				return nil, nil, fmt.Errorf("Error looking up interface %s: %s", opts.iface, err)
 			}
 			}
 		}
 		}
 	} else {
 	} else {
@@ -93,14 +94,14 @@ func lookupIface() (*net.Interface, net.IP) {
 		}
 		}
 	}
 	}
 
 
-	if tep == nil {
-		tep, err = ip.GetIfaceIP4Addr(iface)
+	if ipaddr == nil {
+		ipaddr, err = ip.GetIfaceIP4Addr(iface)
 		if err != nil {
 		if err != nil {
-			log.Error("Failed to find IPv4 address for interface ", iface.Name)
+			return nil, nil, fmt.Errorf("Failed to find IPv4 address for interface %s", iface.Name)
 		}
 		}
 	}
 	}
 
 
-	return iface, tep
+	return iface, ipaddr, nil
 }
 }
 
 
 func makeSubnetManager() *subnet.SubnetManager {
 func makeSubnetManager() *subnet.SubnetManager {
@@ -115,6 +116,40 @@ func makeSubnetManager() *subnet.SubnetManager {
 	}
 	}
 }
 }
 
 
+func newBackend() backend.Backend {
+	sm := makeSubnetManager()
+	return udp.New(sm, opts.port)
+}
+
+func run(be backend.Backend, quit chan bool) {
+	defer close(quit)
+
+	iface, ipaddr, err := lookupIface()
+	if err != nil {
+		log.Error(err)
+		return
+	}
+
+	if iface.MTU == 0 {
+		log.Errorf("Failed to determine MTU for %s interface", ipaddr)
+		return
+	}
+
+	log.Infof("Using %s as external interface", ipaddr)
+
+	sn, mtu, err := be.Init(iface, ipaddr, opts.ipMasq)
+	if err != nil {
+		log.Error(err)
+		return
+	}
+
+	writeSubnetFile(sn, mtu)
+	daemon.SdNotify("READY=1")
+
+	log.Infof("%s mode initialized", be.Name())
+	be.Run()
+}
+
 func main() {
 func main() {
 	// glog will log to tmp files by default. override so all entries
 	// glog will log to tmp files by default. override so all entries
 	// can flow into journald (if running under systemd)
 	// can flow into journald (if running under systemd)
@@ -134,17 +169,28 @@ func main() {
 		os.Exit(0)
 		os.Exit(0)
 	}
 	}
 
 
-	iface, tep := lookupIface()
-	if iface == nil || tep == nil {
-		return
-	}
+	be := newBackend()
 
 
-	log.Infof("Using %s to tunnel", tep)
+	// Register for SIGINT and SIGTERM and wait for one of them to arrive
+	log.Info("Installing signal handlers")
+	sigs := make(chan os.Signal, 5)
+	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
 
-	sm := makeSubnetManager()
+	quit := make(chan bool)
+	go run(be, quit)
+
+	for {
+		select {
+		case <-sigs:
+			// unregister to get default OS nuke behaviour in case we don't exit cleanly
+			signal.Stop(sigs)
 
 
-	udp.Run(sm, iface, tep, opts.port, opts.ipMasq, func(sn ip.IP4Net, mtu int) {
-		writeSubnet(sn, mtu)
-		daemon.SdNotify("READY=1")
-	})
+			log.Info("Exiting...")
+			be.Stop()
+
+		case <-quit:
+			log.Infof("%s mode exited", be.Name())
+			os.Exit(0)
+		}
+	}
 }
 }

+ 5 - 1
subnet/registry.go

@@ -54,7 +54,11 @@ func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd
 		resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
 		resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
 
 
 		if err != nil {
 		if err != nil {
-			return nil, err
+			if err == etcd.ErrWatchStoppedByUser {
+				return nil, nil
+			} else {
+				return nil, err
+			}
 		}
 		}
 
 
 		if len(resp.Body) == 0 {
 		if len(resp.Body) == 0 {

+ 28 - 23
subnet/subnet.go

@@ -35,6 +35,8 @@ const (
 
 
 var (
 var (
 	subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
 	subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
+
+	ErrCanceled = errors.New("Canceled by user")
 )
 )
 
 
 type SubnetLease struct {
 type SubnetLease struct {
@@ -49,7 +51,6 @@ type SubnetManager struct {
 	leaseExp  time.Time
 	leaseExp  time.Time
 	lastIndex uint64
 	lastIndex uint64
 	leases    []SubnetLease
 	leases    []SubnetLease
-	stop      chan bool
 }
 }
 
 
 type EventType int
 type EventType int
@@ -66,7 +67,7 @@ func NewSubnetManager(etcdEndpoint, prefix string) (*SubnetManager, error) {
 	return newSubnetManager(esr)
 	return newSubnetManager(esr)
 }
 }
 
 
-func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error) {
+func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) {
 	for i := 0; i < registerRetries; i++ {
 	for i := 0; i < registerRetries; i++ {
 		var err error
 		var err error
 		sm.leases, err = sm.getLeases()
 		sm.leases, err = sm.getLeases()
@@ -74,7 +75,7 @@ func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error
 			return ip.IP4Net{}, err
 			return ip.IP4Net{}, err
 		}
 		}
 
 
-		// try to reuse a subnet if there's one that match our IP
+		// try to reuse a subnet if there's one that matches our IP
 		for _, l := range sm.leases {
 		for _, l := range sm.leases {
 			var ba BaseAttrs
 			var ba BaseAttrs
 			err = json.Unmarshal([]byte(l.Data), &ba)
 			err = json.Unmarshal([]byte(l.Data), &ba)
@@ -109,11 +110,16 @@ func (sm *SubnetManager) AcquireLease(tep ip.IP4, data string) (ip.IP4Net, error
 
 
 		// if etcd returned Key Already Exists, try again.
 		// if etcd returned Key Already Exists, try again.
 		case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
 		case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
-			continue
+			break
 
 
 		default:
 		default:
 			return ip.IP4Net{}, err
 			return ip.IP4Net{}, err
 		}
 		}
+
+		// before moving on, check for cancel
+		if interrupted(cancel) {
+			return ip.IP4Net{}, ErrCanceled
+		}
 	}
 	}
 
 
 	return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet")
 	return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet")
@@ -125,17 +131,6 @@ func (sm *SubnetManager) UpdateSubnet(data string) error {
 	return err
 	return err
 }
 }
 
 
-func (sm *SubnetManager) Start(receiver chan EventBatch) {
-	go sm.watchLeases(receiver)
-	go sm.leaseRenewer()
-}
-
-func (sm *SubnetManager) Stop() {
-	// once for each goroutine
-	sm.stop <- true
-	sm.stop <- true
-}
-
 func (sm *SubnetManager) GetConfig() *Config {
 func (sm *SubnetManager) GetConfig() *Config {
 	return sm.config
 	return sm.config
 }
 }
@@ -164,11 +159,12 @@ func newSubnetManager(r subnetRegistry) (*SubnetManager, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	return &SubnetManager{
+	sm := SubnetManager{
 		registry: r,
 		registry: r,
 		config:   cfg,
 		config:   cfg,
-		stop:     make(chan bool, 2),
-	}, nil
+	}
+
+	return &sm, nil
 }
 }
 
 
 func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
 func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
@@ -293,7 +289,7 @@ OuterLoop:
 	}
 	}
 }
 }
 
 
-func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
+func (sm *SubnetManager) WatchLeases(receiver chan EventBatch, cancel chan bool) {
 	// "catch up" by replaying all the leases we discovered during
 	// "catch up" by replaying all the leases we discovered during
 	// AcquireLease
 	// AcquireLease
 	var batch EventBatch
 	var batch EventBatch
@@ -307,9 +303,9 @@ func (sm *SubnetManager) watchLeases(receiver chan EventBatch) {
 	}
 	}
 
 
 	for {
 	for {
-		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, sm.stop)
+		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, cancel)
 
 
-		// watchSubnets exited by stop chan being signaled
+		// watchSubnets exited by cancel chan being signaled
 		if err == nil && resp == nil {
 		if err == nil && resp == nil {
 			return
 			return
 		}
 		}
@@ -373,7 +369,7 @@ func (sm *SubnetManager) parseSubnetWatchError(err error) (batch *EventBatch, ou
 	return
 	return
 }
 }
 
 
-func (sm *SubnetManager) leaseRenewer() {
+func (sm *SubnetManager) LeaseRenewer(cancel chan bool) {
 	dur := sm.leaseExp.Sub(time.Now()) - renewMargin
 	dur := sm.leaseExp.Sub(time.Now()) - renewMargin
 
 
 	for {
 	for {
@@ -390,8 +386,17 @@ func (sm *SubnetManager) leaseRenewer() {
 			log.Info("Lease renewed, new expiration: ", sm.leaseExp)
 			log.Info("Lease renewed, new expiration: ", sm.leaseExp)
 			dur = sm.leaseExp.Sub(time.Now()) - renewMargin
 			dur = sm.leaseExp.Sub(time.Now()) - renewMargin
 
 
-		case <-sm.stop:
+		case <-cancel:
 			return
 			return
 		}
 		}
 	}
 	}
 }
 }
+
+func interrupted(cancel chan bool) bool {
+	select {
+	case <-cancel:
+		return true
+	default:
+		return false
+	}
+}

+ 14 - 9
subnet/subnet_test.go

@@ -154,7 +154,8 @@ func TestAcquireLease(t *testing.T) {
 	ip, _ := ip.ParseIP4("1.2.3.4")
 	ip, _ := ip.ParseIP4("1.2.3.4")
 	data := `{ "PublicIP": "1.2.3.4" }`
 	data := `{ "PublicIP": "1.2.3.4" }`
 
 
-	sn, err := sm.AcquireLease(ip, data)
+	cancel := make(chan bool)
+	sn, err := sm.AcquireLease(ip, data, cancel)
 	if err != nil {
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 		t.Fatal("AcquireLease failed: ", err)
 	}
 	}
@@ -164,7 +165,7 @@ func TestAcquireLease(t *testing.T) {
 	}
 	}
 
 
 	// Acquire again, should reuse
 	// Acquire again, should reuse
-	if sn, err = sm.AcquireLease(ip, data); err != nil {
+	if sn, err = sm.AcquireLease(ip, data, cancel); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 		t.Fatal("AcquireLease failed: ", err)
 	}
 	}
 
 
@@ -181,7 +182,8 @@ func TestWatchLeaseAdded(t *testing.T) {
 	}
 	}
 
 
 	events := make(chan EventBatch)
 	events := make(chan EventBatch)
-	sm.Start(events)
+	cancel := make(chan bool)
+	go sm.WatchLeases(events, cancel)
 
 
 	expected := "10.3.3.0-24"
 	expected := "10.3.3.0-24"
 	msr.addCh <- expected
 	msr.addCh <- expected
@@ -206,7 +208,7 @@ func TestWatchLeaseAdded(t *testing.T) {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 	}
 	}
 
 
-	sm.Stop()
+	close(cancel)
 }
 }
 
 
 func TestWatchLeaseRemoved(t *testing.T) {
 func TestWatchLeaseRemoved(t *testing.T) {
@@ -217,7 +219,8 @@ func TestWatchLeaseRemoved(t *testing.T) {
 	}
 	}
 
 
 	events := make(chan EventBatch)
 	events := make(chan EventBatch)
-	sm.Start(events)
+	cancel := make(chan bool)
+	go sm.WatchLeases(events, cancel)
 
 
 	expected := "10.3.4.0-24"
 	expected := "10.3.4.0-24"
 	msr.delCh <- expected
 	msr.delCh <- expected
@@ -242,7 +245,7 @@ func TestWatchLeaseRemoved(t *testing.T) {
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 		t.Errorf("WatchSubnet produced wrong subnet: expected %s, got %s", expected, actual)
 	}
 	}
 
 
-	sm.Stop()
+	close(cancel)
 }
 }
 
 
 func TestRenewLease(t *testing.T) {
 func TestRenewLease(t *testing.T) {
@@ -255,13 +258,15 @@ func TestRenewLease(t *testing.T) {
 	ip, _ := ip.ParseIP4("1.2.3.4")
 	ip, _ := ip.ParseIP4("1.2.3.4")
 	data := `{ "PublicIP": "1.2.3.4" }`
 	data := `{ "PublicIP": "1.2.3.4" }`
 
 
-	sn, err := sm.AcquireLease(ip, data)
+	cancel := make(chan bool)
+	defer close(cancel)
+
+	sn, err := sm.AcquireLease(ip, data, cancel)
 	if err != nil {
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 		t.Fatal("AcquireLease failed: ", err)
 	}
 	}
 
 
-	events := make(chan EventBatch)
-	sm.Start(events)
+	go sm.LeaseRenewer(cancel)
 
 
 	fmt.Println("Waiting for lease to pass original expiration")
 	fmt.Println("Waiting for lease to pass original expiration")
 	time.Sleep(2 * time.Second)
 	time.Sleep(2 * time.Second)

+ 0 - 118
udp/cproxy.go

@@ -1,118 +0,0 @@
-package udp
-
-//#include "proxy.h"
-import "C"
-
-import (
-	"encoding/json"
-	"net"
-	"os"
-	"reflect"
-	"syscall"
-	"unsafe"
-
-	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos/rudder/pkg/ip"
-	"github.com/coreos/rudder/subnet"
-)
-
-func runCProxy(tun *os.File, conn *os.File, ctl *os.File, tunIP ip.IP4, tunMTU uint) {
-	var log_errors int
-	if log.V(1) {
-		log_errors = 1
-	}
-
-	C.run_proxy(
-		C.int(tun.Fd()),
-		C.int(conn.Fd()),
-		C.int(ctl.Fd()),
-		C.in_addr_t(tunIP.NetworkOrder()),
-		C.size_t(tunMTU),
-		C.int(log_errors),
-	)
-}
-
-func writeCommand(f *os.File, cmd *C.command) {
-	hdr := reflect.SliceHeader{
-		Data: uintptr(unsafe.Pointer(cmd)),
-		Len:  int(unsafe.Sizeof(*cmd)),
-		Cap:  int(unsafe.Sizeof(*cmd)),
-	}
-	buf := *(*[]byte)(unsafe.Pointer(&hdr))
-
-	f.Write(buf)
-}
-
-func newCtlSockets() (*os.File, *os.File, error) {
-	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	f1 := os.NewFile(uintptr(fds[0]), "ctl")
-	f2 := os.NewFile(uintptr(fds[1]), "ctl")
-	return f1, f2, nil
-}
-
-func fastProxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunIP ip.IP4, tunMTU uint, port int) {
-	log.Info("Running fast proxy loop")
-
-	c, err := conn.File()
-	if err != nil {
-		log.Error("Converting UDPConn to File failed: ", err)
-		return
-	}
-	defer c.Close()
-
-	ctl, peerCtl, err := newCtlSockets()
-	if err != nil {
-		log.Error("Failed to create control socket: ", err)
-		return
-	}
-	defer ctl.Close()
-	defer peerCtl.Close()
-
-	go runCProxy(tun, c, peerCtl, tunIP, tunMTU)
-
-	log.Info("Watching for new subnet leases")
-	evts := make(chan subnet.EventBatch)
-	sm.Start(evts)
-
-	for evtBatch := range evts {
-		for _, evt := range evtBatch {
-			if evt.Type == subnet.SubnetAdded {
-				log.Info("Subnet added: ", evt.Lease.Network)
-				var attrs subnet.BaseAttrs
-				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-					log.Error("Error decoding subnet lease JSON: ", err)
-					continue
-				}
-
-				cmd := C.command{
-					cmd:           C.CMD_SET_ROUTE,
-					dest_net:      C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
-					dest_net_len:  C.int(evt.Lease.Network.PrefixLen),
-					next_hop_ip:   C.in_addr_t(attrs.PublicIP.NetworkOrder()),
-					next_hop_port: C.short(port),
-				}
-
-				writeCommand(ctl, &cmd)
-
-			} else if evt.Type == subnet.SubnetRemoved {
-				log.Info("Subnet removed: ", evt.Lease.Network)
-
-				cmd := C.command{
-					cmd:          C.CMD_DEL_ROUTE,
-					dest_net:     C.in_addr_t(evt.Lease.Network.IP.NetworkOrder()),
-					dest_net_len: C.int(evt.Lease.Network.PrefixLen),
-				}
-
-				writeCommand(ctl, &cmd)
-
-			} else {
-				log.Error("Internal error: unknown event type: ", int(evt.Type))
-			}
-		}
-	}
-}

+ 0 - 168
udp/proxy.go

@@ -1,168 +0,0 @@
-package udp
-
-import (
-	"encoding/json"
-	"net"
-	"os"
-	"sync"
-
-	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos/rudder/pkg/ip"
-	"github.com/coreos/rudder/subnet"
-)
-
-const (
-	minIP4HdrSize = 20
-)
-
-type routeEntry struct {
-	sn   ip.IP4Net
-	addr *net.UDPAddr
-}
-
-type Router struct {
-	mux    sync.Mutex
-	port   int
-	routes []routeEntry
-}
-
-func NewRouter(port int) *Router {
-	return &Router{
-		port: port,
-	}
-}
-
-func (r *Router) SetRoute(sn ip.IP4Net, dst ip.IP4) {
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	for _, re := range r.routes {
-		if re.sn.Equal(sn) {
-			re.addr = &net.UDPAddr{
-				IP:   dst.ToIP(),
-				Port: r.port,
-			}
-			return
-		}
-	}
-
-	re := routeEntry{
-		sn: sn,
-		addr: &net.UDPAddr{
-			IP:   dst.ToIP(),
-			Port: r.port,
-		},
-	}
-
-	r.routes = append(r.routes, re)
-}
-
-func (r *Router) DelRoute(sn ip.IP4Net) {
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	for i, re := range r.routes {
-		if re.sn.Equal(sn) {
-			r.routes[i] = r.routes[len(r.routes)-1]
-			r.routes = r.routes[:len(r.routes)-1]
-			return
-		}
-	}
-}
-
-func (r *Router) routePacket(pkt []byte, conn *net.UDPConn) {
-	if len(pkt) < minIP4HdrSize {
-		log.V(1).Infof("Packet too small (%d bytes), unable to route", len(pkt))
-		return
-	}
-
-	r.mux.Lock()
-	defer r.mux.Unlock()
-
-	dstIP := ip.FromBytes(pkt[16:20])
-
-	for i, re := range r.routes {
-		if re.sn.Contains(dstIP) {
-			nbytes, err := conn.WriteToUDP(pkt, re.addr)
-			switch {
-			case err != nil:
-				log.V(1).Info("UDP send failed with: ", err)
-			case nbytes != len(pkt):
-				log.V(1).Infof("Was only able to UDP send %d out of %d bytes to %s: ", nbytes, len(pkt), re.addr.IP)
-			}
-
-			// packets for same dest tend to come in burst. swap to front make it faster for subsequent ones
-			if i != 0 {
-				r.routes[0], r.routes[i] = r.routes[i], r.routes[0]
-			}
-			return
-		}
-	}
-
-	log.V(1).Info("No route found for ", dstIP)
-}
-
-func proxy(sm *subnet.SubnetManager, tun *os.File, conn *net.UDPConn, tunMTU uint, port int) {
-	log.Info("Running slow proxy loop")
-
-	rtr := NewRouter(port)
-
-	go proxyTunToUdp(rtr, tun, conn, tunMTU)
-	go proxyUdpToTun(conn, tun, tunMTU)
-
-	log.Info("Watching for new subnet leases")
-	evts := make(chan subnet.EventBatch)
-	sm.Start(evts)
-
-	for evtBatch := range evts {
-		for _, evt := range evtBatch {
-			if evt.Type == subnet.SubnetAdded {
-				log.Info("Subnet added: ", evt.Lease.Network)
-				var attrs subnet.BaseAttrs
-				if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-					log.Error("Error decoding subnet lease JSON: ", err)
-					continue
-				}
-				rtr.SetRoute(evt.Lease.Network, attrs.PublicIP)
-
-			} else if evt.Type == subnet.SubnetRemoved {
-				log.Info("Subnet removed: ", evt.Lease.Network)
-				rtr.DelRoute(evt.Lease.Network)
-
-			} else {
-				log.Error("Internal error: unknown event type: ", int(evt.Type))
-			}
-		}
-	}
-}
-
-func proxyTunToUdp(r *Router, tun *os.File, conn *net.UDPConn, tunMTU uint) {
-	pkt := make([]byte, tunMTU)
-	for {
-		nbytes, err := tun.Read(pkt)
-		if err != nil {
-			log.V(1).Info("Error reading from TUN device: ", err)
-		} else {
-			r.routePacket(pkt[:nbytes], conn)
-		}
-	}
-}
-
-func proxyUdpToTun(conn *net.UDPConn, tun *os.File, tunMTU uint) {
-	pkt := make([]byte, tunMTU)
-	for {
-		nrecv, err := conn.Read(pkt)
-		if err != nil {
-			log.V(1).Info("Error reading from socket: ", err)
-		} else {
-			nsent, err := tun.Write(pkt[:nrecv])
-			switch {
-			case err != nil:
-				log.V(1).Info("Error writing to TUN device: ", err)
-			case nsent != nrecv:
-				log.V(1).Infof("Was only able to write %d out of %d bytes to TUN device: ", nsent, nrecv)
-			}
-		}
-	}
-}

+ 0 - 175
udp/run.go

@@ -1,175 +0,0 @@
-package udp
-
-import (
-	"encoding/json"
-	"net"
-	"strings"
-	"syscall"
-	"time"
-
-	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/docker/libcontainer/netlink"
-	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
-
-	"github.com/coreos/rudder/backend"
-	"github.com/coreos/rudder/pkg/ip"
-	"github.com/coreos/rudder/subnet"
-)
-
-const (
-	encapOverhead = 28 // 20 bytes IP hdr + 8 bytes UDP hdr
-)
-
-func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
-	iface, err := net.InterfaceByName(ifname)
-	if err != nil {
-		log.Error("Failed to lookup interface ", ifname)
-		return err
-	}
-
-	n := ipn.ToIPNet()
-	err = netlink.NetworkLinkAddIp(iface, n.IP, n)
-	if err != nil {
-		log.Errorf("Failed to add IP address %s to %s: %s", n.IP, ifname, err)
-		return err
-	}
-
-	err = netlink.NetworkSetMTU(iface, mtu)
-	if err != nil {
-		log.Errorf("Failed to set MTU for %s: %v", ifname, err)
-		return err
-	}
-
-	err = netlink.NetworkLinkUp(iface)
-	if err != nil {
-		log.Errorf("Failed to set interface %s to UP state: %s", ifname, err)
-		return err
-	}
-
-	// explicitly add a route since there might be a route for a subnet already
-	// installed by Docker and then it won't get auto added
-	err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
-	if err != nil && err != syscall.EEXIST {
-		log.Errorf("Failed to add route (%s -> %s): %v", ipn.Network().String(), ifname, err)
-		return err
-	}
-
-	return nil
-}
-
-func setupIpMasq(ipn ip.IP4Net, iface string) error {
-	ipt, err := ip.NewIPTables()
-	if err != nil {
-		log.Error("Failed to setup IP Masquerade. iptables was not found")
-		return err
-	}
-
-	err = ipt.ClearChain("nat", "RUDDER")
-	if err != nil {
-		log.Error("Failed to create/clear RUDDER chain in NAT table: ", err)
-		return err
-	}
-
-	rules := [][]string{
-		// This rule makes sure we don't NAT traffic within overlay network (e.g. coming out of docker0)
-		[]string{"RUDDER", "-d", ipn.String(), "-j", "ACCEPT"},
-		// This rule makes sure we don't NAT multicast traffic within overlay network
-		[]string{"RUDDER", "-d", "224.0.0.0/4", "-j", "ACCEPT"},
-		// This rule will NAT everything originating from our overlay network and
-		[]string{"RUDDER", "!", "-o", iface, "-j", "MASQUERADE"},
-		// This rule will take everything coming from overlay and sent it to RUDDER chain
-		[]string{"POSTROUTING", "-s", ipn.String(), "-j", "RUDDER"},
-	}
-
-	for _, args := range rules {
-		log.Info("Adding iptables rule: ", strings.Join(args, " "))
-
-		err = ipt.AppendUnique("nat", args...)
-		if err != nil {
-			log.Error("Failed to insert IP masquerade rule: ", err)
-			return err
-		}
-	}
-
-	return nil
-}
-
-func acquireLease(sm *subnet.SubnetManager, pubIP net.IP) (ip.IP4Net, error) {
-	attrs := subnet.BaseAttrs{
-		PublicIP: ip.FromIP(pubIP),
-	}
-	data, err := json.Marshal(&attrs)
-	if err != nil {
-		return ip.IP4Net{}, err
-	}
-
-	var sn ip.IP4Net
-	for {
-		sn, err = sm.AcquireLease(attrs.PublicIP, string(data))
-		if err == nil {
-			log.Info("Subnet lease acquired: ", sn)
-			break
-		}
-		log.Error("Failed to acquire subnet: ", err)
-		time.Sleep(time.Second)
-	}
-
-	return sn, nil
-}
-
-func Run(sm *subnet.SubnetManager, tepIface *net.Interface, tepIP net.IP, port int, ipMasq bool, ready backend.ReadyFunc) {
-	sn, err := acquireLease(sm, tepIP)
-	if err != nil {
-		log.Error("Failed to acquire lease: ", err)
-		return
-	}
-
-	tun, tunName, err := ip.OpenTun("rudder%d")
-	if err != nil {
-		log.Error("Failed to open TUN device: ", err)
-		return
-	}
-
-	localAddr := net.UDPAddr{
-		Port: port,
-	}
-
-	conn, err := net.ListenUDP("udp4", &localAddr)
-	if err != nil {
-		log.Error("Failed to start listening on UDP socket: ", err)
-		return
-	}
-
-	// Interface's subnet is that of the whole overlay network (e.g. /16)
-	// and not that of the individual host (e.g. /24)
-	tunNet := ip.IP4Net{
-		IP:        sn.IP,
-		PrefixLen: sm.GetConfig().Network.PrefixLen,
-	}
-
-	// TUN MTU will be smaller b/c of encap (IP+UDP hdrs)
-	var mtu int
-	if tepIface.MTU > 0 {
-		mtu = tepIface.MTU - encapOverhead
-	} else {
-		log.Errorf("Failed to determine MTU for %s interface", tepIP)
-		return
-	}
-
-	err = configureIface(tunName, tunNet, mtu)
-	if err != nil {
-		return
-	}
-
-	if ipMasq {
-		err = setupIpMasq(tunNet.Network(), tunName)
-		if err != nil {
-			return
-		}
-	}
-
-	// all initialized and ready for business
-	log.Info("UDP encapsulation initialized")
-	ready(sn, mtu)
-
-	fastProxy(sm, tun, conn, tunNet.IP, uint(mtu), port)
-}