server.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package app makes it easy to create a kubelet server for various contexts.
  14. package app
  15. import (
  16. "crypto/tls"
  17. "errors"
  18. "fmt"
  19. "io/ioutil"
  20. "math/rand"
  21. "net"
  22. "net/http"
  23. _ "net/http/pprof"
  24. "os"
  25. "path"
  26. "strconv"
  27. "strings"
  28. "time"
  29. "github.com/golang/glog"
  30. "github.com/spf13/cobra"
  31. "github.com/spf13/pflag"
  32. "k8s.io/kubernetes/cmd/kubelet/app/options"
  33. "k8s.io/kubernetes/pkg/api"
  34. "k8s.io/kubernetes/pkg/apis/componentconfig"
  35. v1alpha1 "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
  36. "k8s.io/kubernetes/pkg/capabilities"
  37. "k8s.io/kubernetes/pkg/client/chaosclient"
  38. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  39. unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
  40. "k8s.io/kubernetes/pkg/client/record"
  41. "k8s.io/kubernetes/pkg/client/restclient"
  42. clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth"
  43. "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
  44. clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
  45. "k8s.io/kubernetes/pkg/cloudprovider"
  46. "k8s.io/kubernetes/pkg/credentialprovider"
  47. "k8s.io/kubernetes/pkg/healthz"
  48. "k8s.io/kubernetes/pkg/kubelet"
  49. "k8s.io/kubernetes/pkg/kubelet/cadvisor"
  50. "k8s.io/kubernetes/pkg/kubelet/cm"
  51. "k8s.io/kubernetes/pkg/kubelet/config"
  52. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  53. "k8s.io/kubernetes/pkg/kubelet/dockertools"
  54. "k8s.io/kubernetes/pkg/kubelet/server"
  55. kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
  56. "k8s.io/kubernetes/pkg/runtime"
  57. utilconfig "k8s.io/kubernetes/pkg/util/config"
  58. "k8s.io/kubernetes/pkg/util/configz"
  59. "k8s.io/kubernetes/pkg/util/crypto"
  60. "k8s.io/kubernetes/pkg/util/flock"
  61. kubeio "k8s.io/kubernetes/pkg/util/io"
  62. "k8s.io/kubernetes/pkg/util/mount"
  63. nodeutil "k8s.io/kubernetes/pkg/util/node"
  64. "k8s.io/kubernetes/pkg/util/oom"
  65. "k8s.io/kubernetes/pkg/util/rlimit"
  66. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  67. "k8s.io/kubernetes/pkg/util/wait"
  68. "k8s.io/kubernetes/pkg/version"
  69. )
  70. // NewKubeletCommand creates a *cobra.Command object with default parameters
  71. func NewKubeletCommand() *cobra.Command {
  72. s := options.NewKubeletServer()
  73. s.AddFlags(pflag.CommandLine)
  74. cmd := &cobra.Command{
  75. Use: "kubelet",
  76. Long: `The kubelet is the primary "node agent" that runs on each
  77. node. The kubelet works in terms of a PodSpec. A PodSpec is a YAML or JSON object
  78. that describes a pod. The kubelet takes a set of PodSpecs that are provided through
  79. various mechanisms (primarily through the apiserver) and ensures that the containers
  80. described in those PodSpecs are running and healthy. The kubelet doesn't manage
  81. containers which were not created by Kubernetes.
  82. Other than from an PodSpec from the apiserver, there are three ways that a container
  83. manifest can be provided to the Kubelet.
  84. File: Path passed as a flag on the command line. This file is rechecked every 20
  85. seconds (configurable with a flag).
  86. HTTP endpoint: HTTP endpoint passed as a parameter on the command line. This endpoint
  87. is checked every 20 seconds (also configurable with a flag).
  88. HTTP server: The kubelet can also listen for HTTP and respond to a simple API
  89. (underspec'd currently) to submit a new manifest.`,
  90. Run: func(cmd *cobra.Command, args []string) {
  91. },
  92. }
  93. return cmd
  94. }
  95. // UnsecuredKubeletDeps returns a KubeletDeps suitable for being run, or an error if the server setup
  96. // is not valid. It will not start any background processes, and does not include authentication/authorization
  97. func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {
  98. // Initialize the TLS Options
  99. tlsOptions, err := InitializeTLS(&s.KubeletConfiguration)
  100. if err != nil {
  101. return nil, err
  102. }
  103. mounter := mount.New()
  104. var writer kubeio.Writer = &kubeio.StdWriter{}
  105. if s.Containerized {
  106. glog.V(2).Info("Running kubelet in containerized mode (experimental)")
  107. mounter = mount.NewNsenterMounter()
  108. writer = &kubeio.NsenterWriter{}
  109. }
  110. var dockerClient dockertools.DockerInterface
  111. if s.ContainerRuntime == "docker" {
  112. dockerClient = dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration)
  113. } else {
  114. dockerClient = nil
  115. }
  116. return &kubelet.KubeletDeps{
  117. Auth: nil, // default does not enforce auth[nz]
  118. CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
  119. Cloud: nil, // cloud provider might start background processes
  120. ContainerManager: nil,
  121. DockerClient: dockerClient,
  122. KubeClient: nil,
  123. Mounter: mounter,
  124. NetworkPlugins: ProbeNetworkPlugins(s.NetworkPluginDir),
  125. OOMAdjuster: oom.NewOOMAdjuster(),
  126. OSInterface: kubecontainer.RealOS{},
  127. Writer: writer,
  128. VolumePlugins: ProbeVolumePlugins(s.VolumePluginDir),
  129. TLSOptions: tlsOptions,
  130. }, nil
  131. }
  132. func getKubeClient(s *options.KubeletServer) (*clientset.Clientset, error) {
  133. clientConfig, err := CreateAPIServerClientConfig(s)
  134. if err == nil {
  135. kubeClient, err := clientset.NewForConfig(clientConfig)
  136. if err != nil {
  137. return nil, err
  138. }
  139. return kubeClient, nil
  140. }
  141. return nil, err
  142. }
  143. // Tries to download the kubelet-<node-name> configmap from "kube-system" namespace via the API server and returns a JSON string or error
  144. func getRemoteKubeletConfig(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (string, error) {
  145. // TODO(mtaufen): should probably cache clientset and pass into this function rather than regenerate on every request
  146. kubeClient, err := getKubeClient(s)
  147. if err != nil {
  148. return "", err
  149. }
  150. configmap, err := func() (*api.ConfigMap, error) {
  151. var nodename string
  152. hostname := nodeutil.GetHostname(s.HostnameOverride)
  153. if kubeDeps != nil && kubeDeps.Cloud != nil {
  154. instances, ok := kubeDeps.Cloud.Instances()
  155. if !ok {
  156. err = fmt.Errorf("failed to get instances from cloud provider, can't determine nodename.")
  157. return nil, err
  158. }
  159. nodename, err = instances.CurrentNodeName(hostname)
  160. if err != nil {
  161. err = fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
  162. return nil, err
  163. }
  164. // look for kubelet-<node-name> configmap from "kube-system"
  165. configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", nodename))
  166. if err != nil {
  167. return nil, err
  168. }
  169. return configmap, nil
  170. }
  171. // No cloud provider yet, so can't get the nodename via Cloud.Instances().CurrentNodeName(hostname), try just using the hostname
  172. configmap, err := kubeClient.CoreClient.ConfigMaps("kube-system").Get(fmt.Sprintf("kubelet-%s", hostname))
  173. if err != nil {
  174. return nil, fmt.Errorf("cloud provider was nil, and attempt to use hostname to find config resulted in: %v", err)
  175. }
  176. return configmap, nil
  177. }()
  178. if err != nil {
  179. return "", err
  180. }
  181. // When we create the KubeletConfiguration configmap, we put a json string
  182. // representation of the config in a `kubelet.config` key.
  183. jsonstr, ok := configmap.Data["kubelet.config"]
  184. if !ok {
  185. return "", fmt.Errorf("KubeletConfiguration configmap did not contain a value with key `kubelet.config`")
  186. }
  187. return jsonstr, nil
  188. }
  189. func startKubeletConfigSyncLoop(s *options.KubeletServer, currentKC string) {
  190. glog.Infof("Starting Kubelet configuration sync loop")
  191. go func() {
  192. wait.PollInfinite(30*time.Second, func() (bool, error) {
  193. glog.Infof("Checking API server for new Kubelet configuration.")
  194. remoteKC, err := getRemoteKubeletConfig(s, nil)
  195. if err == nil {
  196. // Detect new config by comparing with the last JSON string we extracted.
  197. if remoteKC != currentKC {
  198. glog.Info("Found new Kubelet configuration via API server, restarting!")
  199. os.Exit(0)
  200. }
  201. } else {
  202. glog.Infof("Did not find a configuration for this Kubelet via API server: %v", err)
  203. }
  204. return false, nil // Always return (false, nil) so we poll forever.
  205. })
  206. }()
  207. }
  208. // Try to check for config on the API server, return that config if we get it, and start
  209. // a background thread that checks for updates to configs.
  210. func initKubeletConfigSync(s *options.KubeletServer) (*componentconfig.KubeletConfiguration, error) {
  211. jsonstr, err := getRemoteKubeletConfig(s, nil)
  212. if err == nil {
  213. // We will compare future API server config against the config we just got (jsonstr):
  214. startKubeletConfigSyncLoop(s, jsonstr)
  215. // Convert json from API server to external type struct, and convert that to internal type struct
  216. extKC := v1alpha1.KubeletConfiguration{}
  217. err := runtime.DecodeInto(api.Codecs.UniversalDecoder(), []byte(jsonstr), &extKC)
  218. if err != nil {
  219. return nil, err
  220. }
  221. kc := componentconfig.KubeletConfiguration{}
  222. err = api.Scheme.Convert(&extKC, &kc, nil)
  223. if err != nil {
  224. return nil, err
  225. }
  226. return &kc, nil
  227. } else {
  228. // Couldn't get a configuration from the API server yet.
  229. // Restart as soon as anything comes back from the API server.
  230. startKubeletConfigSyncLoop(s, "")
  231. return nil, err
  232. }
  233. }
  234. // Run runs the specified KubeletServer with the given KubeletDeps. This should never exit.
  235. // The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
  236. // Otherwise, the caller is assumed to have set up the KubeletDeps object and a default one will
  237. // not be generated.
  238. func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
  239. if err := run(s, kubeDeps); err != nil {
  240. return fmt.Errorf("failed to run Kubelet: %v", err)
  241. }
  242. return nil
  243. }
  244. func checkPermissions() error {
  245. if uid := os.Getuid(); uid != 0 {
  246. return fmt.Errorf("Kubelet needs to run as uid `0`. It is being run as %d", uid)
  247. }
  248. // TODO: Check if kubelet is running in the `initial` user namespace.
  249. // http://man7.org/linux/man-pages/man7/user_namespaces.7.html
  250. return nil
  251. }
  252. func setConfigz(cz *configz.Config, kc *componentconfig.KubeletConfiguration) {
  253. tmp := v1alpha1.KubeletConfiguration{}
  254. api.Scheme.Convert(kc, &tmp, nil)
  255. cz.Set(tmp)
  256. }
  257. func initConfigz(kc *componentconfig.KubeletConfiguration) (*configz.Config, error) {
  258. cz, err := configz.New("componentconfig")
  259. if err == nil {
  260. setConfigz(cz, kc)
  261. } else {
  262. glog.Errorf("unable to register configz: %s", err)
  263. }
  264. return cz, err
  265. }
  266. func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
  267. // TODO: this should be replaced by a --standalone flag
  268. standaloneMode := (len(s.APIServerList) == 0)
  269. if s.ExitOnLockContention && s.LockFilePath == "" {
  270. return errors.New("cannot exit on lock file contention: no lock file specified")
  271. }
  272. done := make(chan struct{})
  273. if s.LockFilePath != "" {
  274. glog.Infof("acquiring lock on %q", s.LockFilePath)
  275. if err := flock.Acquire(s.LockFilePath); err != nil {
  276. return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
  277. }
  278. if s.ExitOnLockContention {
  279. glog.Infof("watching for inotify events for: %v", s.LockFilePath)
  280. if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
  281. return err
  282. }
  283. }
  284. }
  285. // Register current configuration with /configz endpoint
  286. cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
  287. if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
  288. // Look for config on the API server. If it exists, replace s.KubeletConfiguration
  289. // with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.
  290. // Don't do dynamic Kubelet configuration in runonce mode
  291. if s.RunOnce == false {
  292. remoteKC, err := initKubeletConfigSync(s)
  293. if err == nil {
  294. // Update s (KubeletServer) with new config from API server
  295. s.KubeletConfiguration = *remoteKC
  296. // Ensure that /configz is up to date with the new config
  297. if cfgzErr != nil {
  298. glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
  299. } else {
  300. setConfigz(cfgz, &s.KubeletConfiguration)
  301. }
  302. }
  303. }
  304. }
  305. if kubeDeps == nil {
  306. var kubeClient, eventClient *clientset.Clientset
  307. var cloud cloudprovider.Interface
  308. if s.CloudProvider != v1alpha1.AutoDetectCloudProvider {
  309. cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
  310. if err != nil {
  311. return err
  312. }
  313. if cloud == nil {
  314. glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
  315. } else {
  316. glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
  317. }
  318. }
  319. if s.BootstrapKubeconfig != "" {
  320. nodeName, err := getNodeName(cloud, nodeutil.GetHostname(s.HostnameOverride))
  321. if err != nil {
  322. return err
  323. }
  324. if err := bootstrapClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
  325. return err
  326. }
  327. }
  328. clientConfig, err := CreateAPIServerClientConfig(s)
  329. if err == nil {
  330. kubeClient, err = clientset.NewForConfig(clientConfig)
  331. // make a separate client for events
  332. eventClientConfig := *clientConfig
  333. eventClientConfig.QPS = float32(s.EventRecordQPS)
  334. eventClientConfig.Burst = int(s.EventBurst)
  335. eventClient, err = clientset.NewForConfig(&eventClientConfig)
  336. }
  337. if err != nil {
  338. if s.RequireKubeConfig {
  339. return fmt.Errorf("invalid kubeconfig: %v", err)
  340. }
  341. if standaloneMode {
  342. glog.Warningf("No API client: %v", err)
  343. }
  344. }
  345. kubeDeps, err = UnsecuredKubeletDeps(s)
  346. if err != nil {
  347. return err
  348. }
  349. kubeDeps.Cloud = cloud
  350. kubeDeps.KubeClient = kubeClient
  351. kubeDeps.EventClient = eventClient
  352. }
  353. if kubeDeps.CAdvisorInterface == nil {
  354. kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime)
  355. if err != nil {
  356. return err
  357. }
  358. }
  359. if kubeDeps.ContainerManager == nil {
  360. if s.SystemCgroups != "" && s.CgroupRoot == "" {
  361. return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
  362. }
  363. kubeDeps.ContainerManager, err = cm.NewContainerManager(kubeDeps.Mounter, kubeDeps.CAdvisorInterface, cm.NodeConfig{
  364. RuntimeCgroupsName: s.RuntimeCgroups,
  365. SystemCgroupsName: s.SystemCgroups,
  366. KubeletCgroupsName: s.KubeletCgroups,
  367. ContainerRuntime: s.ContainerRuntime,
  368. CgroupsPerQOS: s.CgroupsPerQOS,
  369. CgroupRoot: s.CgroupRoot,
  370. ProtectKernelDefaults: s.ProtectKernelDefaults,
  371. })
  372. if err != nil {
  373. return err
  374. }
  375. }
  376. if err := checkPermissions(); err != nil {
  377. glog.Error(err)
  378. }
  379. utilruntime.ReallyCrash = s.ReallyCrashForTesting
  380. rand.Seed(time.Now().UTC().UnixNano())
  381. // TODO(vmarmol): Do this through container config.
  382. oomAdjuster := kubeDeps.OOMAdjuster
  383. if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
  384. glog.Warning(err)
  385. }
  386. if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
  387. return err
  388. }
  389. if s.HealthzPort > 0 {
  390. healthz.DefaultHealthz()
  391. go wait.Until(func() {
  392. err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
  393. if err != nil {
  394. glog.Errorf("Starting health server failed: %v", err)
  395. }
  396. }, 5*time.Second, wait.NeverStop)
  397. }
  398. if s.RunOnce {
  399. return nil
  400. }
  401. <-done
  402. return nil
  403. }
  404. // getNodeName returns the node name according to the cloud provider
  405. // if cloud provider is specified. Otherwise, returns the hostname of the node.
  406. func getNodeName(cloud cloudprovider.Interface, hostname string) (string, error) {
  407. if cloud == nil {
  408. return hostname, nil
  409. }
  410. instances, ok := cloud.Instances()
  411. if !ok {
  412. return "", fmt.Errorf("failed to get instances from cloud provider")
  413. }
  414. nodeName, err := instances.CurrentNodeName(hostname)
  415. if err != nil {
  416. return "", fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
  417. }
  418. glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
  419. return nodeName, nil
  420. }
  421. // InitializeTLS checks for a configured TLSCertFile and TLSPrivateKeyFile: if unspecified a new self-signed
  422. // certificate and key file are generated. Returns a configured server.TLSOptions object.
  423. func InitializeTLS(kc *componentconfig.KubeletConfiguration) (*server.TLSOptions, error) {
  424. if kc.TLSCertFile == "" && kc.TLSPrivateKeyFile == "" {
  425. kc.TLSCertFile = path.Join(kc.CertDirectory, "kubelet.crt")
  426. kc.TLSPrivateKeyFile = path.Join(kc.CertDirectory, "kubelet.key")
  427. if !crypto.FoundCertOrKey(kc.TLSCertFile, kc.TLSPrivateKeyFile) {
  428. if err := crypto.GenerateSelfSignedCert(nodeutil.GetHostname(kc.HostnameOverride), kc.TLSCertFile, kc.TLSPrivateKeyFile, nil, nil); err != nil {
  429. return nil, fmt.Errorf("unable to generate self signed cert: %v", err)
  430. }
  431. glog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
  432. }
  433. }
  434. tlsOptions := &server.TLSOptions{
  435. Config: &tls.Config{
  436. // Can't use SSLv3 because of POODLE and BEAST
  437. // Can't use TLSv1.0 because of POODLE and BEAST using CBC cipher
  438. // Can't use TLSv1.1 because of RC4 cipher usage
  439. MinVersion: tls.VersionTLS12,
  440. // Populate PeerCertificates in requests, but don't yet reject connections without certificates.
  441. ClientAuth: tls.RequestClientCert,
  442. },
  443. CertFile: kc.TLSCertFile,
  444. KeyFile: kc.TLSPrivateKeyFile,
  445. }
  446. return tlsOptions, nil
  447. }
  448. func kubeconfigClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
  449. if s.RequireKubeConfig {
  450. // Ignores the values of s.APIServerList
  451. return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  452. &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
  453. &clientcmd.ConfigOverrides{},
  454. ).ClientConfig()
  455. }
  456. return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
  457. &clientcmd.ClientConfigLoadingRules{ExplicitPath: s.KubeConfig.Value()},
  458. &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: s.APIServerList[0]}},
  459. ).ClientConfig()
  460. }
  461. // createClientConfig creates a client configuration from the command line
  462. // arguments. If --kubeconfig is explicitly set, it will be used. If it is
  463. // not set, we attempt to load the default kubeconfig file, and if we cannot,
  464. // we fall back to the default client with no auth - this fallback does not, in
  465. // and of itself, constitute an error.
  466. func createClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
  467. if s.RequireKubeConfig {
  468. return kubeconfigClientConfig(s)
  469. }
  470. // TODO: handle a new --standalone flag that bypasses kubeconfig loading and returns no error.
  471. // DEPRECATED: all subsequent code is deprecated
  472. if len(s.APIServerList) == 0 {
  473. return nil, fmt.Errorf("no api servers specified")
  474. }
  475. // TODO: adapt Kube client to support LB over several servers
  476. if len(s.APIServerList) > 1 {
  477. glog.Infof("Multiple api servers specified. Picking first one")
  478. }
  479. if s.KubeConfig.Provided() {
  480. return kubeconfigClientConfig(s)
  481. }
  482. // If KubeConfig was not provided, try to load the default file, then fall back
  483. // to a default auth config.
  484. clientConfig, err := kubeconfigClientConfig(s)
  485. if err != nil {
  486. glog.Warningf("Could not load kubeconfig file %s: %v. Using default client config instead.", s.KubeConfig, err)
  487. authInfo := &clientauth.Info{}
  488. authConfig, err := authInfo.MergeWithConfig(restclient.Config{})
  489. if err != nil {
  490. return nil, err
  491. }
  492. authConfig.Host = s.APIServerList[0]
  493. clientConfig = &authConfig
  494. }
  495. return clientConfig, nil
  496. }
  497. // CreateAPIServerClientConfig generates a client.Config from command line flags,
  498. // including api-server-list, via createClientConfig and then injects chaos into
  499. // the configuration via addChaosToClientConfig. This func is exported to support
  500. // integration with third party kubelet extensions (e.g. kubernetes-mesos).
  501. func CreateAPIServerClientConfig(s *options.KubeletServer) (*restclient.Config, error) {
  502. clientConfig, err := createClientConfig(s)
  503. if err != nil {
  504. return nil, err
  505. }
  506. clientConfig.ContentType = s.ContentType
  507. // Override kubeconfig qps/burst settings from flags
  508. clientConfig.QPS = float32(s.KubeAPIQPS)
  509. clientConfig.Burst = int(s.KubeAPIBurst)
  510. addChaosToClientConfig(s, clientConfig)
  511. return clientConfig, nil
  512. }
  513. // addChaosToClientConfig injects random errors into client connections if configured.
  514. func addChaosToClientConfig(s *options.KubeletServer, config *restclient.Config) {
  515. if s.ChaosChance != 0.0 {
  516. config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
  517. seed := chaosclient.NewSeed(1)
  518. // TODO: introduce a standard chaos package with more tunables - this is just a proof of concept
  519. // TODO: introduce random latency and stalls
  520. return chaosclient.NewChaosRoundTripper(rt, chaosclient.LogChaos, seed.P(s.ChaosChance, chaosclient.ErrSimulatedConnectionResetByPeer))
  521. }
  522. }
  523. }
  524. // RunKubelet is responsible for setting up and running a kubelet. It is used in three different applications:
  525. // 1 Integration tests
  526. // 2 Kubelet binary
  527. // 3 Standalone 'kubernetes' binary
  528. // Eventually, #2 will be replaced with instances of #3
  529. func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
  530. hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
  531. // Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
  532. nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
  533. if err != nil {
  534. return err
  535. }
  536. eventBroadcaster := record.NewBroadcaster()
  537. kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: nodeName})
  538. eventBroadcaster.StartLogging(glog.V(3).Infof)
  539. if kubeDeps.EventClient != nil {
  540. glog.V(4).Infof("Sending events to api server.")
  541. eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
  542. } else {
  543. glog.Warning("No api server defined - no events will be sent to API server.")
  544. }
  545. // TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
  546. // so that I could remove the associated fields from KubeletConfig. I would
  547. // prefer this to be done as part of an independent validation step on the
  548. // KubeletConfiguration. But as far as I can tell, we don't have an explicit
  549. // place for validation of the KubeletConfiguration yet.
  550. hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
  551. if err != nil {
  552. return err
  553. }
  554. hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
  555. if err != nil {
  556. return err
  557. }
  558. hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
  559. if err != nil {
  560. return err
  561. }
  562. privilegedSources := capabilities.PrivilegedSources{
  563. HostNetworkSources: hostNetworkSources,
  564. HostPIDSources: hostPIDSources,
  565. HostIPCSources: hostIPCSources,
  566. }
  567. capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)
  568. credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
  569. glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)
  570. builder := kubeDeps.Builder
  571. if builder == nil {
  572. builder = CreateAndInitKubelet
  573. }
  574. if kubeDeps.OSInterface == nil {
  575. kubeDeps.OSInterface = kubecontainer.RealOS{}
  576. }
  577. k, err := builder(kubeCfg, kubeDeps, standaloneMode)
  578. if err != nil {
  579. return fmt.Errorf("failed to create kubelet: %v", err)
  580. }
  581. // NewMainKubelet should have set up a pod source config if one didn't exist
  582. // when the builder was run. This is just a precaution.
  583. if kubeDeps.PodConfig == nil {
  584. return fmt.Errorf("failed to create kubelet, pod source config was nil!")
  585. }
  586. podCfg := kubeDeps.PodConfig
  587. rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))
  588. // TODO(dawnchen): remove this once we deprecated old debian containervm images.
  589. // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
  590. // The current chosen number is consistent with most of other os dist.
  591. const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
  592. const minKeys uint64 = 1000000
  593. key, err := ioutil.ReadFile(maxkeysPath)
  594. if err != nil {
  595. glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
  596. } else {
  597. fields := strings.Fields(string(key))
  598. nkey, _ := strconv.ParseUint(fields[0], 10, 64)
  599. if nkey < minKeys {
  600. glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
  601. err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
  602. if err != nil {
  603. glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
  604. }
  605. }
  606. }
  607. const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
  608. const minBytes uint64 = 25000000
  609. bytes, err := ioutil.ReadFile(maxbytesPath)
  610. if err != nil {
  611. glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
  612. } else {
  613. fields := strings.Fields(string(bytes))
  614. nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
  615. if nbyte < minBytes {
  616. glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
  617. err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
  618. if err != nil {
  619. glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
  620. }
  621. }
  622. }
  623. // process pods and exit.
  624. if runOnce {
  625. if _, err := k.RunOnce(podCfg.Updates()); err != nil {
  626. return fmt.Errorf("runonce failed: %v", err)
  627. }
  628. glog.Infof("Started kubelet %s as runonce", version.Get().String())
  629. } else {
  630. err := startKubelet(k, podCfg, kubeCfg, kubeDeps)
  631. if err != nil {
  632. return err
  633. }
  634. glog.Infof("Started kubelet %s", version.Get().String())
  635. }
  636. return nil
  637. }
  638. func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) error {
  639. // start the kubelet
  640. go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)
  641. // start the kubelet server
  642. if kubeCfg.EnableServer {
  643. go wait.Until(func() {
  644. k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers)
  645. }, 0, wait.NeverStop)
  646. }
  647. if kubeCfg.ReadOnlyPort > 0 {
  648. go wait.Until(func() {
  649. k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
  650. }, 0, wait.NeverStop)
  651. }
  652. return nil
  653. }
  654. func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
  655. // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
  656. // up into "per source" synchronizations
  657. k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
  658. if err != nil {
  659. return nil, err
  660. }
  661. k.BirthCry()
  662. k.StartGarbageCollection()
  663. return k, nil
  664. }