فهرست منبع

refactor to use netlink.route object in backend

Huang 10 سال پیش
والد
کامیت
94a80b6b85
2فایلهای تغییر یافته به همراه62 افزوده شده و 69 حذف شده
  1. 59 34
      backend/hostgw/hostgw.go
  2. 3 35
      subnet/subnet.go

+ 59 - 34
backend/hostgw/hostgw.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"net"
 	"sync"
+	"time"
 
 	log "github.com/coreos/flannel/Godeps/_workspace/src/github.com/golang/glog"
 	"github.com/coreos/flannel/Godeps/_workspace/src/github.com/vishvananda/netlink"
@@ -28,6 +29,15 @@ import (
 	"github.com/coreos/flannel/subnet"
 )
 
+const (
+	routeCheckBegins  = 120
+	routeCheckRetries = 10
+)
+
+var (
+	routeList []netlink.Route
+)
+
 type HostgwBackend struct {
 	sm       *subnet.SubnetManager
 	extIface *net.Interface
@@ -87,9 +97,10 @@ func (rb *HostgwBackend) Run() {
 
 	defer rb.wg.Wait()
 
+	routeList = make([]netlink.Route, 0, 10)
 	rb.wg.Add(1)
 	go func() {
-		rb.sm.ListLeases(evts, rb.stop)
+		rb.routeCheck(rb.stop)
 		rb.wg.Done()
 	}()
 
@@ -132,6 +143,7 @@ func (rb *HostgwBackend) handleSubnetEvents(batch subnet.EventBatch) {
 				log.Errorf("Error adding route to %v via %v: %v", evt.Lease.Network, evt.Lease.Attrs.PublicIP, err)
 				continue
 			}
+			addToRouteList(route)
 
 		case subnet.SubnetRemoved:
 			log.Info("Subnet removed: ", evt.Lease.Network)
@@ -150,55 +162,68 @@ func (rb *HostgwBackend) handleSubnetEvents(batch subnet.EventBatch) {
 				log.Errorf("Error deleting route to %v: %v", evt.Lease.Network, err)
 				continue
 			}
+			removeFromRouteList(route)
 
-		case subnet.SubnetListed:
-			log.Info("Subnet listed: ", evt.Lease.Network)
-
-			if evt.Lease.Attrs.BackendType != "host-gw" {
-				log.Warningf("Ignoring non-host-gw subnet: type=%v", evt.Lease.Attrs.BackendType)
-				continue
-			}
+		default:
+			log.Error("Internal error: unknown event type: ", int(evt.Type))
+		}
+	}
+}
 
-			subExistInRoute := checkSubnetExistInRouts(evt.Lease)
+func addToRouteList(route netlink.Route) {
+	routeList = append(routeList, route)
+}
 
-			if !subExistInRoute {
-				route := netlink.Route{
-					Dst:       evt.Lease.Network.ToIPNet(),
-					Gw:        evt.Lease.Attrs.PublicIP.ToIP(),
-					LinkIndex: rb.extIface.Index,
-				}
-				if err := netlink.RouteAdd(&route); err != nil {
-					log.Errorf("Error adding route to %v via %v: %v", evt.Lease.Network, evt.Lease.Attrs.PublicIP, err)
-					continue
-				}
-			}
+func removeFromRouteList(route netlink.Route) {
+	for index, r := range routeList {
+		if routeEqual(r, route) {
+			routeList = append(routeList[:index], routeList[index+1:]...)
+			return
+		}
+	}
+}
 
+func (rb *HostgwBackend) routeCheck(cancel chan bool) {
+	time.Sleep(routeCheckBegins * time.Second)
+	for {
+		select {
+		case <-cancel:
+			return
 		default:
-			log.Error("Internal error: unknown event type: ", int(evt.Type))
+			rb.checkSubnetExistInRoutes()
 		}
+		time.Sleep(routeCheckRetries * time.Second)
 	}
 }
 
-func checkSubnetExistInRouts(lease subnet.SubnetLease) bool {
-	routeList, err := netlink.RouteList(nil, netlink.FAMILY_V4)
-	if err != nil {
-		return true
-	} else {
-		ipNet := lease.Network.ToIPNet()
-		gateWay := lease.Attrs.PublicIP.ToIP()
+func (rb *HostgwBackend) checkSubnetExistInRoutes() {
+	rl, err := netlink.RouteList(nil, netlink.FAMILY_V4)
+	if err == nil {
 		for _, route := range routeList {
-			if route.Dst == nil {
-				continue
+			exist := false
+			for _, r := range rl {
+				if r.Dst == nil {
+					continue
+				}
+				if routeEqual(r, route) {
+					exist = true
+					break
+				}
 			}
-			if route.Dst.IP.Equal(ipNet.IP) && route.Gw.Equal(gateWay) && bytesEqual(route.Dst.Mask, ipNet.Mask) {
-				return true
+			if !exist {
+				netlink.RouteAdd(&route)
 			}
 		}
-		log.Warningf("%v via %v does not exits in the Route table", lease.Network, lease.Attrs.PublicIP)
-		return false
 	}
 }
 
+func routeEqual(x, y netlink.Route) bool {
+	if x.Dst.IP.Equal(y.Dst.IP) && x.Gw.Equal(y.Gw) && bytesEqual(x.Dst.Mask, y.Dst.Mask) {
+		return true
+	}
+	return false
+}
+
 func bytesEqual(x, y []byte) bool {
 	if len(x) != len(y) {
 		return false

+ 3 - 35
subnet/subnet.go

@@ -31,10 +31,9 @@ import (
 )
 
 const (
-	registerRetries   = 10
-	subnetTTL         = 24 * 3600
-	renewMargin       = time.Hour
-	subnetListRetries = 10
+	registerRetries = 10
+	subnetTTL       = 24 * 3600
+	renewMargin     = time.Hour
 )
 
 // etcd error codes
@@ -47,7 +46,6 @@ const (
 const (
 	SubnetAdded = iota
 	SubnetRemoved
-	SubnetListed
 )
 
 var (
@@ -396,36 +394,6 @@ func (sm *SubnetManager) WatchLeases(receiver chan EventBatch, cancel chan bool)
 	}
 }
 
-func (sm *SubnetManager) ListLeases(receiver chan EventBatch, cancel chan bool) {
-	//periodly list leases
-	for {
-		resp, err := sm.registry.watchSubnets(sm.lastIndex+1, cancel)
-
-		// watchSubnets exited by cancel chan being signaled
-		if err == nil && resp == nil {
-			return
-		}
-
-		leases, err := sm.getLeases()
-		if err == nil {
-
-			var batch EventBatch
-			for _, l := range leases {
-				// skip self
-				if l.Network.Equal(sm.myLease.Network) {
-					continue
-				}
-
-				batch = append(batch, Event{SubnetListed, l})
-			}
-			if &batch != nil {
-				receiver <- batch
-			}
-		}
-		time.Sleep(subnetListRetries * time.Second)
-	}
-}
-
 func (sm *SubnetManager) parseSubnetWatchResponse(resp *etcd.Response) (batch *EventBatch, err error) {
 	sm.lastIndex = resp.Node.ModifiedIndex