controllermanager.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app implements a server that runs a set of active
  14. // components. This includes replication controllers, service endpoints and
  15. // nodes.
  16. //
  17. // CAUTION: If you update code in this file, you may need to also update code
  18. // in contrib/mesos/pkg/controllermanager/controllermanager.go
  19. package app
  20. import (
  21. "fmt"
  22. "io/ioutil"
  23. "math/rand"
  24. "net"
  25. "net/http"
  26. "net/http/pprof"
  27. "os"
  28. "strconv"
  29. "time"
  30. "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
  31. "k8s.io/kubernetes/pkg/api"
  32. "k8s.io/kubernetes/pkg/api/unversioned"
  33. "k8s.io/kubernetes/pkg/apis/batch"
  34. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  35. "k8s.io/kubernetes/pkg/client/leaderelection"
  36. "k8s.io/kubernetes/pkg/client/record"
  37. "k8s.io/kubernetes/pkg/client/restclient"
  38. "k8s.io/kubernetes/pkg/client/typed/dynamic"
  39. client "k8s.io/kubernetes/pkg/client/unversioned"
  40. "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
  41. "k8s.io/kubernetes/pkg/cloudprovider"
  42. "k8s.io/kubernetes/pkg/controller"
  43. certcontroller "k8s.io/kubernetes/pkg/controller/certificates"
  44. "k8s.io/kubernetes/pkg/controller/daemon"
  45. "k8s.io/kubernetes/pkg/controller/deployment"
  46. "k8s.io/kubernetes/pkg/controller/disruption"
  47. endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
  48. "k8s.io/kubernetes/pkg/controller/framework/informers"
  49. "k8s.io/kubernetes/pkg/controller/garbagecollector"
  50. "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
  51. "k8s.io/kubernetes/pkg/controller/job"
  52. namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
  53. nodecontroller "k8s.io/kubernetes/pkg/controller/node"
  54. petset "k8s.io/kubernetes/pkg/controller/petset"
  55. "k8s.io/kubernetes/pkg/controller/podautoscaler"
  56. "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
  57. "k8s.io/kubernetes/pkg/controller/podgc"
  58. replicaset "k8s.io/kubernetes/pkg/controller/replicaset"
  59. replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
  60. resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
  61. routecontroller "k8s.io/kubernetes/pkg/controller/route"
  62. "k8s.io/kubernetes/pkg/controller/scheduledjob"
  63. servicecontroller "k8s.io/kubernetes/pkg/controller/service"
  64. serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
  65. "k8s.io/kubernetes/pkg/controller/volume/attachdetach"
  66. persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
  67. "k8s.io/kubernetes/pkg/healthz"
  68. quotainstall "k8s.io/kubernetes/pkg/quota/install"
  69. "k8s.io/kubernetes/pkg/runtime/serializer"
  70. "k8s.io/kubernetes/pkg/serviceaccount"
  71. "k8s.io/kubernetes/pkg/util/configz"
  72. "k8s.io/kubernetes/pkg/util/crypto"
  73. "k8s.io/kubernetes/pkg/util/wait"
  74. "github.com/golang/glog"
  75. "github.com/prometheus/client_golang/prometheus"
  76. "github.com/spf13/cobra"
  77. "github.com/spf13/pflag"
  78. )
  79. const (
  80. // Jitter used when starting controller managers
  81. ControllerStartJitter = 1.0
  82. )
  83. // NewControllerManagerCommand creates a *cobra.Command object with default parameters
  84. func NewControllerManagerCommand() *cobra.Command {
  85. s := options.NewCMServer()
  86. s.AddFlags(pflag.CommandLine)
  87. cmd := &cobra.Command{
  88. Use: "kube-controller-manager",
  89. Long: `The Kubernetes controller manager is a daemon that embeds
  90. the core control loops shipped with Kubernetes. In applications of robotics and
  91. automation, a control loop is a non-terminating loop that regulates the state of
  92. the system. In Kubernetes, a controller is a control loop that watches the shared
  93. state of the cluster through the apiserver and makes changes attempting to move the
  94. current state towards the desired state. Examples of controllers that ship with
  95. Kubernetes today are the replication controller, endpoints controller, namespace
  96. controller, and serviceaccounts controller.`,
  97. Run: func(cmd *cobra.Command, args []string) {
  98. },
  99. }
  100. return cmd
  101. }
  102. func ResyncPeriod(s *options.CMServer) func() time.Duration {
  103. return func() time.Duration {
  104. factor := rand.Float64() + 1
  105. return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
  106. }
  107. }
  108. // Run runs the CMServer. This should never exit.
  109. func Run(s *options.CMServer) error {
  110. if c, err := configz.New("componentconfig"); err == nil {
  111. c.Set(s.KubeControllerManagerConfiguration)
  112. } else {
  113. glog.Errorf("unable to register configz: %s", err)
  114. }
  115. kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Kubeconfig)
  116. if err != nil {
  117. return err
  118. }
  119. kubeconfig.ContentConfig.ContentType = s.ContentType
  120. // Override kubeconfig qps/burst settings from flags
  121. kubeconfig.QPS = s.KubeAPIQPS
  122. kubeconfig.Burst = int(s.KubeAPIBurst)
  123. kubeClient, err := client.New(kubeconfig)
  124. if err != nil {
  125. glog.Fatalf("Invalid API configuration: %v", err)
  126. }
  127. go func() {
  128. mux := http.NewServeMux()
  129. healthz.InstallHandler(mux)
  130. if s.EnableProfiling {
  131. mux.HandleFunc("/debug/pprof/", pprof.Index)
  132. mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
  133. mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
  134. }
  135. configz.InstallHandler(mux)
  136. mux.Handle("/metrics", prometheus.Handler())
  137. server := &http.Server{
  138. Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))),
  139. Handler: mux,
  140. }
  141. glog.Fatal(server.ListenAndServe())
  142. }()
  143. eventBroadcaster := record.NewBroadcaster()
  144. eventBroadcaster.StartLogging(glog.Infof)
  145. eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
  146. recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controller-manager"})
  147. run := func(stop <-chan struct{}) {
  148. err := StartControllers(s, kubeClient, kubeconfig, stop, recorder)
  149. glog.Fatalf("error running controllers: %v", err)
  150. panic("unreachable")
  151. }
  152. if !s.LeaderElection.LeaderElect {
  153. run(nil)
  154. panic("unreachable")
  155. }
  156. id, err := os.Hostname()
  157. if err != nil {
  158. return err
  159. }
  160. leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{
  161. EndpointsMeta: api.ObjectMeta{
  162. Namespace: "kube-system",
  163. Name: "kube-controller-manager",
  164. },
  165. Client: kubeClient,
  166. Identity: id,
  167. EventRecorder: recorder,
  168. LeaseDuration: s.LeaderElection.LeaseDuration.Duration,
  169. RenewDeadline: s.LeaderElection.RenewDeadline.Duration,
  170. RetryPeriod: s.LeaderElection.RetryPeriod.Duration,
  171. Callbacks: leaderelection.LeaderCallbacks{
  172. OnStartedLeading: run,
  173. OnStoppedLeading: func() {
  174. glog.Fatalf("leaderelection lost")
  175. },
  176. },
  177. })
  178. panic("unreachable")
  179. }
  180. func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}, recorder record.EventRecorder) error {
  181. sharedInformers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "shared-informers")), ResyncPeriod(s)())
  182. go endpointcontroller.NewEndpointController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "endpoint-controller"))).
  183. Run(int(s.ConcurrentEndpointSyncs), wait.NeverStop)
  184. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  185. go replicationcontroller.NewReplicationManager(
  186. sharedInformers.Pods().Informer(),
  187. clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replication-controller")),
  188. ResyncPeriod(s),
  189. replicationcontroller.BurstReplicas,
  190. int(s.LookupCacheSizeForRC),
  191. s.EnableGarbageCollector,
  192. ).Run(int(s.ConcurrentRCSyncs), wait.NeverStop)
  193. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  194. if s.TerminatedPodGCThreshold > 0 {
  195. go podgc.New(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-garbage-collector")), ResyncPeriod(s), int(s.TerminatedPodGCThreshold)).
  196. Run(wait.NeverStop)
  197. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  198. }
  199. cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
  200. if err != nil {
  201. glog.Fatalf("Cloud provider could not be initialized: %v", err)
  202. }
  203. _, clusterCIDR, err := net.ParseCIDR(s.ClusterCIDR)
  204. if err != nil {
  205. glog.Warningf("Unsuccessful parsing of cluster CIDR %v: %v", s.ClusterCIDR, err)
  206. }
  207. _, serviceCIDR, err := net.ParseCIDR(s.ServiceCIDR)
  208. if err != nil {
  209. glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
  210. }
  211. nodeController, err := nodecontroller.NewNodeController(sharedInformers.Pods().Informer(), cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
  212. s.PodEvictionTimeout.Duration, s.NodeEvictionRate, s.SecondaryNodeEvictionRate, s.LargeClusterSizeThreshold, s.UnhealthyZoneThreshold, s.NodeMonitorGracePeriod.Duration,
  213. s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR,
  214. int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
  215. if err != nil {
  216. glog.Fatalf("Failed to initialize nodecontroller: %v", err)
  217. }
  218. nodeController.Run()
  219. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  220. serviceController, err := servicecontroller.New(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-controller")), s.ClusterName)
  221. if err != nil {
  222. glog.Errorf("Failed to start service controller: %v", err)
  223. } else {
  224. serviceController.Run(int(s.ConcurrentServiceSyncs))
  225. }
  226. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  227. if s.AllocateNodeCIDRs && s.ConfigureCloudRoutes {
  228. if cloud == nil {
  229. glog.Warning("configure-cloud-routes is set, but no cloud provider specified. Will not configure cloud provider routes.")
  230. } else if routes, ok := cloud.Routes(); !ok {
  231. glog.Warning("configure-cloud-routes is set, but cloud provider does not support routes. Will not configure cloud provider routes.")
  232. } else {
  233. routeController := routecontroller.New(routes, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "route-controller")), s.ClusterName, clusterCIDR)
  234. routeController.Run(s.NodeSyncPeriod.Duration)
  235. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  236. }
  237. } else {
  238. glog.Infof("Will not configure cloud provider routes for allocate-node-cidrs: %v, configure-cloud-routes: %v.", s.AllocateNodeCIDRs, s.ConfigureCloudRoutes)
  239. }
  240. resourceQuotaControllerClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "resourcequota-controller"))
  241. resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient)
  242. groupKindsToReplenish := []unversioned.GroupKind{
  243. api.Kind("Pod"),
  244. api.Kind("Service"),
  245. api.Kind("ReplicationController"),
  246. api.Kind("PersistentVolumeClaim"),
  247. api.Kind("Secret"),
  248. api.Kind("ConfigMap"),
  249. }
  250. resourceQuotaControllerOptions := &resourcequotacontroller.ResourceQuotaControllerOptions{
  251. KubeClient: resourceQuotaControllerClient,
  252. ResyncPeriod: controller.StaticResyncPeriodFunc(s.ResourceQuotaSyncPeriod.Duration),
  253. Registry: resourceQuotaRegistry,
  254. ControllerFactory: resourcequotacontroller.NewReplenishmentControllerFactory(sharedInformers.Pods().Informer(), resourceQuotaControllerClient),
  255. ReplenishmentResyncPeriod: ResyncPeriod(s),
  256. GroupKindsToReplenish: groupKindsToReplenish,
  257. }
  258. go resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions).Run(int(s.ConcurrentResourceQuotaSyncs), wait.NeverStop)
  259. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  260. // If apiserver is not running we should wait for some time and fail only then. This is particularly
  261. // important when we start apiserver and controller manager at the same time.
  262. var versionStrings []string
  263. err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
  264. if versionStrings, err = restclient.ServerAPIVersions(kubeconfig); err == nil {
  265. return true, nil
  266. }
  267. glog.Errorf("Failed to get api versions from server: %v", err)
  268. return false, nil
  269. })
  270. if err != nil {
  271. glog.Fatalf("Failed to get api versions from server: %v", err)
  272. }
  273. versions := &unversioned.APIVersions{Versions: versionStrings}
  274. resourceMap, err := kubeClient.Discovery().ServerResources()
  275. if err != nil {
  276. glog.Fatalf("Failed to get supported resources from server: %v", err)
  277. }
  278. // Find the list of namespaced resources via discovery that the namespace controller must manage
  279. namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
  280. namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
  281. groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources()
  282. if err != nil {
  283. glog.Fatalf("Failed to get supported resources from server: %v", err)
  284. }
  285. namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
  286. go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop)
  287. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  288. groupVersion := "extensions/v1beta1"
  289. resources, found := resourceMap[groupVersion]
  290. // TODO: this needs to be dynamic so users don't have to restart their controller manager if they change the apiserver
  291. if containsVersion(versions, groupVersion) && found {
  292. glog.Infof("Starting %s apis", groupVersion)
  293. if containsResource(resources, "horizontalpodautoscalers") {
  294. glog.Infof("Starting horizontal pod controller.")
  295. hpaClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "horizontal-pod-autoscaler"))
  296. metricsClient := metrics.NewHeapsterMetricsClient(
  297. hpaClient,
  298. metrics.DefaultHeapsterNamespace,
  299. metrics.DefaultHeapsterScheme,
  300. metrics.DefaultHeapsterService,
  301. metrics.DefaultHeapsterPort,
  302. )
  303. go podautoscaler.NewHorizontalController(hpaClient.Core(), hpaClient.Extensions(), hpaClient, metricsClient, s.HorizontalPodAutoscalerSyncPeriod.Duration).
  304. Run(wait.NeverStop)
  305. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  306. }
  307. if containsResource(resources, "daemonsets") {
  308. glog.Infof("Starting daemon set controller")
  309. go daemon.NewDaemonSetsController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), int(s.LookupCacheSizeForDaemonSet)).
  310. Run(int(s.ConcurrentDaemonSetSyncs), wait.NeverStop)
  311. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  312. }
  313. if containsResource(resources, "jobs") {
  314. glog.Infof("Starting job controller")
  315. go job.NewJobController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "job-controller"))).
  316. Run(int(s.ConcurrentJobSyncs), wait.NeverStop)
  317. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  318. }
  319. if containsResource(resources, "deployments") {
  320. glog.Infof("Starting deployment controller")
  321. go deployment.NewDeploymentController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
  322. Run(int(s.ConcurrentDeploymentSyncs), wait.NeverStop)
  323. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  324. }
  325. if containsResource(resources, "replicasets") {
  326. glog.Infof("Starting ReplicaSet controller")
  327. go replicaset.NewReplicaSetController(sharedInformers.Pods().Informer(), clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
  328. Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
  329. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  330. }
  331. }
  332. groupVersion = "policy/v1alpha1"
  333. resources, found = resourceMap[groupVersion]
  334. glog.Infof("Attempting to start disruption controller, full resource map %+v", resourceMap)
  335. if containsVersion(versions, groupVersion) && found {
  336. glog.Infof("Starting %s apis", groupVersion)
  337. if containsResource(resources, "poddisruptionbudgets") {
  338. glog.Infof("Starting disruption controller")
  339. go disruption.NewDisruptionController(sharedInformers.Pods().Informer(), kubeClient).Run(wait.NeverStop)
  340. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  341. }
  342. }
  343. groupVersion = "apps/v1alpha1"
  344. resources, found = resourceMap[groupVersion]
  345. glog.Infof("Attempting to start petset, full resource map %+v", resourceMap)
  346. if containsVersion(versions, groupVersion) && found {
  347. glog.Infof("Starting %s apis", groupVersion)
  348. if containsResource(resources, "petsets") {
  349. glog.Infof("Starting PetSet controller")
  350. resyncPeriod := ResyncPeriod(s)()
  351. go petset.NewPetSetController(
  352. sharedInformers.Pods().Informer(),
  353. // TODO: Switch to using clientset
  354. kubeClient,
  355. resyncPeriod,
  356. ).Run(1, wait.NeverStop)
  357. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  358. }
  359. }
  360. groupVersion = "batch/v2alpha1"
  361. resources, found = resourceMap[groupVersion]
  362. if containsVersion(versions, groupVersion) && found {
  363. glog.Infof("Starting %s apis", groupVersion)
  364. if containsResource(resources, "scheduledjobs") {
  365. glog.Infof("Starting scheduledjob controller")
  366. // TODO: this is a temp fix for allowing kubeClient list v2alpha1 sj, should switch to using clientset
  367. kubeconfig.ContentConfig.GroupVersion = &unversioned.GroupVersion{Group: batch.GroupName, Version: "v2alpha1"}
  368. kubeClient, err := client.New(kubeconfig)
  369. if err != nil {
  370. glog.Fatalf("Invalid API configuration: %v", err)
  371. }
  372. go scheduledjob.NewScheduledJobController(kubeClient).
  373. Run(wait.NeverStop)
  374. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  375. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  376. }
  377. } else {
  378. glog.Infof("Not starting %s apis", groupVersion)
  379. }
  380. alphaProvisioner, err := NewAlphaVolumeProvisioner(cloud, s.VolumeConfiguration)
  381. if err != nil {
  382. glog.Fatalf("An backward-compatible provisioner could not be created: %v, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.", err)
  383. }
  384. volumeController := persistentvolumecontroller.NewPersistentVolumeController(
  385. clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "persistent-volume-binder")),
  386. s.PVClaimBinderSyncPeriod.Duration,
  387. alphaProvisioner,
  388. ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
  389. cloud,
  390. s.ClusterName,
  391. nil, // volumeSource
  392. nil, // claimSource
  393. nil, // classSource
  394. nil, // eventRecorder
  395. s.VolumeConfiguration.EnableDynamicProvisioning,
  396. )
  397. volumeController.Run(wait.NeverStop)
  398. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  399. attachDetachController, attachDetachControllerErr :=
  400. attachdetach.NewAttachDetachController(
  401. clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "attachdetach-controller")),
  402. sharedInformers.Pods().Informer(),
  403. sharedInformers.Nodes().Informer(),
  404. sharedInformers.PersistentVolumeClaims().Informer(),
  405. sharedInformers.PersistentVolumes().Informer(),
  406. cloud,
  407. ProbeAttachableVolumePlugins(s.VolumeConfiguration),
  408. recorder)
  409. if attachDetachControllerErr != nil {
  410. glog.Fatalf("Failed to start attach/detach controller: %v", attachDetachControllerErr)
  411. }
  412. go attachDetachController.Run(wait.NeverStop)
  413. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  414. groupVersion = "certificates/v1alpha1"
  415. resources, found = resourceMap[groupVersion]
  416. glog.Infof("Attempting to start certificates, full resource map %+v", resourceMap)
  417. if containsVersion(versions, groupVersion) && found {
  418. glog.Infof("Starting %s apis", groupVersion)
  419. if containsResource(resources, "certificatesigningrequests") {
  420. glog.Infof("Starting certificate request controller")
  421. resyncPeriod := ResyncPeriod(s)()
  422. certController, err := certcontroller.NewCertificateController(
  423. clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "certificate-controller")),
  424. resyncPeriod,
  425. s.ClusterSigningCertFile,
  426. s.ClusterSigningKeyFile,
  427. s.ApproveAllKubeletCSRsForGroup,
  428. )
  429. if err != nil {
  430. glog.Errorf("Failed to start certificate controller: %v", err)
  431. } else {
  432. go certController.Run(1, wait.NeverStop)
  433. }
  434. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  435. }
  436. }
  437. var rootCA []byte
  438. if s.RootCAFile != "" {
  439. rootCA, err = ioutil.ReadFile(s.RootCAFile)
  440. if err != nil {
  441. return fmt.Errorf("error reading root-ca-file at %s: %v", s.RootCAFile, err)
  442. }
  443. if _, err := crypto.CertsFromPEM(rootCA); err != nil {
  444. return fmt.Errorf("error parsing root-ca-file at %s: %v", s.RootCAFile, err)
  445. }
  446. } else {
  447. rootCA = kubeconfig.CAData
  448. }
  449. if len(s.ServiceAccountKeyFile) > 0 {
  450. privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)
  451. if err != nil {
  452. glog.Errorf("Error reading key for service account token controller: %v", err)
  453. } else {
  454. go serviceaccountcontroller.NewTokensController(
  455. clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "tokens-controller")),
  456. serviceaccountcontroller.TokensControllerOptions{
  457. TokenGenerator: serviceaccount.JWTTokenGenerator(privateKey),
  458. RootCA: rootCA,
  459. },
  460. ).Run(int(s.ConcurrentSATokenSyncs), wait.NeverStop)
  461. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  462. }
  463. }
  464. serviceaccountcontroller.NewServiceAccountsController(
  465. clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "service-account-controller")),
  466. serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
  467. ).Run()
  468. time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
  469. if s.EnableGarbageCollector {
  470. gcClientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "generic-garbage-collector"))
  471. groupVersionResources, err := gcClientset.Discovery().ServerPreferredResources()
  472. if err != nil {
  473. glog.Fatalf("Failed to get supported resources from server: %v", err)
  474. }
  475. config := restclient.AddUserAgent(kubeconfig, "generic-garbage-collector")
  476. config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
  477. metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
  478. config.ContentConfig.NegotiatedSerializer = nil
  479. clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
  480. garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, groupVersionResources)
  481. if err != nil {
  482. glog.Errorf("Failed to start the generic garbage collector: %v", err)
  483. } else {
  484. workers := int(s.ConcurrentGCSyncs)
  485. go garbageCollector.Run(workers, wait.NeverStop)
  486. }
  487. }
  488. sharedInformers.Start(stop)
  489. select {}
  490. }
  491. func containsVersion(versions *unversioned.APIVersions, version string) bool {
  492. for ix := range versions.Versions {
  493. if versions.Versions[ix] == version {
  494. return true
  495. }
  496. }
  497. return false
  498. }
  499. func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
  500. for ix := range resources.APIResources {
  501. resource := resources.APIResources[ix]
  502. if resource.Name == resourceName {
  503. return true
  504. }
  505. }
  506. return false
  507. }