Browse Source

subnet/kube: Use informer callbacks for lease events

Rather than trying to reset lease state every 1 second, we can use the
informer callbacks to emit add/remove events for leases (via node
objects).
Aaron Levy 8 years ago
parent
commit
2e8af17795
1 changed files with 74 additions and 38 deletions
  1. 74 38
      subnet/kube/kube.go

+ 74 - 38
subnet/kube/kube.go

@@ -60,6 +60,8 @@ type kubeSubnetManager struct {
 	nodeStore      cache.StoreToNodeLister
 	nodeController *framework.Controller
 	subnetConf     *subnet.Config
+	events         chan subnet.Event
+	selfEvents     chan subnet.Event
 }
 
 func NewSubnetManager() (subnet.Manager, error) {
@@ -109,6 +111,8 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName str
 	ksm.client = c
 	ksm.nodeName = nodeName
 	ksm.subnetConf = sc
+	ksm.events = make(chan subnet.Event, 100)
+	ksm.selfEvents = make(chan subnet.Event, 100)
 	ksm.nodeStore.Store, ksm.nodeController = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@@ -120,11 +124,59 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName str
 		},
 		&api.Node{},
 		resyncPeriod,
-		framework.ResourceEventHandlerFuncs{},
+		framework.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
+			},
+			UpdateFunc: ksm.handleUpdateLeaseEvent,
+			DeleteFunc: func(obj interface{}) {
+				ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
+			},
+		},
 	)
 	return &ksm, nil
 }
 
+func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
+	n := obj.(*api.Node)
+	if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
+		return
+	}
+
+	l, err := nodeToLease(*n)
+	if err != nil {
+		glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
+		return
+	}
+	ksm.events <- subnet.Event{et, l, ""}
+	if n.ObjectMeta.Name == ksm.nodeName {
+		ksm.selfEvents <- subnet.Event{et, l, ""}
+	}
+}
+
+func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
+	o := oldObj.(*api.Node)
+	n := newObj.(*api.Node)
+	if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
+		return
+	}
+	if o.Annotations[backendDataAnnotation] == n.Annotations[backendDataAnnotation] &&
+		o.Annotations[backendTypeAnnotation] == n.Annotations[backendTypeAnnotation] &&
+		o.Annotations[backendPublicIPAnnotation] == n.Annotations[backendPublicIPAnnotation] {
+		return // No change to lease
+	}
+
+	l, err := nodeToLease(*n)
+	if err != nil {
+		glog.Infof("Error turning node %q to lease: %v", n.ObjectMeta.Name, err)
+		return
+	}
+	ksm.events <- subnet.Event{subnet.EventAdded, l, ""}
+	if n.ObjectMeta.Name == ksm.nodeName {
+		ksm.selfEvents <- subnet.Event{subnet.EventAdded, l, ""}
+	}
+}
+
 func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context, network string) (*subnet.Config, error) {
 	return ksm.subnetConf, nil
 }
@@ -173,52 +225,36 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string,
 }
 
 func (ksm *kubeSubnetManager) RenewLease(ctx context.Context, network string, lease *subnet.Lease) error {
+	l, err := ksm.AcquireLease(ctx, network, &lease.Attrs)
+	if err != nil {
+		return err
+	}
+	lease.Subnet = l.Subnet
+	lease.Attrs = l.Attrs
+	lease.Expiration = l.Expiration
 	return nil
 }
 
 func (ksm *kubeSubnetManager) WatchLease(ctx context.Context, network string, sn ip.IP4Net, cursor interface{}) (subnet.LeaseWatchResult, error) {
-	time.Sleep(time.Second)
-	nobj, found, err := ksm.nodeStore.Store.GetByKey(ksm.nodeName)
-	if err != nil {
-		return subnet.LeaseWatchResult{}, err
-	}
-	if !found {
-		return subnet.LeaseWatchResult{}, fmt.Errorf("node %q not found", ksm.nodeName)
-	}
-	n, ok := nobj.(*api.Node)
-	if !ok {
-		return subnet.LeaseWatchResult{}, fmt.Errorf("nobj was not a *api.Node")
-	}
-	l, err := nodeToLease(*n)
-	if err != nil {
-		return subnet.LeaseWatchResult{}, err
+	select {
+	case event := <-ksm.selfEvents:
+		return subnet.LeaseWatchResult{
+			Events: []subnet.Event{event},
+		}, nil
+	case <-ctx.Done():
+		return subnet.LeaseWatchResult{}, nil
 	}
-	return subnet.LeaseWatchResult{
-		Snapshot: []subnet.Lease{l},
-	}, nil
 }
 
 func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, network string, cursor interface{}) (subnet.LeaseWatchResult, error) {
-	time.Sleep(time.Second)
-	leases := make([]subnet.Lease, 0)
-	nl, err := ksm.nodeStore.List()
-	if err != nil {
-		return subnet.LeaseWatchResult{}, err
+	select {
+	case event := <-ksm.events:
+		return subnet.LeaseWatchResult{
+			Events: []subnet.Event{event},
+		}, nil
+	case <-ctx.Done():
+		return subnet.LeaseWatchResult{}, nil
 	}
-	for _, n := range nl.Items {
-		if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
-			continue
-		}
-		l, err := nodeToLease(n)
-		if err != nil {
-			glog.Infof("error turning node %q to lease: %v", n.ObjectMeta.Name, err)
-			continue
-		}
-		leases = append(leases, l)
-	}
-	return subnet.LeaseWatchResult{
-		Snapshot: leases,
-	}, nil
 }
 
 func (ksm *kubeSubnetManager) WatchNetworks(ctx context.Context, cursor interface{}) (subnet.NetworkWatchResult, error) {