Browse Source

fix issues based on code review

Eugene Yakubovich 10 years ago
parent
commit
653b2bf728
6 changed files with 81 additions and 64 deletions
  1. 7 4
      backend/alloc/alloc.go
  2. 6 1
      backend/common.go
  3. 1 2
      backend/udp/proxy.c
  4. 44 45
      backend/udp/udp.go
  5. 12 7
      main.go
  6. 11 5
      subnet/registry.go

+ 7 - 4
backend/alloc/alloc.go

@@ -23,7 +23,7 @@ func New(sm *subnet.SubnetManager) backend.Backend {
 	}
 }
 
-func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error) {
+func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
 	attrs := subnet.BaseAttrs{
 		PublicIP: ip.FromIP(extIP),
 	}
@@ -31,13 +31,16 @@ func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool)
 	sn, err := m.sm.AcquireLease(ip.FromIP(extIP), &attrs, m.stop)
 	if err != nil {
 		if err == task.ErrCanceled {
-			return ip.IP4Net{}, 0, err
+			return nil, err
 		} else {
-			return ip.IP4Net{}, 0, fmt.Errorf("Failed to acquire lease: %v", err)
+			return nil, fmt.Errorf("failed to acquire lease: %v", err)
 		}
 	}
 
-	return sn, extIface.MTU, nil
+	return &backend.SubnetDef{
+		Net: sn,
+		MTU: extIface.MTU,
+	}, nil
 }
 
 func (m *AllocBackend) Run() {

+ 6 - 1
backend/common.go

@@ -6,8 +6,13 @@ import (
 	"github.com/coreos/rudder/pkg/ip"
 )
 
+type SubnetDef struct {
+	Net ip.IP4Net
+	MTU int
+}
+
 type Backend interface {
-	Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error)
+	Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*SubnetDef, error)
 	Run()
 	Stop()
 	Name() string

+ 1 - 2
backend/udp/proxy.c

@@ -413,9 +413,8 @@ void run_proxy(int tun, int sock, int ctl, in_addr_t tun_ip, size_t tun_mtu, int
 		if( fds[1].revents & POLLIN )
 			udp_to_tun(sock, tun, buf, tun_mtu);
 
-		if( fds[2].revents & POLLIN ) {
+		if( fds[2].revents & POLLIN )
 			process_cmd(ctl);
-		}
 	}
 
 	free(buf);

+ 44 - 45
backend/udp/udp.go

@@ -49,11 +49,11 @@ func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
 	return &be
 }
 
-func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (ip.IP4Net, int, error) {
+func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
 	// Parse our configuration
 	if len(m.rawCfg) > 0 {
 		if err := json.Unmarshal(m.rawCfg, &m.cfg); err != nil {
-			return ip.IP4Net{}, 0, fmt.Errorf("Error decoding UDP backend config: %v", err)
+			return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
 		}
 	}
 
@@ -65,9 +65,9 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (i
 	sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop)
 	if err != nil {
 		if err == task.ErrCanceled {
-			return ip.IP4Net{}, 0, err
+			return nil, err
 		} else {
-			return ip.IP4Net{}, 0, fmt.Errorf("Failed to acquire lease: %s", err)
+			return nil, fmt.Errorf("failed to acquire lease: %v", err)
 		}
 	}
 
@@ -82,21 +82,24 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (i
 	m.mtu = extIface.MTU - encapOverhead
 
 	if err = m.initTun(ipMasq); err != nil {
-		return ip.IP4Net{}, 0, err
+		return nil, err
 	}
 
 	m.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: m.cfg.Port})
 	if err != nil {
-		return ip.IP4Net{}, 0, fmt.Errorf("Failed to start listening on UDP socket: %s", err)
+		return nil, fmt.Errorf("failed to start listening on UDP socket: %v", err)
 	}
 
 
 	m.ctl, m.ctl2, err = newCtlSockets()
 	if err != nil {
-		return ip.IP4Net{}, 0, fmt.Errorf("Failed to create control socket: %s", err)
+		return nil, fmt.Errorf("failed to create control socket: %v", err)
 	}
 
-	return sn, m.mtu, nil
+	return &backend.SubnetDef{
+		Net: sn,
+		MTU: m.mtu,
+	}, nil
 }
 
 func (m *UdpBackend) Run() {
@@ -147,8 +150,7 @@ func (m *UdpBackend) initTun(ipMasq bool) error {
 
 	m.tun, tunName, err = ip.OpenTun("rudder%d")
 	if err != nil {
-		log.Error("Failed to open TUN device: ", err)
-		return err
+		return fmt.Errorf("Failed to open TUN device: %v", err)
 	}
 
 	err = configureIface(tunName, m.tunNet, m.mtu)
@@ -169,35 +171,30 @@ func (m *UdpBackend) initTun(ipMasq bool) error {
 func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
 	iface, err := net.InterfaceByName(ifname)
 	if err != nil {
-		log.Error("Failed to lookup interface ", ifname)
-		return err
+		return fmt.Errorf("failed to lookup interface %v", ifname)
 	}
 
 	n := ipn.ToIPNet()
 	err = netlink.NetworkLinkAddIp(iface, n.IP, n)
 	if err != nil {
-		log.Errorf("Failed to add IP address %s to %s: %s", n.IP, ifname, err)
-		return err
+		return fmt.Errorf("failed to add IP address %v to %v: %v", n.IP, ifname, err)
 	}
 
 	err = netlink.NetworkSetMTU(iface, mtu)
 	if err != nil {
-		log.Errorf("Failed to set MTU for %s: ", ifname, err)
-		return err
+		return fmt.Errorf("failed to set MTU for %v: %v", ifname, err)
 	}
 
 	err = netlink.NetworkLinkUp(iface)
 	if err != nil {
-		log.Errorf("Failed to set interface %s to UP state: %s", ifname, err)
-		return err
+		return fmt.Errorf("failed to set interface %v to UP state: %v", ifname, err)
 	}
 
 	// explicitly add a route since there might be a route for a subnet already
 	// installed by Docker and then it won't get auto added
 	err = netlink.AddRoute(ipn.Network().String(), "", "", ifname)
 	if err != nil && err != syscall.EEXIST {
-		log.Errorf("Failed to add route (%s -> %s): ", ipn.Network().String(), ifname, err)
-		return err
+		return fmt.Errorf("Failed to add route (%v -> %v): %v", ipn.Network().String(), ifname, err)
 	}
 
 	return nil
@@ -206,14 +203,12 @@ func configureIface(ifname string, ipn ip.IP4Net, mtu int) error {
 func setupIpMasq(ipn ip.IP4Net, iface string) error {
 	ipt, err := ip.NewIPTables()
 	if err != nil {
-		log.Error("Failed to setup IP Masquerade. iptables was not found")
-		return err
+		return fmt.Errorf("failed to setup IP Masquerade. iptables was not found")
 	}
 
 	err = ipt.ClearChain("nat", "RUDDER")
 	if err != nil {
-		log.Error("Failed to create/clear RUDDER chain in NAT table: ", err)
-		return err
+		return fmt.Errorf("Failed to create/clear RUDDER chain in NAT table: %v", err)
 	}
 
 	rules := [][]string{
@@ -232,13 +227,13 @@ func setupIpMasq(ipn ip.IP4Net, iface string) error {
 
 		err = ipt.AppendUnique("nat", args...)
 		if err != nil {
-			log.Error("Failed to insert IP masquerade rule: ", err)
-			return err
+			return fmt.Errorf("Failed to insert IP masquerade rule: %v", err)
 		}
 	}
 
 	return nil
 }
+
 func (m *UdpBackend) monitorEvents() {
 	log.Info("Watching for new subnet leases")
 
@@ -253,31 +248,35 @@ func (m *UdpBackend) monitorEvents() {
 	for {
 		select {
 		case evtBatch := <-evts:
-			for _, evt := range evtBatch {
-				switch evt.Type {
-				case subnet.SubnetAdded:
-					log.Info("Subnet added: ", evt.Lease.Network)
+			m.processSubnetEvents(evtBatch)
 
-					var attrs subnet.BaseAttrs
-					if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-						log.Error("Error decoding subnet lease JSON: ", err)
-						continue
-					}
+		case <-m.stop:
+			return
+		}
+	}
+}
 
-					setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port)
+func (m *UdpBackend) processSubnetEvents(batch subnet.EventBatch) {
+	for _, evt := range batch {
+		switch evt.Type {
+		case subnet.SubnetAdded:
+			log.Info("Subnet added: ", evt.Lease.Network)
 
-				case subnet.SubnetRemoved:
-					log.Info("Subnet removed: ", evt.Lease.Network)
+			var attrs subnet.BaseAttrs
+			if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
+				log.Error("Error decoding subnet lease JSON: ", err)
+				continue
+			}
 
-					removeRoute(m.ctl, evt.Lease.Network)
+			setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port)
 
-				default:
-					log.Error("Internal error: unknown event type: ", int(evt.Type))
-				}
-			}
+		case subnet.SubnetRemoved:
+			log.Info("Subnet removed: ", evt.Lease.Network)
 
-		case <-m.stop:
-			return
+			removeRoute(m.ctl, evt.Lease.Network)
+
+		default:
+			log.Error("Internal error: unknown event type: ", int(evt.Type))
 		}
 	}
 }

+ 12 - 7
main.go

@@ -45,10 +45,10 @@ func init() {
 	flag.BoolVar(&opts.version, "version", false, "print version and exit")
 }
 
-func writeSubnetFile(sn ip.IP4Net, mtu int) error {
+func writeSubnetFile(sn *backend.SubnetDef) error {
 	// Write out the first usable IP by incrementing
 	// sn.IP by one
-	sn.IP += 1
+	sn.Net.IP += 1
 
 	dir, _ := path.Split(opts.subnetFile)
 	os.MkdirAll(dir, 0755)
@@ -59,8 +59,12 @@ func writeSubnetFile(sn ip.IP4Net, mtu int) error {
 	}
 	defer f.Close()
 
-	fmt.Fprintf(f, "RUDDER_SUBNET=%s\n", sn)
-	fmt.Fprintf(f, "RUDDER_MTU=%d\n", mtu)
+	if _, err = fmt.Fprintf(f, "RUDDER_SUBNET=%s\n", sn.Net); err != nil {
+		return err
+	}
+	if _, err = fmt.Fprintf(f, "RUDDER_MTU=%d\n", sn.MTU); err != nil {
+		return err
+	}
 	return nil
 }
 
@@ -159,12 +163,13 @@ func run(be backend.Backend, exit chan int) {
 
 	log.Infof("Using %s as external interface", ipaddr)
 
-	sn, mtu, err := be.Init(iface, ipaddr, opts.ipMasq)
+	sn, err := be.Init(iface, ipaddr, opts.ipMasq)
 	if err != nil {
+		log.Error("Could not init %v backend: %v", be.Name(), err)
 		return
 	}
 
-	writeSubnetFile(sn, mtu)
+	writeSubnetFile(sn)
 	daemon.SdNotify("READY=1")
 
 	log.Infof("%s mode initialized", be.Name())
@@ -198,7 +203,7 @@ func main() {
 
 	// Register for SIGINT and SIGTERM and wait for one of them to arrive
 	log.Info("Installing signal handlers")
-	sigs := make(chan os.Signal, 5)
+	sigs := make(chan os.Signal, 1)
 	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
 	exit := make(chan int)

+ 11 - 5
subnet/registry.go

@@ -3,6 +3,7 @@ package subnet
 import (
 	"sync"
 	"time"
+	"path"
 
 	"github.com/coreos/rudder/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
 	log "github.com/coreos/rudder/Godeps/_workspace/src/github.com/golang/glog"
@@ -32,7 +33,8 @@ func newEtcdSubnetRegistry(endpoint, prefix string) subnetRegistry {
 }
 
 func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
-	resp, err := esr.client().Get(esr.prefix+"/config", false, false)
+	key := path.Join(esr.prefix, "config")
+	resp, err := esr.client().Get(key, false, false)
 	if err != nil {
 		return nil, err
 	}
@@ -40,11 +42,13 @@ func (esr *etcdSubnetRegistry) getConfig() (*etcd.Response, error) {
 }
 
 func (esr *etcdSubnetRegistry) getSubnets() (*etcd.Response, error) {
-	return esr.client().Get(esr.prefix+"/subnets", false, true)
+	key := path.Join(esr.prefix, "subnets")
+	return esr.client().Get(key, false, true)
 }
 
 func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	resp, err := esr.client().Create(esr.prefix+"/subnets/"+sn, data, ttl)
+	key := path.Join(esr.prefix, "subnets", sn)
+	resp, err := esr.client().Create(key, data, ttl)
 	if err != nil {
 		return nil, err
 	}
@@ -54,7 +58,8 @@ func (esr *etcdSubnetRegistry) createSubnet(sn, data string, ttl uint64) (*etcd.
 }
 
 func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.Response, error) {
-	resp, err := esr.client().Set(esr.prefix+"/subnets/"+sn, data, ttl)
+	key := path.Join(esr.prefix, "subnets", sn)
+	resp, err := esr.client().Set(key, data, ttl)
 	if err != nil {
 		return nil, err
 	}
@@ -65,7 +70,8 @@ func (esr *etcdSubnetRegistry) updateSubnet(sn, data string, ttl uint64) (*etcd.
 
 func (esr *etcdSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd.Response, error) {
 	for {
-		resp, err := esr.client().RawWatch(esr.prefix+"/subnets", since, true, nil, stop)
+		key := path.Join(esr.prefix, "subnets")
+		resp, err := esr.client().RawWatch(key, since, true, nil, stop)
 
 		if err != nil {
 			if err == etcd.ErrWatchStoppedByUser {