ソースを参照

upgrade client-go to 1.19.4

Luther Monson 4 年 前
コミット
ba8c21659e
2 ファイル変更23 行追加29 行削除
  1. 10 9
      main.go
  2. 13 20
      subnet/kube/kube.go

+ 10 - 9
main.go

@@ -159,9 +159,9 @@ func usage() {
 	os.Exit(0)
 }
 
-func newSubnetManager() (subnet.Manager, error) {
+func newSubnetManager(ctx context.Context) (subnet.Manager, error) {
 	if opts.kubeSubnetMgr {
-		return kube.NewSubnetManager(opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath)
+		return kube.NewSubnetManager(ctx, opts.kubeApiUrl, opts.kubeConfigFile, opts.kubeAnnotationPrefix, opts.netConfPath)
 	}
 
 	cfg := &etcdv2.EtcdConfig{
@@ -238,7 +238,14 @@ func main() {
 		}
 	}
 
-	sm, err := newSubnetManager()
+	// This is the main context that everything should run in.
+	// All spawned goroutines should exit when cancel is called on this context.
+	// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
+	// to block until all the goroutines return . If those goroutines spawn other goroutines then they are responsible for
+	// blocking and returning only when cancel() is called.
+	ctx, cancel := context.WithCancel(context.Background())
+
+	sm, err := newSubnetManager(ctx)
 	if err != nil {
 		log.Error("Failed to create SubnetManager: ", err)
 		os.Exit(1)
@@ -250,12 +257,6 @@ func main() {
 	sigs := make(chan os.Signal, 1)
 	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
 
-	// This is the main context that everything should run in.
-	// All spawned goroutines should exit when cancel is called on this context.
-	// Go routines spawned from main.go coordinate using a WaitGroup. This provides a mechanism to allow the shutdownHandler goroutine
-	// to block until all the goroutines return . If those goroutines spawn other goroutines then they are responsible for
-	// blocking and returning only when cancel() is called.
-	ctx, cancel := context.WithCancel(context.Background())
 	wg := sync.WaitGroup{}
 
 	wg.Add(1)

+ 13 - 20
subnet/kube/kube.go

@@ -25,9 +25,9 @@ import (
 
 	"github.com/coreos/flannel/pkg/ip"
 	"github.com/coreos/flannel/subnet"
-
 	"github.com/golang/glog"
 	"golang.org/x/net/context"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -36,8 +36,6 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	listers "k8s.io/client-go/listers/core/v1"
-	"k8s.io/client-go/pkg/api"
-	"k8s.io/client-go/pkg/api/v1"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/clientcmd"
@@ -62,8 +60,7 @@ type kubeSubnetManager struct {
 	events         chan subnet.Event
 }
 
-func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Manager, error) {
-
+func NewSubnetManager(ctx context.Context, apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Manager, error) {
 	var cfg *rest.Config
 	var err error
 	// Try to build kubernetes config from a master url or a kubeconfig filepath. If neither masterUrl
@@ -90,7 +87,7 @@ func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Ma
 			return nil, fmt.Errorf("env variables POD_NAME and POD_NAMESPACE must be set")
 		}
 
-		pod, err := c.Pods(podNamespace).Get(podName, metav1.GetOptions{})
+		pod, err := c.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
 		if err != nil {
 			return nil, fmt.Errorf("error retrieving pod spec for '%s/%s': %v", podNamespace, podName, err)
 		}
@@ -110,7 +107,7 @@ func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Ma
 		return nil, fmt.Errorf("error parsing subnet config: %s", err)
 	}
 
-	sm, err := newKubeSubnetManager(c, sc, nodeName, prefix)
+	sm, err := newKubeSubnetManager(ctx, c, sc, nodeName, prefix)
 	if err != nil {
 		return nil, fmt.Errorf("error creating network manager: %s", err)
 	}
@@ -128,7 +125,7 @@ func NewSubnetManager(apiUrl, kubeconfig, prefix, netConfPath string) (subnet.Ma
 	return sm, nil
 }
 
-func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
+func newKubeSubnetManager(ctx context.Context, c clientset.Interface, sc *subnet.Config, nodeName, prefix string) (*kubeSubnetManager, error) {
 	var err error
 	var ksm kubeSubnetManager
 	ksm.annotations, err = newAnnotations(prefix)
@@ -142,10 +139,10 @@ func newKubeSubnetManager(c clientset.Interface, sc *subnet.Config, nodeName, pr
 	indexer, controller := cache.NewIndexerInformer(
 		&cache.ListWatch{
 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return ksm.client.CoreV1().Nodes().List(options)
+				return ksm.client.CoreV1().Nodes().List(ctx, options)
 			},
 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return ksm.client.CoreV1().Nodes().Watch(options)
+				return ksm.client.CoreV1().Nodes().Watch(ctx, options)
 			},
 		},
 		&v1.Node{},
@@ -224,12 +221,8 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.Le
 	if err != nil {
 		return nil, err
 	}
-	nobj, err := api.Scheme.DeepCopy(cachedNode)
-	if err != nil {
-		return nil, err
-	}
-	n := nobj.(*v1.Node)
 
+	n := cachedNode.DeepCopy()
 	if n.Spec.PodCIDR == "" {
 		return nil, fmt.Errorf("node %q pod cidr not assigned", ksm.nodeName)
 	}
@@ -275,12 +268,12 @@ func (ksm *kubeSubnetManager) AcquireLease(ctx context.Context, attrs *subnet.Le
 			return nil, fmt.Errorf("failed to create patch for node %q: %v", ksm.nodeName, err)
 		}
 
-		_, err = ksm.client.CoreV1().Nodes().Patch(ksm.nodeName, types.StrategicMergePatchType, patchBytes, "status")
+		_, err = ksm.client.CoreV1().Nodes().Patch(ctx, ksm.nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
 		if err != nil {
 			return nil, err
 		}
 	}
-	err = ksm.setNodeNetworkUnavailableFalse()
+	err = ksm.setNodeNetworkUnavailableFalse(ctx)
 	if err != nil {
 		glog.Errorf("Unable to set NetworkUnavailable to False for %q: %v", ksm.nodeName, err)
 	}
@@ -298,7 +291,7 @@ func (ksm *kubeSubnetManager) WatchLeases(ctx context.Context, cursor interface{
 			Events: []subnet.Event{event},
 		}, nil
 	case <-ctx.Done():
-		return subnet.LeaseWatchResult{}, nil
+		return subnet.LeaseWatchResult{}, context.Canceled
 	}
 }
 
@@ -340,7 +333,7 @@ func (ksm *kubeSubnetManager) Name() string {
 
 // Set Kubernetes NodeNetworkUnavailable to false when starting
 // https://kubernetes.io/docs/concepts/architecture/nodes/#condition
-func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse() error {
+func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse(ctx context.Context) error {
 	condition := v1.NodeCondition{
 		Type:               v1.NodeNetworkUnavailable,
 		Status:             v1.ConditionFalse,
@@ -354,6 +347,6 @@ func (ksm *kubeSubnetManager) setNodeNetworkUnavailableFalse() error {
 		return err
 	}
 	patch := []byte(fmt.Sprintf(`{"status":{"conditions":%s}}`, raw))
-	_, err = ksm.client.CoreV1().Nodes().PatchStatus(ksm.nodeName, patch)
+	_, err = ksm.client.CoreV1().Nodes().PatchStatus(ctx, ksm.nodeName, patch)
 	return err
 }