Parcourir la source

Merge pull request #71 from eyakubovich/master

Add VXLAN backend
Eugene Yakubovich il y a 10 ans
Parent
commit
c2cad129d6
10 fichiers modifiés avec 742 ajouts et 89 suppressions
  1. 13 2
      README.md
  2. 2 2
      backend/alloc/alloc.go
  3. 1 2
      backend/udp/cproxy.go
  4. 3 9
      backend/udp/udp.go
  5. 287 0
      backend/vxlan/device.go
  6. 55 0
      backend/vxlan/routes.go
  7. 277 0
      backend/vxlan/vxlan.go
  8. 3 1
      main.go
  9. 95 68
      subnet/subnet.go
  10. 6 5
      subnet/subnet_test.go

+ 13 - 2
README.md

@@ -72,6 +72,10 @@ of available backends and the keys that can be put into the this dictionary are
 * alloc: only perform subnet allocation (no forwarding of data packets)
   * ```Type``` (string): ```alloc```
 
+* vxlan: use in-kernel VXLAN to encapsulate the packets.
+  * ```Type``` (string): ```vxlan```
+  * ```VNI```  (number): VXLAN Identifier (VNI) to be used. Defaults to 1
+
 ### Example configuration JSON
 
 The following configuration illustrates the use of most options.
@@ -90,8 +94,9 @@ The following configuration illustrates the use of most options.
 ```
 
 ### Firewalls
-flannel uses UDP port 8285 for sending encapsulated packets. Make sure that your firewall rules allow
-this traffic for all hosts participating in the overlay network.
+When using ```udp``` backend, flannel uses UDP port 8285 for sending encapsulated packets.
+When using ```vxlan``` backend, kernel uses UDP port 8472 for sending encapsulated packets.
+Make sure that your firewall rules allow this traffic for all hosts participating in the overlay network.
 
 ## Running
 
@@ -115,6 +120,12 @@ MTU that it supports.
 -v=0: log level for V logs. Set to 1 to see messages related to data path
 ```
 
+## Zero-downtime restarts
+When running in VXLAN mode, the kernel is providing the data path with flanneld acting as the control plane. As such, flanneld
+can be restarted (even to do an upgrade) without disturbing existing flows. However, this needs to be done in few seconds as ARP
+entries can start to timeout requiring the flanneld daemon to refresh them. Also, to avoid interruptions during restart, the configuration
+must not be changed (e.g. VNI, --iface value).
+
 ## Docker integration
 
 Docker daemon accepts ```--bip``` argument to configure the subnet of the docker0 bridge. It also accepts ```--mtu``` to set the MTU

+ 2 - 2
backend/alloc/alloc.go

@@ -24,11 +24,11 @@ func New(sm *subnet.SubnetManager) backend.Backend {
 }
 
 func (m *AllocBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
-	attrs := subnet.BaseAttrs{
+	attrs := subnet.LeaseAttrs{
 		PublicIP: ip.FromIP(extIP),
 	}
 
-	sn, err := m.sm.AcquireLease(ip.FromIP(extIP), &attrs, m.stop)
+	sn, err := m.sm.AcquireLease(&attrs, m.stop)
 	if err != nil {
 		if err == task.ErrCanceled {
 			return nil, err

+ 1 - 2
backend/udp/cproxy.go

@@ -72,9 +72,8 @@ func removeRoute(ctl *os.File, dst ip.IP4Net) {
 
 func stopProxy(ctl *os.File) {
 	cmd := C.command{
-		cmd:          C.CMD_STOP,
+		cmd: C.CMD_STOP,
 	}
 
 	writeCommand(ctl, &cmd)
 }
-

+ 3 - 9
backend/udp/udp.go

@@ -58,11 +58,11 @@ func (m *UdpBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*
 	}
 
 	// Acquire the lease form subnet manager
-	attrs := subnet.BaseAttrs{
+	attrs := subnet.LeaseAttrs{
 		PublicIP: ip.FromIP(extIP),
 	}
 
-	sn, err := m.sm.AcquireLease(attrs.PublicIP, &attrs, m.stop)
+	sn, err := m.sm.AcquireLease(&attrs, m.stop)
 	if err != nil {
 		if err == task.ErrCanceled {
 			return nil, err
@@ -263,13 +263,7 @@ func (m *UdpBackend) processSubnetEvents(batch subnet.EventBatch) {
 		case subnet.SubnetAdded:
 			log.Info("Subnet added: ", evt.Lease.Network)
 
-			var attrs subnet.BaseAttrs
-			if err := json.Unmarshal([]byte(evt.Lease.Data), &attrs); err != nil {
-				log.Error("Error decoding subnet lease JSON: ", err)
-				continue
-			}
-
-			setRoute(m.ctl, evt.Lease.Network, attrs.PublicIP, m.cfg.Port)
+			setRoute(m.ctl, evt.Lease.Network, evt.Lease.Attrs.PublicIP, m.cfg.Port)
 
 		case subnet.SubnetRemoved:
 			log.Info("Subnet removed: ", evt.Lease.Network)

+ 287 - 0
backend/vxlan/device.go

@@ -0,0 +1,287 @@
+package vxlan
+
+import (
+	"fmt"
+	"net"
+	"os"
+	"syscall"
+	"time"
+
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink/nl"
+
+	"github.com/coreos/flannel/pkg/ip"
+)
+
+type vxlanDeviceAttrs struct {
+	vni       uint32
+	name      string
+	vtepIndex int
+	vtepAddr  net.IP
+	vtepPort  int
+}
+
+type vxlanDevice struct {
+	link *netlink.Vxlan
+}
+
+func sysctlSet(path, value string) error {
+	f, err := os.Create(path)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	_, err = f.Write([]byte(value))
+	return err
+}
+
+func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) {
+	link := &netlink.Vxlan{
+		LinkAttrs: netlink.LinkAttrs{
+			Name: devAttrs.name,
+		},
+		VxlanId:      int(devAttrs.vni),
+		VtepDevIndex: devAttrs.vtepIndex,
+		SrcAddr:      devAttrs.vtepAddr,
+		Port:         devAttrs.vtepPort,
+		Learning:     true,
+		Proxy:        true,
+		L2miss:       true,
+	}
+
+	link, err := ensureLink(link)
+	if err != nil {
+		return nil, err
+	}
+
+	// this enables ARP requests being sent to userspace via netlink
+	sysctlPath := fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/app_solicit", devAttrs.name)
+	sysctlSet(sysctlPath, "3")
+
+	return &vxlanDevice{
+		link: link,
+	}, nil
+}
+
+func ensureLink(vxlan *netlink.Vxlan) (*netlink.Vxlan, error) {
+	err := netlink.LinkAdd(vxlan)
+	if err != nil {
+		if err == syscall.EEXIST {
+			// it's ok if the device already exists as long as config is similar
+			existing, err := netlink.LinkByName(vxlan.Name)
+			if err != nil {
+				return nil, err
+			}
+			if incompat := vxlanLinksIncompat(vxlan, existing); incompat != "" {
+				log.Warningf("%q already exists with incompatable configuration: %v; recreating device", vxlan.Name, incompat)
+
+				// delete existing
+				if err = netlink.LinkDel(existing); err != nil {
+					return nil, fmt.Errorf("failed to delete interface: %v", err)
+				}
+
+				// create new
+				if err = netlink.LinkAdd(vxlan); err != nil {
+					return nil, fmt.Errorf("failed to create vxlan interface: %v", err)
+				}
+			} else {
+				return existing.(*netlink.Vxlan), nil
+			}
+		} else {
+			return nil, err
+		}
+	}
+
+	ifindex := vxlan.Index
+	link, err := netlink.LinkByIndex(vxlan.Index)
+	if err != nil {
+		return nil, fmt.Errorf("can't locate created vxlan device with index %v", ifindex)
+	}
+	var ok bool
+	if vxlan, ok = link.(*netlink.Vxlan); !ok {
+		return nil, fmt.Errorf("created vxlan device with index %v is not vxlan", ifindex)
+	}
+
+	return vxlan, nil
+}
+
+func (dev *vxlanDevice) Configure(ipn ip.IP4Net) error {
+	setAddr4(dev.link, ipn.ToIPNet())
+
+	if err := netlink.LinkSetUp(dev.link); err != nil {
+		return fmt.Errorf("failed to set interface %s to UP state: %s", dev.link.Attrs().Name, err)
+	}
+
+	// explicitly add a route since there might be a route for a subnet already
+	// installed by Docker and then it won't get auto added
+	route := netlink.Route{
+		LinkIndex: dev.link.Attrs().Index,
+		Scope:     netlink.SCOPE_UNIVERSE,
+		Dst:       ipn.Network().ToIPNet(),
+	}
+	if err := netlink.RouteAdd(&route); err != nil && err != syscall.EEXIST {
+		return fmt.Errorf("failed to add route (%s -> %s): %v", ipn.Network().String(), dev.link.Attrs().Name, err)
+	}
+
+	return nil
+}
+
+func (dev *vxlanDevice) Destroy() {
+	netlink.LinkDel(dev.link)
+}
+
+func (dev *vxlanDevice) MACAddr() net.HardwareAddr {
+	return dev.link.HardwareAddr
+}
+
+func (dev *vxlanDevice) MTU() int {
+	return dev.link.MTU
+}
+
+func (dev *vxlanDevice) MonitorMisses(misses chan *netlink.Neigh) {
+	nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
+	if err != nil {
+		log.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages")
+		return
+	}
+
+	for {
+		msgs, err := nlsock.Recieve()
+		if err != nil {
+			log.Errorf("Failed to receive from netlink: %v ", err)
+
+			// wait 1 sec before retrying but honor the cancel channel
+			time.Sleep(1*time.Second)
+			continue
+		}
+
+		for _, msg := range msgs {
+			dev.processNeighMsg(msg, misses)
+		}
+	}
+}
+
+func isNeighResolving(state int) bool {
+	return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0
+}
+
+func (dev *vxlanDevice) processNeighMsg(msg syscall.NetlinkMessage, misses chan *netlink.Neigh) {
+	neigh, err := netlink.NeighDeserialize(msg.Data)
+	if err != nil {
+		log.Error("Failed to deserialize netlink ndmsg: %v", err)
+		return
+	}
+
+	if int(neigh.LinkIndex) != dev.link.Index {
+		return
+	}
+
+	if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH {
+		return
+	}
+
+	if !isNeighResolving(neigh.State) {
+		// misses come with NUD_STALE bit set
+		return
+	}
+
+	misses <- neigh
+}
+
+func (dev *vxlanDevice) AddL2(mac net.HardwareAddr, vtep net.IP) error {
+	neigh := netlink.Neigh{
+		LinkIndex:    dev.link.Index,
+		State:        netlink.NUD_REACHABLE,
+		Family:       syscall.AF_BRIDGE,
+		Flags:        netlink.NTF_SELF,
+		IP:           vtep,
+		HardwareAddr: mac,
+	}
+
+	log.Infof("calling NeighAdd: %v, %v", vtep, mac)
+	return netlink.NeighAdd(&neigh)
+}
+
+func (dev *vxlanDevice) DelL2(mac net.HardwareAddr, vtep net.IP) error {
+	neigh := netlink.Neigh{
+		LinkIndex:    dev.link.Index,
+		Family:       syscall.AF_BRIDGE,
+		Flags:        netlink.NTF_SELF,
+		IP:           vtep,
+		HardwareAddr: mac,
+	}
+
+	log.Infof("calling NeighDel: %v, %v", vtep, mac)
+	return netlink.NeighDel(&neigh)
+}
+
+func (dev *vxlanDevice) AddL3(ip net.IP, mac net.HardwareAddr) error {
+	neigh := netlink.Neigh{
+		LinkIndex:    dev.link.Index,
+		State:        netlink.NUD_REACHABLE,
+		Type:         syscall.RTN_UNICAST,
+		IP:           ip,
+		HardwareAddr: mac,
+	}
+
+	log.Infof("calling NeighSet: %v, %v", ip, mac)
+	return netlink.NeighSet(&neigh)
+}
+
+func vxlanLinksIncompat(l1, l2 netlink.Link) string {
+	if l1.Type() != l2.Type() {
+		return fmt.Sprintf("link type: %v vs %v", l1.Type(), l2.Type())
+	}
+
+	v1 := l1.(*netlink.Vxlan)
+	v2 := l2.(*netlink.Vxlan)
+
+	if v1.VxlanId != v2.VxlanId {
+		return fmt.Sprintf("vni: %v vs %v", v1.VxlanId, v2.VxlanId)
+	}
+
+	if v1.VtepDevIndex > 0 && v2.VtepDevIndex > 0 && v1.VtepDevIndex != v2.VtepDevIndex {
+		return fmt.Sprintf("vtep (external) interface: %v vs %v", v1.VtepDevIndex, v2.VtepDevIndex)
+	}
+
+	if len(v1.SrcAddr) > 0 && len(v2.SrcAddr) > 0 && !v1.SrcAddr.Equal(v2.SrcAddr) {
+		return fmt.Sprintf("vtep (external) IP: %v vs %v", v1.SrcAddr, v2.SrcAddr)
+	}
+
+	if len(v1.Group) > 0 && len(v2.Group) > 0 && !v1.Group.Equal(v2.Group) {
+		return fmt.Sprintf("group address: %v vs %v", v1.Group, v2.Group)
+	}
+
+	if v1.L2miss != v2.L2miss {
+		return fmt.Sprintf("l2miss: %v vs %v", v1.L2miss, v2.L2miss)
+	}
+
+	if v1.Port > 0 && v2.Port > 0 && v1.Port != v2.Port {
+		return fmt.Sprintf("port: %v vs %v", v1.Port, v2.Port)
+	}
+
+	return ""
+}
+
+// sets IP4 addr on link removing any existing ones first
+func setAddr4(link *netlink.Vxlan, ipn *net.IPNet) error {
+	addrs, err := netlink.AddrList(link, syscall.AF_INET)
+	if err != nil {
+		return err
+	}
+
+	for _, addr := range addrs {
+		if err = netlink.AddrDel(link, &addr); err != nil {
+			return fmt.Errorf("failed to delete IPv4 addr %s from %s", addr.String(), link.Attrs().Name)
+		}
+	}
+
+	addr := netlink.Addr{ ipn, "" }
+	if err = netlink.AddrAdd(link, &addr); err != nil {
+		return fmt.Errorf("failed to add IP address %s to %s: %s", ipn.String(), link.Attrs().Name, err)
+	}
+
+	return nil
+}

+ 55 - 0
backend/vxlan/routes.go

@@ -0,0 +1,55 @@
+package vxlan
+
+import (
+	"bytes"
+	"net"
+
+	"github.com/coreos/flannel/pkg/ip"
+)
+
+type route struct {
+	network ip.IP4Net
+	vtepIP  net.IP
+	vtepMAC net.HardwareAddr
+}
+
+type routes []route
+
+func (rts *routes) set(nw ip.IP4Net, vtepIP net.IP, vtepMAC net.HardwareAddr) {
+	for i, rt := range *rts {
+		if rt.network.Equal(nw) {
+			(*rts)[i].vtepIP = vtepIP
+			(*rts)[i].vtepMAC = vtepMAC
+			return
+		}
+	}
+	*rts = append(*rts, route{nw, vtepIP, vtepMAC})
+}
+
+func (rts *routes) remove(nw ip.IP4Net) {
+	for i, rt := range *rts {
+		if rt.network.Equal(nw) {
+			(*rts)[i] = (*rts)[len(*rts)-1]
+			(*rts) = (*rts)[0 : len(*rts)-1]
+			return
+		}
+	}
+}
+
+func (rts routes) findByNetwork(ipAddr ip.IP4) *route {
+	for i, rt := range rts {
+		if rt.network.Contains(ipAddr) {
+			return &rts[i]
+		}
+	}
+	return nil
+}
+
+func (rts routes) findByVtepMAC(mac net.HardwareAddr) *route {
+	for i, rt := range rts {
+		if bytes.Equal(rt.vtepMAC, mac) {
+			return &rts[i]
+		}
+	}
+	return nil
+}

+ 277 - 0
backend/vxlan/vxlan.go

@@ -0,0 +1,277 @@
+package vxlan
+
+import (
+	"encoding/json"
+	"fmt"
+	"net"
+	"sync"
+
+	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
+
+	"github.com/coreos/flannel/backend"
+	"github.com/coreos/flannel/subnet"
+	"github.com/coreos/flannel/pkg/ip"
+	"github.com/coreos/flannel/pkg/task"
+)
+
+const (
+	defaultVNI = 1
+)
+
+type VXLANBackend struct {
+	sm     *subnet.SubnetManager
+	rawCfg json.RawMessage
+	cfg    struct {
+		Vni  int
+		Port int
+	}
+	dev    *vxlanDevice
+	stop   chan bool
+	wg     sync.WaitGroup
+	rts    routes
+}
+
+func New(sm *subnet.SubnetManager, config json.RawMessage) backend.Backend {
+	vb := &VXLANBackend{
+		sm:     sm,
+		rawCfg: config,
+		stop:   make(chan bool),
+	}
+	vb.cfg.Vni = defaultVNI
+
+	return vb
+}
+
+func newSubnetAttrs(pubIP net.IP, mac net.HardwareAddr) (*subnet.LeaseAttrs, error) {
+	sa := subnet.LeaseAttrs{
+		PublicIP:    ip.FromIP(pubIP),
+		BackendType: "vxlan",
+	}
+
+	data, err := json.Marshal(vxlanLeaseAttrs{hardwareAddr(mac)})
+	if err != nil {
+		return nil, err
+	}
+
+	err = sa.BackendData.UnmarshalJSON(data)
+	if err != nil {
+		return nil, err
+	}
+
+	return &sa, nil
+}
+
+func (vb *VXLANBackend) Init(extIface *net.Interface, extIP net.IP, ipMasq bool) (*backend.SubnetDef, error) {
+	// Parse our configuration
+	if len(vb.rawCfg) > 0 {
+		if err := json.Unmarshal(vb.rawCfg, &vb.cfg); err != nil {
+			return nil, fmt.Errorf("error decoding UDP backend config: %v", err)
+		}
+	}
+
+	devAttrs := vxlanDeviceAttrs{
+		vni:       uint32(vb.cfg.Vni),
+		name:      fmt.Sprintf("flannel.%v", vb.cfg.Vni),
+		vtepIndex: extIface.Index,
+		vtepAddr:  extIP,
+		vtepPort:  vb.cfg.Port,
+	}
+
+	var err error
+	vb.dev, err = newVXLANDevice(&devAttrs)
+	if err != nil {
+		return nil, err
+	}
+
+	sa, err := newSubnetAttrs(extIP, vb.dev.MACAddr())
+	if err != nil {
+		return nil, err
+	}
+
+	sn, err := vb.sm.AcquireLease(sa, vb.stop)
+	if err != nil {
+		if err == task.ErrCanceled {
+			return nil, err
+		} else {
+			return nil, fmt.Errorf("failed to acquire lease: %v", err)
+		}
+	}
+
+	// vxlan's subnet is that of the whole overlay network (e.g. /16)
+	// and not that of the individual host (e.g. /24)
+	vxlanNet := ip.IP4Net{
+		IP:        sn.IP,
+		PrefixLen: vb.sm.GetConfig().Network.PrefixLen,
+	}
+	if err = vb.dev.Configure(vxlanNet); err != nil {
+		return nil, err
+	}
+
+	return &backend.SubnetDef{sn, vb.dev.MTU()}, nil
+}
+
+func (vb *VXLANBackend) Run() {
+	vb.wg.Add(1)
+	go func() {
+		vb.sm.LeaseRenewer(vb.stop)
+		vb.wg.Done()
+	}()
+
+	log.Info("Watching for L2/L3 misses")
+	misses := make(chan *netlink.Neigh, 100)
+	// Unfortunately MonitorMisses does not take a cancel channel
+	// as there's no wait to interrupt netlink socket recv
+	go vb.dev.MonitorMisses(misses)
+
+	log.Info("Watching for new subnet leases")
+	evts := make(chan subnet.EventBatch)
+	vb.wg.Add(1)
+	go func() {
+		vb.sm.WatchLeases(evts, vb.stop)
+		vb.wg.Done()
+	}()
+
+	defer vb.wg.Wait()
+
+	for {
+		select {
+		case miss := <-misses:
+			vb.handleMiss(miss)
+
+		case evtBatch := <-evts:
+			vb.handleSubnetEvents(evtBatch)
+
+		case <-vb.stop:
+			return
+		}
+	}
+}
+
+func (vb *VXLANBackend) Stop() {
+	close(vb.stop)
+}
+
+func (vb *VXLANBackend) Name() string {
+	return "VXLAN"
+}
+
+// So we can make it JSON (un)marshalable
+type hardwareAddr net.HardwareAddr
+
+func (hw hardwareAddr) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf("%q", net.HardwareAddr(hw))), nil
+}
+
+func (hw *hardwareAddr) UnmarshalJSON(b []byte) error {
+	if len(b) < 2 || b[0] != '"' || b[len(b)-1] != '"' {
+		return fmt.Errorf("error parsing hardware addr")
+	}
+
+	b = b[1:len(b)-1]
+
+	mac, err := net.ParseMAC(string(b))
+	if err != nil {
+		return err
+	}
+
+	*hw = hardwareAddr(mac)
+	return nil
+}
+
+type vxlanLeaseAttrs struct {
+	VtepMAC hardwareAddr
+}
+
+func (vb *VXLANBackend) handleSubnetEvents(batch subnet.EventBatch) {
+	for _, evt := range batch {
+		switch evt.Type {
+		case subnet.SubnetAdded:
+			log.Info("Subnet added: ", evt.Lease.Network)
+
+			if evt.Lease.Attrs.BackendType != "vxlan" {
+				log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType)
+				continue
+			}
+
+			var attrs vxlanLeaseAttrs
+			if err := json.Unmarshal(evt.Lease.Attrs.BackendData, &attrs); err != nil {
+				log.Error("Error decoding subnet lease JSON: ", err)
+				continue
+			}
+
+			vb.rts.set(evt.Lease.Network, evt.Lease.Attrs.PublicIP.ToIP(), net.HardwareAddr(attrs.VtepMAC))
+
+		case subnet.SubnetRemoved:
+			log.Info("Subnet removed: ", evt.Lease.Network)
+
+			vb.rts.remove(evt.Lease.Network)
+
+			if evt.Lease.Attrs.BackendType != "vxlan" {
+				log.Warningf("Ignoring non-vxlan subnet: type=%v", evt.Lease.Attrs.BackendType)
+				continue
+			}
+
+			var attrs vxlanLeaseAttrs
+			if err := json.Unmarshal(evt.Lease.Attrs.BackendData, &attrs); err != nil {
+				log.Error("Error decoding subnet lease JSON: ", err)
+				continue
+			}
+
+			if len(attrs.VtepMAC) > 0 {
+				vb.dev.DelL2(net.HardwareAddr(attrs.VtepMAC), evt.Lease.Attrs.PublicIP.ToIP())
+			}
+
+		default:
+			log.Error("Internal error: unknown event type: ", int(evt.Type))
+		}
+	}
+}
+
+func (vb *VXLANBackend) handleMiss(miss *netlink.Neigh) {
+	switch {
+	case len(miss.IP) == 0 && len(miss.HardwareAddr) == 0:
+		log.Info("Ignoring nil miss")
+
+	case len(miss.IP) == 0:
+		vb.handleL2Miss(miss)
+
+	case len(miss.HardwareAddr) == 0:
+		vb.handleL3Miss(miss)
+
+	default:
+		log.Infof("Ignoring not a miss: %v, %v", miss.HardwareAddr, miss.IP)
+	}
+}
+
+func (vb *VXLANBackend) handleL2Miss(miss *netlink.Neigh) {
+	log.Infof("L2 miss: %v", miss.HardwareAddr)
+
+	rt := vb.rts.findByVtepMAC(miss.HardwareAddr)
+	if rt == nil {
+		log.Infof("Route for %v not found", miss.HardwareAddr)
+		return
+	}
+
+	if err := vb.dev.AddL2(miss.HardwareAddr, rt.vtepIP); err != nil {
+		log.Errorf("AddL2 failed: %v", err)
+	} else {
+		log.Info("AddL2 succeeded")
+	}
+}
+
+func (vb *VXLANBackend) handleL3Miss(miss *netlink.Neigh) {
+	log.Infof("L3 miss: %v", miss.IP)
+
+	rt := vb.rts.findByNetwork(ip.FromIP(miss.IP))
+	if rt == nil {
+		log.Infof("Route for %v not found", miss.IP)
+		return
+	}
+
+	if err := vb.dev.AddL3(miss.IP, rt.vtepMAC); err != nil {
+		log.Errorf("AddL3 failed: %v", err)
+	} else {
+		log.Info("AddL3 succeeded")
+	}
+}

+ 3 - 1
main.go

@@ -21,6 +21,7 @@ import (
 	"github.com/coreos/flannel/subnet"
 	"github.com/coreos/flannel/backend/alloc"
 	"github.com/coreos/flannel/backend/udp"
+	"github.com/coreos/flannel/backend/vxlan"
 )
 
 type CmdLineOpts struct {
@@ -159,6 +160,8 @@ func newBackend() (backend.Backend, error) {
 		return udp.New(sm, config.Backend), nil
 	case "alloc":
 		return alloc.New(sm), nil
+	case "vxlan":
+		return vxlan.New(sm, config.Backend), nil
 	default:
 		return nil, fmt.Errorf("'%v': unknown backend type", bt.Type)
 	}
@@ -189,7 +192,6 @@ func run(be backend.Backend, exit chan int) {
 
 	sn, err := be.Init(iface, ipaddr, opts.ipMasq)
 	if err != nil {
-		log.Errorf("Could not init %v backend: %v", be.Name(), err)
 		return
 	}
 

+ 95 - 68
subnet/subnet.go

@@ -38,9 +38,15 @@ var (
 	subnetRegex *regexp.Regexp = regexp.MustCompile(`(\d+\.\d+.\d+.\d+)-(\d+)`)
 )
 
+type LeaseAttrs struct {
+	PublicIP    ip.IP4
+	BackendType string          `json:",omitempty"`
+	BackendData json.RawMessage `json:",omitempty"`
+}
+
 type SubnetLease struct {
 	Network ip.IP4Net
-	Data    string
+	Attrs   LeaseAttrs
 }
 
 type SubnetManager struct {
@@ -66,15 +72,9 @@ func NewSubnetManager(etcdEndpoint []string, prefix string) (*SubnetManager, err
 	return newSubnetManager(esr)
 }
 
-func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel chan bool) (ip.IP4Net, error) {
-	dataBytes, err := json.Marshal(data)
-	if err != nil {
-		return ip.IP4Net{}, err
-	}
-
-	var sn ip.IP4Net
+func (sm *SubnetManager) AcquireLease(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) {
 	for {
-		sn, err = sm.acquireLeaseOnce(extIP, string(dataBytes), cancel)
+		sn, err := sm.acquireLeaseOnce(attrs, cancel)
 		switch {
 		case err == nil:
 			log.Info("Subnet lease acquired: ", sn)
@@ -96,53 +96,71 @@ func (sm *SubnetManager) AcquireLease(extIP ip.IP4, data interface{}, cancel cha
 	}
 }
 
-func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan bool) (ip.IP4Net, error) {
-	for i := 0; i < registerRetries; i++ {
-		var err error
-		sm.leases, err = sm.getLeases()
-		if err != nil {
-			return ip.IP4Net{}, err
+func findLeaseByIP(leases []SubnetLease, pubIP ip.IP4) *SubnetLease {
+	for _, l := range leases {
+		if pubIP == l.Attrs.PublicIP {
+			return &l
 		}
+	}
 
-		// try to reuse a subnet if there's one that matches our IP
-		for _, l := range sm.leases {
-			var ba BaseAttrs
-			err = json.Unmarshal([]byte(l.Data), &ba)
-			if err != nil {
-				log.Error("Error parsing subnet lease JSON: ", err)
-			} else {
-				if extIP == ba.PublicIP {
-					resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), data, subnetTTL)
-					if err != nil {
-						return ip.IP4Net{}, err
-					}
-
-					sm.myLease.Network = l.Network
-					sm.leaseExp = *resp.Node.Expiration
-					return l.Network, nil
-				}
-			}
-		}
+	return nil
+}
 
-		// no existing match, grab a new one
-		sn, err := sm.allocateSubnet()
+func (sm *SubnetManager) tryAcquireLease(extIP ip.IP4, attrs []byte) (ip.IP4Net, error) {
+	var err error
+	sm.leases, err = sm.getLeases()
+	if err != nil {
+		return ip.IP4Net{}, err
+	}
+
+	// try to reuse a subnet if there's one that matches our IP
+	if l := findLeaseByIP(sm.leases, extIP); l != nil {
+		resp, err := sm.registry.updateSubnet(l.Network.StringSep(".", "-"), string(attrs), subnetTTL)
 		if err != nil {
 			return ip.IP4Net{}, err
 		}
 
-		resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), data, subnetTTL)
-		switch {
-		case err == nil:
-			sm.myLease.Network = sn
-			sm.leaseExp = *resp.Node.Expiration
-			return sn, nil
+		sm.myLease.Network = l.Network
+		sm.leaseExp = *resp.Node.Expiration
+		return l.Network, nil
+	}
 
-		// if etcd returned Key Already Exists, try again.
-		case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
-			break
+	// no existing match, grab a new one
+	sn, err := sm.allocateSubnet()
+	if err != nil {
+		return ip.IP4Net{}, err
+	}
 
-		default:
+	resp, err := sm.registry.createSubnet(sn.StringSep(".", "-"), string(attrs), subnetTTL)
+	switch {
+	case err == nil:
+		sm.myLease.Network = sn
+		sm.leaseExp = *resp.Node.Expiration
+		return sn, nil
+
+	// if etcd returned Key Already Exists, try again.
+	case err.(*etcd.EtcdError).ErrorCode == etcdKeyAlreadyExists:
+		return ip.IP4Net{}, nil
+
+	default:
+		return ip.IP4Net{}, err
+	}
+}
+
+func (sm *SubnetManager) acquireLeaseOnce(attrs *LeaseAttrs, cancel chan bool) (ip.IP4Net, error) {
+	attrBytes, err := json.Marshal(attrs)
+	if err != nil {
+		log.Errorf("marshal failed: %#v, %v", attrs, err)
+		return ip.IP4Net{}, err
+	}
+
+	for i := 0; i < registerRetries; i++ {
+		sn, err := sm.tryAcquireLease(attrs.PublicIP, attrBytes)
+		switch {
+		case err != nil:
 			return ip.IP4Net{}, err
+		case sn.IP != 0:
+			return sn, nil
 		}
 
 		// before moving on, check for cancel
@@ -154,12 +172,6 @@ func (sm *SubnetManager) acquireLeaseOnce(extIP ip.IP4, data string, cancel chan
 	return ip.IP4Net{}, errors.New("Max retries reached trying to acquire a subnet")
 }
 
-func (sm *SubnetManager) UpdateSubnet(data string) error {
-	resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), data, subnetTTL)
-	sm.leaseExp = *resp.Node.Expiration
-	return err
-}
-
 func (sm *SubnetManager) GetConfig() *Config {
 	return sm.config
 }
@@ -205,8 +217,11 @@ func (sm *SubnetManager) getLeases() ([]SubnetLease, error) {
 		for _, node := range resp.Node.Nodes {
 			sn, err := parseSubnetKey(node.Key)
 			if err == nil {
-				lease := SubnetLease{sn, node.Value}
-				leases = append(leases, lease)
+				var attrs LeaseAttrs
+				if err = json.Unmarshal([]byte(node.Value), &attrs); err == nil {
+					lease := SubnetLease{sn, attrs}
+					leases = append(leases, lease)
+				}
 			}
 		}
 		sm.lastIndex = resp.EtcdIndex
@@ -261,39 +276,41 @@ func (sm *SubnetManager) applyLeases(newLeases []SubnetLease) EventBatch {
 	return batch
 }
 
-func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) Event {
+func (sm *SubnetManager) applySubnetChange(action string, ipn ip.IP4Net, data string) (Event, error) {
 	switch action {
 	case "delete", "expire":
 		for i, l := range sm.leases {
 			if l.Network.Equal(ipn) {
 				deleteLease(sm.leases, i)
-				return Event{SubnetRemoved, l}
+				return Event{SubnetRemoved, l}, nil
 			}
 		}
 
 		log.Errorf("Removed subnet (%s) was not found", ipn)
 		return Event{
 			SubnetRemoved,
-			SubnetLease{ipn, ""},
-		}
+			SubnetLease{ipn, LeaseAttrs{}},
+		}, nil
 
 	default:
+		var attrs LeaseAttrs
+		err := json.Unmarshal([]byte(data), &attrs)
+		if err != nil {
+			return Event{}, err
+		}
+
 		for i, l := range sm.leases {
 			if l.Network.Equal(ipn) {
-				sm.leases[i] = SubnetLease{ipn, data}
-				return Event{SubnetAdded, sm.leases[i]}
+				sm.leases[i] = SubnetLease{ipn, attrs}
+				return Event{SubnetAdded, sm.leases[i]}, nil
 			}
 		}
 
-		sm.leases = append(sm.leases, SubnetLease{ipn, data})
-		return Event{SubnetAdded, sm.leases[len(sm.leases)-1]}
+		sm.leases = append(sm.leases, SubnetLease{ipn, attrs})
+		return Event{SubnetAdded, sm.leases[len(sm.leases)-1]}, nil
 	}
 }
 
-type BaseAttrs struct {
-	PublicIP ip.IP4
-}
-
 func (sm *SubnetManager) allocateSubnet() (ip.IP4Net, error) {
 	log.Infof("Picking subnet in range %s ... %s", sm.config.SubnetMin, sm.config.SubnetMax)
 
@@ -369,7 +386,10 @@ func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *E
 
 	// Don't process our own changes
 	if !sm.myLease.Network.Equal(sn) {
-		evt := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
+		evt, err := sm.applySubnetChange(resp.Action, sn, resp.Node.Value)
+		if err != nil {
+			return nil, err
+		}
 		batch = &EventBatch{evt}
 	}
 
@@ -404,7 +424,14 @@ func (sm *SubnetManager) LeaseRenewer(cancel chan bool) {
 	for {
 		select {
 		case <-time.After(dur):
-			resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), sm.myLease.Data, subnetTTL)
+			attrBytes, err := json.Marshal(sm.myLease.Attrs)
+			if err != nil {
+				log.Error("Error renewing lease (trying again in 1 min): ", err)
+				dur = time.Minute
+				continue
+			}
+
+			resp, err := sm.registry.updateSubnet(sm.myLease.Network.StringSep(".", "-"), string(attrBytes), subnetTTL)
 			if err != nil {
 				log.Error("Error renewing lease (trying again in 1 min): ", err)
 				dur = time.Minute

+ 6 - 5
subnet/subnet_test.go

@@ -112,6 +112,7 @@ func (msr *mockSubnetRegistry) watchSubnets(since uint64, stop chan bool) (*etcd
 	case sn = <-msr.addCh:
 		n := etcd.Node{
 			Key:           sn,
+			Value:         `{"PublicIP": "1.1.1.1"}`,
 			ModifiedIndex: msr.index,
 		}
 		msr.subnets.Nodes = append(msr.subnets.Nodes, &n)
@@ -152,12 +153,12 @@ func TestAcquireLease(t *testing.T) {
 	}
 
 	extIP, _ := ip.ParseIP4("1.2.3.4")
-	data := BaseAttrs{
+	attrs := LeaseAttrs{
 		PublicIP: extIP,
 	}
 
 	cancel := make(chan bool)
-	sn, err := sm.AcquireLease(extIP, data, cancel)
+	sn, err := sm.AcquireLease(&attrs, cancel)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
@@ -167,7 +168,7 @@ func TestAcquireLease(t *testing.T) {
 	}
 
 	// Acquire again, should reuse
-	if sn, err = sm.AcquireLease(extIP, data, cancel); err != nil {
+	if sn, err = sm.AcquireLease(&attrs, cancel); err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}
 
@@ -258,14 +259,14 @@ func TestRenewLease(t *testing.T) {
 	}
 
 	extIP, _ := ip.ParseIP4("1.2.3.4")
-	data := BaseAttrs{
+	attrs := LeaseAttrs{
 		PublicIP: extIP,
 	}
 
 	cancel := make(chan bool)
 	defer close(cancel)
 
-	sn, err := sm.AcquireLease(extIP, data, cancel)
+	sn, err := sm.AcquireLease(&attrs, cancel)
 	if err != nil {
 		t.Fatal("AcquireLease failed: ", err)
 	}