Forráskód Böngészése

migrate kube subnet package to k8s.io/client-go

Mike Danese 8 éve
szülő
commit
08354e7700
1 módosított fájl, 35 hozzáadás és 49 törlés
  1. 35 49
      subnet/kube/kube.go

+ 35 - 49
subnet/kube/kube.go

@@ -28,17 +28,18 @@ import (
 
 	"github.com/golang/glog"
 	"golang.org/x/net/context"
-	"k8s.io/kubernetes/pkg/api"
-	"k8s.io/kubernetes/pkg/api/v1"
-	"k8s.io/kubernetes/pkg/client/cache"
-	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
-	"k8s.io/kubernetes/pkg/client/restclient"
-	"k8s.io/kubernetes/pkg/controller/framework"
-	"k8s.io/kubernetes/pkg/runtime"
-	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
-	"k8s.io/kubernetes/pkg/util/strategicpatch"
-	"k8s.io/kubernetes/pkg/util/wait"
-	"k8s.io/kubernetes/pkg/watch"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	clientset "k8s.io/client-go/kubernetes"
+	listers "k8s.io/client-go/listers/core/v1"
+	"k8s.io/client-go/pkg/api"
+	"k8s.io/client-go/pkg/api/v1"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
 )
 
 var (
@@ -60,15 +61,15 @@ const (
 type kubeSubnetManager struct {
 	client         clientset.Interface
 	nodeName       string
-	nodeStore      cache.StoreToNodeLister
-	nodeController *framework.Controller
+	nodeStore      listers.NodeLister
+	nodeController cache.Controller
 	subnetConf     *subnet.Config
 	events         chan subnet.Event
 	selfEvents     chan subnet.Event
 }
 
 func NewSubnetManager() (subnet.Manager, error) {
-	cfg, err := restclient.InClusterConfig()
+	cfg, err := rest.InClusterConfig()
 	if err != nil {
 		return nil, fmt.Errorf("unable to initialize inclusterconfig: %v", err)
 	}
@@ -83,7 +84,7 @@ func NewSubnetManager() (subnet.Manager, error) {
 		return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
 	}
 
-	pod, err := c.Pods(podNamespace).Get(podName)
+	pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
 	}
@@ -126,18 +127,18 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName str
 	ksm.subnetConf = sc
 	ksm.events = make(chan subnet.Event, 100)
 	ksm.selfEvents = make(chan subnet.Event, 100)
-	ksm.nodeStore.Store, ksm.nodeController = framework.NewInformer(
+	indexer, controller := cache.NewIndexerInformer(
 		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
 				return ksm.client.Core().Nodes().List(options)
 			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
 				return ksm.client.Core().Nodes().Watch(options)
 			},
 		},
-		&api.Node{},
+		&v1.Node{},
 		resyncPeriod,
-		framework.ResourceEventHandlerFuncs{
+		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				ksm.handleAddLeaseEvent(subnet.EventAdded, obj)
 			},
@@ -146,12 +147,15 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName str
 				ksm.handleAddLeaseEvent(subnet.EventRemoved, obj)
 			},
 		},
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
 	)
+	ksm.nodeController = controller
+	ksm.nodeStore = listers.NewNodeLister(indexer)
 	return &ksm, nil
 }
 
 func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj interface{}) {
-	n := obj.(*api.Node)
+	n := obj.(*v1.Node)
 	if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
 		return
 	}
@@ -168,8 +172,8 @@ func (ksm *kubeSubnetManager) handleAddLeaseEvent(et subnet.EventType, obj inter
 }
 
 func (ksm *kubeSubnetManager) handleUpdateLeaseEvent(oldObj, newObj interface{}) {
-	o := oldObj.(*api.Node)
-	n := newObj.(*api.Node)
+	o := oldObj.(*v1.Node)
+	n := newObj.(*v1.Node)
 	if s, ok := n.Annotations[subnetKubeManagedAnnotation]; !ok || s != "true" {
 		return
 	}
@@ -195,24 +199,15 @@ func (ksm *kubeSubnetManager) GetNetworkConfig(ctx context.Context, network stri
 }
 
 func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string, attrs *subnet.LeaseAttrs) (*subnet.Lease, error) {
-	nobj, found, err := ksm.nodeStore.Store.GetByKey(ksm.nodeName)
+	cachedNode, err := ksm.nodeStore.Get(ksm.nodeName)
 	if err != nil {
 		return nil, err
 	}
-	if !found {
-		return nil, fmt.Errorf("node %q not found", ksm.nodeName)
-	}
-	cacheNode, ok := nobj.(*api.Node)
-	if !ok {
-		return nil, fmt.Errorf("nobj was not a *api.Node")
-	}
-
-	// Make a copy so we're not modifying state of our cache
-	objCopy, err := api.Scheme.Copy(cacheNode)
+	nobj, err := api.Scheme.DeepCopy(cachedNode)
 	if err != nil {
-		return nil, fmt.Errorf("failed to make copy of node: %v", err)
+		return nil, err
 	}
-	n := objCopy.(*api.Node)
+	n := nobj.(*v1.Node)
 
 	if n.Spec.PodCIDR == "" {
 		return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
@@ -234,20 +229,12 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string,
 		n.Annotations[backendPublicIPAnnotation] = attrs.PublicIP.String()
 		n.Annotations[subnetKubeManagedAnnotation] = "true"
 
-		var oldNode, newNode v1.Node
-		if err := api.Scheme.Convert(cacheNode, &oldNode, nil); err != nil {
-			return nil, err
-		}
-		if err := api.Scheme.Convert(n, &newNode, nil); err != nil {
-			return nil, err
-		}
-
-		oldData, err := json.Marshal(oldNode)
+		oldData, err := json.Marshal(cachedNode)
 		if err != nil {
 			return nil, err
 		}
 
-		newData, err := json.Marshal(newNode)
+		newData, err := json.Marshal(n)
 		if err != nil {
 			return nil, err
 		}
@@ -257,7 +244,7 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, network string,
 			return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
 		}
 
-		_, err = ksm.client.Core().Nodes().Patch(ksm.nodeName, api.StrategicMergePatchType, patchBytes, "status")
+		_, err = ksm.client.Core().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
 		if err != nil {
 			return nil, err
 		}
@@ -310,12 +297,11 @@ func (ksm *kubeSubnetManager) WatchNetworks(ctx context.Context, cursor interfac
 }
 
 func (ksm *kubeSubnetManager) Run(ctx context.Context) {
-	defer utilruntime.HandleCrash()
 	glog.Infof("starting kube subnet manager")
 	ksm.nodeController.Run(ctx.Done())
 }
 
-func nodeToLease(n api.Node) (l subnet.Lease, err error) {
+func nodeToLease(n v1.Node) (l subnet.Lease, err error) {
 	l.Attrs.PublicIP, err = ip.ParseIP4(n.Annotations[backendPublicIPAnnotation])
 	if err != nil {
 		return l, err