Browse Source

Merge pull request #564 from aaronlevy/kube-lease

Improve kube subnet lease handling
Brandon Philips 8 years ago
parent
commit
b5ea6d3a6b
1 changed files with 97 additions and 41 deletions
  1. 97 41
      subnet/kube/kube.go

+ 97 - 41
subnet/kube/kube.go

@@ -35,6 +35,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/watch"
 )
 
@@ -44,7 +45,8 @@ var (
 )
 
 const (
-	resyncPeriod = 5 * time.Minute
+	resyncPeriod              = 5 * time.Minute
+	nodeControllerSyncTimeout = 10 * time.Minute
 
 	subnetKubeManagedAnnotation = "flannel.alpha.coreos.com/kube-subnet-manager"
 	backendDataAnnotation       = "flannel.alpha.coreos.com/backend-data"
@@ -60,6 +62,8 @@ type kubeSubnetManager struct {
 	nodeStore      cache.StoreToNodeLister
 	nodeController *framework.Controller
 	subnetConf     *subnet.Config
+	events         chan subnet.Event
+	selfEvents     chan subnet.Event
 }
 
 func NewSubnetManager() (subnet.Manager, error) {
@@ -101,7 +105,17 @@ func NewSubnetManager() (subnet.Manager, error) {
 		return nil, fmt.Errorf("error creating network manager: %s", err)
 	}
 	go sm.Run(context.Background())
-	return sm, err
+
+	glog.Infof("Waiting %s for node controller to sync", nodeControllerSyncTimeout)
+	err = wait.Poll(time.Second, nodeControllerSyncTimeout, func() (bool, error) {
+		return sm.nodeController.HasSynced(), nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("error waiting for nodeController to sync state: %v", err)
+	}
+	glog.Infof("Node controller sync successful")
+
+	return sm, nil
 }
 
 func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName string) (*kubeSubnetManager, error) {
@@ -109,6 +123,8 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName str
 	ksm.client = c
 	ksm.nodeName = nodeName
 	ksm.subnetConf = sc
+	ksm.events = make(chan subnet.Event, 100)
+	ksm.selfEvents = make(chan subnet.Event, 100)
 	ksm.nodeStore.Store, ksm.nodeController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -120,11 +136,59 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName str
 		},
 		&api.Node{},
 		resyncPeriod,
-		framework.ResourceEventHandlerFuncs{},
+		framework.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
+			},
+			UpdateFunc: ksm.handleUpdateLeaseEvent,
+			DeleteFunc: func(obj interface{}) {
+				ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
+			},
+		},
 	)
 	return &ksm, nil
 }
 
+func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
+	n := obj.(*api.Node)
+	if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
+		return
+	}
+
+	l, err := nodeToLease(*n)
+	if err != nil {
+		glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
+		return
+	}
+	ksm.events <- subnet.Event{et, l, ""}
+	if n.ObjectMeta.Name == ksm.nodeName {
+		ksm.selfEvents <- subnet.Event{et, l, ""}
+	}
+}
+
+func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
+	o := oldObj.(*api.Node)
+	n := newObj.(*api.Node)
+	if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
+		return
+	}
+	if o.Annotations[backendDataAnnotation] == n.Annotations[backendDataAnnotation] &&
+		o.Annotations[backendTypeAnnotation] == n.Annotations[backendTypeAnnotation] &&
+		o.Annotations[backendPublicIPAnnotation] == n.Annotations[backendPublicIPAnnotation] {
+		return // No change to lease
+	}
+
+	l, err := nodeToLease(*n)
+	if err != nil {
+		glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
+		return
+	}
+	ksm.events <- subnet.Event{subnet.EventAdded, l, ""}
+	if n.ObjectMeta.Name == ksm.nodeName {
+		ksm.selfEvents <- subnet.Event{subnet.EventAdded, l, ""}
+	}
+}
+
 func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
 	return ksm.subnetConf, nil
 }
@@ -137,10 +201,18 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string,
 	if !found {
 		return nil, fmt.Errorf("node %q not found", ksm.nodeName)
 	}
-	n, ok := nobj.(*api.Node)
+	cacheNode, ok := nobj.(*api.Node)
 	if !ok {
 		return nil, fmt.Errorf("nobj was not a *api.Node")
 	}
+
+	// Make a copy so we're not modifying state of our cache
+	objCopy, err := api.Scheme.Copy(cacheNode)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make copy of node: %v", err)
+	}
+	n := objCopy.(*api.Node)
+
 	if n.Spec.PodCIDR == "" {
 		return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
 	}
@@ -173,52 +245,36 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string,
 }
 
 func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, network string, lease *subnet.Lease) error {
+	l, err := ksm.AcquireLease(ctx, network, &lease.Attrs)
+	if err != nil {
+		return err
+	}
+	lease.Subnet = l.Subnet
+	lease.Attrs = l.Attrs
+	lease.Expiration = l.Expiration
 	return nil
 }
 
 func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
-	time.Sleep(time.Second)
-	nobj, found, err := ksm.nodeStore.Store.GetByKey(ksm.nodeName)
-	if err != nil {
-		return subnet.LeaseWatchResult{}, err
-	}
-	if !found {
-		return subnet.LeaseWatchResult{}, fmt.Errorf("node %q not found", ksm.nodeName)
-	}
-	n, ok := nobj.(*api.Node)
-	if !ok {
-		return subnet.LeaseWatchResult{}, fmt.Errorf("nobj was not a *api.Node")
-	}
-	l, err := nodeToLease(*n)
-	if err != nil {
-		return subnet.LeaseWatchResult{}, err
+	select {
+	case event := <-ksm.selfEvents:
+		return subnet.LeaseWatchResult{
+			Events: []subnet.Event{event},
+		}, nil
+	case <-ctx.Done():
+		return subnet.LeaseWatchResult{}, nil
 	}
-	return subnet.LeaseWatchResult{
-		Snapshot: []subnet.Lease{l},
-	}, nil
 }
 
 func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
-	time.Sleep(time.Second)
-	leases := make([]subnet.Lease, 0)
-	nl, err := ksm.nodeStore.List()
-	if err != nil {
-		return subnet.LeaseWatchResult{}, err
-	}
-	for _, n := range nl.Items {
-		if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
-			continue
-		}
-		l, err := nodeToLease(n)
-		if err != nil {
-			glog.Infof("error turning node %q to lease: %v", n.ObjectMeta.Name, err)
-			continue
-		}
-		leases = append(leases, l)
+	select {
+	case event := <-ksm.events:
+		return subnet.LeaseWatchResult{
+			Events: []subnet.Event{event},
+		}, nil
+	case <-ctx.Done():
+		return subnet.LeaseWatchResult{}, nil
 	}
-	return subnet.LeaseWatchResult{
-		Snapshot: leases,
-	}, nil
 }
 
 func (ksm *kubeSubnetManager) WatchNetworks(ctx context.Context, cursor interface{}) (subnet.NetworkWatchResult, error) {