Преглед изворни кода

Merge pull request #207 from MohdAhmad/dynamicallyHandleL3Misses

Dynamically handles L3 misses with vxlan backend
Mohammad Ahmad пре 9 година
родитељ
комит
a55abbe578
3 измењених фајлова са 162 додато и 33 уклоњено
  1. 56 7
      backend/vxlan/device.go
  2. 57 0
      backend/vxlan/routes.go
  3. 49 26
      backend/vxlan/vxlan.go

+ 56 - 7
backend/vxlan/device.go

@@ -19,9 +19,11 @@ import (
 	"net"
 	"net"
 	"os"
 	"os"
 	"syscall"
 	"syscall"
+	"time"
 
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink/nl"
 
 
 	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/pkg/ip"
 )
 )
@@ -65,6 +67,9 @@ func newVXLANDevice(devAttrs *vxlanDeviceAttrs) (*vxlanDevice, error) {
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
+	// this enables ARP requests being sent to userspace via netlink
+	sysctlPath := fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/app_solicit", devAttrs.name)
+	sysctlSet(sysctlPath, "3")
 
 
 	return &vxlanDevice{
 	return &vxlanDevice{
 		link: link,
 		link: link,
@@ -155,11 +160,6 @@ func (dev *vxlanDevice) GetL2List() ([]netlink.Neigh, error) {
 	return netlink.NeighList(dev.link.Index, syscall.AF_BRIDGE)
 	return netlink.NeighList(dev.link.Index, syscall.AF_BRIDGE)
 }
 }
 
 
-func (dev *vxlanDevice) GetL3List() ([]netlink.Neigh, error) {
-	log.Infof("calling GetL3List() dev.link.Index: %d ", dev.link.Index)
-	return netlink.NeighList(dev.link.Index, syscall.AF_INET)
-}
-
 func (dev *vxlanDevice) AddL2(n neigh) error {
 func (dev *vxlanDevice) AddL2(n neigh) error {
 	log.Infof("calling NeighAdd: %v, %v", n.IP, n.MAC)
 	log.Infof("calling NeighAdd: %v, %v", n.IP, n.MAC)
 	return netlink.NeighAdd(&netlink.Neigh{
 	return netlink.NeighAdd(&netlink.Neigh{
@@ -187,7 +187,7 @@ func (dev *vxlanDevice) AddL3(n neigh) error {
 	log.Infof("calling NeighSet: %v, %v", n.IP, n.MAC)
 	log.Infof("calling NeighSet: %v, %v", n.IP, n.MAC)
 	return netlink.NeighSet(&netlink.Neigh{
 	return netlink.NeighSet(&netlink.Neigh{
 		LinkIndex:    dev.link.Index,
 		LinkIndex:    dev.link.Index,
-		State:        netlink.NUD_PERMANENT,
+		State:        netlink.NUD_REACHABLE,
 		Type:         syscall.RTN_UNICAST,
 		Type:         syscall.RTN_UNICAST,
 		IP:           n.IP.ToIP(),
 		IP:           n.IP.ToIP(),
 		HardwareAddr: n.MAC,
 		HardwareAddr: n.MAC,
@@ -198,13 +198,62 @@ func (dev *vxlanDevice) DelL3(n neigh) error {
 	log.Infof("calling NeighDel: %v, %v", n.IP, n.MAC)
 	log.Infof("calling NeighDel: %v, %v", n.IP, n.MAC)
 	return netlink.NeighDel(&netlink.Neigh{
 	return netlink.NeighDel(&netlink.Neigh{
 		LinkIndex:    dev.link.Index,
 		LinkIndex:    dev.link.Index,
-		State:        netlink.NUD_PERMANENT,
+		State:        netlink.NUD_REACHABLE,
 		Type:         syscall.RTN_UNICAST,
 		Type:         syscall.RTN_UNICAST,
 		IP:           n.IP.ToIP(),
 		IP:           n.IP.ToIP(),
 		HardwareAddr: n.MAC,
 		HardwareAddr: n.MAC,
 	})
 	})
 }
 }
 
 
+// MonitorMisses subscribes to netlink RTNLGRP_NEIGH notifications and hands
+// every received message to processNeighMsg, which forwards the relevant
+// ones to the misses channel.
+//
+// It returns only when the subscription itself fails; otherwise it loops
+// forever, pausing for one second after a failed receive before retrying.
+func (dev *vxlanDevice) MonitorMisses(misses chan *netlink.Neigh) {
+	nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
+	if err != nil {
+		// Report the underlying error so the failure can be diagnosed.
+		log.Errorf("Failed to subscribe to netlink RTNLGRP_NEIGH messages: %v", err)
+		return
+	}
+
+	for {
+		msgs, err := nlsock.Receive()
+		if err != nil {
+			log.Errorf("Failed to receive from netlink: %v ", err)
+
+			// Back off briefly so a persistent failure does not spin the CPU.
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		for _, msg := range msgs {
+			dev.processNeighMsg(msg, misses)
+		}
+	}
+}
+
+// isNeighResolving reports whether state has at least one of the
+// NUD_INCOMPLETE, NUD_STALE, NUD_DELAY or NUD_PROBE bits set.
+func isNeighResolving(state int) bool {
+	mask := netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE
+	return state&mask != 0
+}
+
+// processNeighMsg decodes a single netlink message as a neighbor entry and,
+// if it qualifies as a miss for this device, sends it on the misses channel.
+//
+// A message is forwarded only when it decodes successfully, refers to this
+// device's link index, is of type RTM_GETNEIGH or RTM_NEWNEIGH, and has a
+// state accepted by isNeighResolving. Everything else is dropped.
+func (dev *vxlanDevice) processNeighMsg(msg syscall.NetlinkMessage, misses chan *netlink.Neigh) {
+	neigh, err := netlink.NeighDeserialize(msg.Data)
+	if err != nil {
+		// Errorf (not Error) so the %v verb is actually expanded.
+		log.Errorf("Failed to deserialize netlink ndmsg: %v", err)
+		return
+	}
+
+	// Ignore neighbor events that belong to other interfaces.
+	if int(neigh.LinkIndex) != dev.link.Index {
+		return
+	}
+
+	if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH {
+		return
+	}
+
+	if !isNeighResolving(neigh.State) {
+		// misses come with NUD_STALE bit set
+		return
+	}
+
+	misses <- neigh
+}
+
 func vxlanLinksIncompat(l1, l2 netlink.Link) string {
 func vxlanLinksIncompat(l1, l2 netlink.Link) string {
 	if l1.Type() != l2.Type() {
 	if l1.Type() != l2.Type() {
 		return fmt.Sprintf("link type: %v vs %v", l1.Type(), l2.Type())
 		return fmt.Sprintf("link type: %v vs %v", l1.Type(), l2.Type())

+ 57 - 0
backend/vxlan/routes.go

@@ -0,0 +1,57 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package vxlan
+
+import (
+	"net"
+
+	"github.com/coreos/flannel/pkg/ip"
+)
+
+// route pairs a subnet with the MAC address of a VTEP.
+type route struct {
+	network ip.IP4Net
+	vtepMAC net.HardwareAddr
+}
+
+// routes is an unordered list of route entries.
+type routes []route
+
+// set records vtepMAC for the network nw: an existing entry for an equal
+// network is updated in place, otherwise a new entry is appended.
+func (rts *routes) set(nw ip.IP4Net, vtepMAC net.HardwareAddr) {
+	list := *rts
+	for i := range list {
+		if list[i].network.Equal(nw) {
+			list[i].vtepMAC = vtepMAC
+			return
+		}
+	}
+	*rts = append(list, route{network: nw, vtepMAC: vtepMAC})
+}
+
+// remove deletes the first entry whose network equals nw, if there is one.
+// The last entry is moved into the vacated slot, so element order is not
+// preserved.
+func (rts *routes) remove(nw ip.IP4Net) {
+	list := *rts
+	last := len(list) - 1
+	for i := range list {
+		if !list[i].network.Equal(nw) {
+			continue
+		}
+		list[i] = list[last]
+		*rts = list[:last]
+		return
+	}
+}
+
+// findByNetwork returns a pointer to the first entry whose network contains
+// ipAddr, or nil when no entry matches.
+func (rts routes) findByNetwork(ipAddr ip.IP4) *route {
+	for i := range rts {
+		if entry := &rts[i]; entry.network.Contains(ipAddr) {
+			return entry
+		}
+	}
+	return nil
+}

+ 49 - 26
backend/vxlan/vxlan.go

@@ -18,14 +18,17 @@ import (
 	"bytes"
 	"bytes"
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
+	"net"
+	"sync"
+	"time"
+
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
+	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
 	"github.com/coreos/flannel/Godeps/_workspace/src/golang.org/x/net/context"
+
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/backend"
 	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/subnet"
 	"github.com/coreos/flannel/subnet"
-	"net"
-	"sync"
-	"time"
 )
 )
 
 
 const (
 const (
@@ -45,6 +48,7 @@ type VXLANBackend struct {
 	ctx    context.Context
 	ctx    context.Context
 	cancel context.CancelFunc
 	cancel context.CancelFunc
 	wg     sync.WaitGroup
 	wg     sync.WaitGroup
+	rts    routes
 }
 }
 
 
 func New(sm subnet.Manager, network string, config *subnet.Config) backend.Backend {
 func New(sm subnet.Manager, network string, config *subnet.Config) backend.Backend {
@@ -146,6 +150,12 @@ func (vb *VXLANBackend) Run() {
 		vb.wg.Done()
 		vb.wg.Done()
 	}()
 	}()
 
 
+	log.Info("Watching for L3 misses")
+	misses := make(chan *netlink.Neigh, 100)
+	// Unfortunately MonitorMisses does not take a cancel channel
+	// as there's no way to interrupt netlink socket recv
+	go vb.dev.MonitorMisses(misses)
+
 	log.Info("Watching for new subnet leases")
 	log.Info("Watching for new subnet leases")
 	evts := make(chan []subnet.Event)
 	evts := make(chan []subnet.Event)
 	vb.wg.Add(1)
 	vb.wg.Add(1)
@@ -168,6 +178,9 @@ func (vb *VXLANBackend) Run() {
 
 
 	for {
 	for {
 		select {
 		select {
+		case miss := <-misses:
+			vb.handleMiss(miss)
+
 		case evtBatch := <-evts:
 		case evtBatch := <-evts:
 			vb.handleSubnetEvents(evtBatch)
 			vb.handleSubnetEvents(evtBatch)
 
 
@@ -228,8 +241,8 @@ func (vb *VXLANBackend) handleSubnetEvents(batch []subnet.Event) {
 				log.Error("Error decoding subnet lease JSON: ", err)
 				log.Error("Error decoding subnet lease JSON: ", err)
 				continue
 				continue
 			}
 			}
+			vb.rts.set(evt.Lease.Subnet, net.HardwareAddr(attrs.VtepMAC))
 			vb.dev.AddL2(neigh{IP: evt.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(attrs.VtepMAC)})
 			vb.dev.AddL2(neigh{IP: evt.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(attrs.VtepMAC)})
-			vb.dev.AddL3(neigh{IP: evt.Lease.Subnet.IP, MAC: net.HardwareAddr(attrs.VtepMAC)})
 
 
 		case subnet.SubnetRemoved:
 		case subnet.SubnetRemoved:
 			log.Info("Subnet removed: ", evt.Lease.Subnet)
 			log.Info("Subnet removed: ", evt.Lease.Subnet)
@@ -247,8 +260,8 @@ func (vb *VXLANBackend) handleSubnetEvents(batch []subnet.Event) {
 
 
 			if len(attrs.VtepMAC) > 0 {
 			if len(attrs.VtepMAC) > 0 {
 				vb.dev.DelL2(neigh{IP: evt.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(attrs.VtepMAC)})
 				vb.dev.DelL2(neigh{IP: evt.Lease.Attrs.PublicIP, MAC: net.HardwareAddr(attrs.VtepMAC)})
-				vb.dev.DelL3(neigh{IP: evt.Lease.Subnet.IP, MAC: net.HardwareAddr(attrs.VtepMAC)})
 			}
 			}
+			vb.rts.remove(evt.Lease.Subnet)
 
 
 		default:
 		default:
 			log.Error("Internal error: unknown event type: ", int(evt.Type))
 			log.Error("Internal error: unknown event type: ", int(evt.Type))
@@ -263,19 +276,10 @@ func (vb *VXLANBackend) handleInitialSubnetEvents(batch []subnet.Event) error {
 		return fmt.Errorf("Error fetching L2 table: %v", err)
 		return fmt.Errorf("Error fetching L2 table: %v", err)
 	}
 	}
 
 
-	l3Table, err := vb.dev.GetL3List()
-	if err != nil {
-		return fmt.Errorf("Error fetching L3 table: %v", err)
-	}
-
 	for _, fdbEntry := range fdbTable {
 	for _, fdbEntry := range fdbTable {
 		log.Infof("fdb already populated with: %s %s ", fdbEntry.IP, fdbEntry.HardwareAddr)
 		log.Infof("fdb already populated with: %s %s ", fdbEntry.IP, fdbEntry.HardwareAddr)
 	}
 	}
 
 
-	for _, l3Entry := range l3Table {
-		log.Infof("l3 table already populated with: %s %s", l3Entry.IP, l3Entry.HardwareAddr)
-	}
-
 	evtMarker := make([]bool, len(batch))
 	evtMarker := make([]bool, len(batch))
 	leaseAttrsList := make([]vxlanLeaseAttrs, len(batch))
 	leaseAttrsList := make([]vxlanLeaseAttrs, len(batch))
 	fdbEntryMarker := make([]bool, len(fdbTable))
 	fdbEntryMarker := make([]bool, len(fdbTable))
@@ -300,10 +304,7 @@ func (vb *VXLANBackend) handleInitialSubnetEvents(batch []subnet.Event) error {
 				break
 				break
 			}
 			}
 		}
 		}
-	}
-
-	for _, l3Entry := range l3Table {
-		vb.dev.DelL3(neigh{IP: ip.FromIP(l3Entry.IP), MAC: l3Entry.HardwareAddr})
+		vb.rts.set(evt.Lease.Subnet, net.HardwareAddr(leaseAttrsList[i].VtepMAC))
 	}
 	}
 
 
 	for j, marker := range fdbEntryMarker {
 	for j, marker := range fdbEntryMarker {
@@ -321,15 +322,37 @@ func (vb *VXLANBackend) handleInitialSubnetEvents(batch []subnet.Event) error {
 			if err != nil {
 			if err != nil {
 				log.Error("Add L2 failed: ", err)
 				log.Error("Add L2 failed: ", err)
 			}
 			}
-		}
-		err := vb.dev.AddL3(neigh{IP: batch[i].Lease.Subnet.IP, MAC: net.HardwareAddr(leaseAttrsList[i].VtepMAC)})
-		if err != nil {
-			log.Error("Add L3 failed: ", err)
-			err1 := vb.dev.DelL2(neigh{IP: batch[i].Lease.Attrs.PublicIP, MAC: net.HardwareAddr(leaseAttrsList[i].VtepMAC)})
-			if err1 != nil {
-				log.Error("Attempt to remove matching L2 entry failed: ", err1)
-			}
+
 		}
 		}
 	}
 	}
 	return nil
 	return nil
 }
 }
+
+// handleMiss dispatches a neighbor notification. Entries with neither an IP
+// nor a hardware address are ignored, entries lacking only the hardware
+// address are handled as L3 misses, and everything else is logged and
+// ignored.
+func (vb *VXLANBackend) handleMiss(miss *netlink.Neigh) {
+	hasIP := len(miss.IP) != 0
+	hasMAC := len(miss.HardwareAddr) != 0
+
+	if !hasIP && !hasMAC {
+		log.Info("Ignoring nil miss")
+		return
+	}
+
+	if !hasMAC {
+		vb.handleL3Miss(miss)
+		return
+	}
+
+	log.Infof("Ignoring not a miss: %v, %v", miss.HardwareAddr, miss.IP)
+}
+
+// handleL3Miss looks up the route whose network contains the missed IP and,
+// when one exists, installs an L3 neighbor entry mapping that IP to the
+// route's VTEP MAC. The outcome is logged in every case.
+func (vb *VXLANBackend) handleL3Miss(miss *netlink.Neigh) {
+	target := miss.IP
+	log.Infof("L3 miss: %v", target)
+
+	rt := vb.rts.findByNetwork(ip.FromIP(target))
+	if rt == nil {
+		log.Infof("Route for %v not found", target)
+		return
+	}
+
+	err := vb.dev.AddL3(neigh{IP: ip.FromIP(target), MAC: rt.vtepMAC})
+	if err != nil {
+		log.Errorf("AddL3 failed: %v", err)
+		return
+	}
+	log.Info("AddL3 succeeded")
+}