kubelet.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "path"
    "path/filepath"
    "sort"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/golang/glog"
    cadvisorapi "github.com/google/cadvisor/info/v1"
    "k8s.io/kubernetes/pkg/api"
    utilpod "k8s.io/kubernetes/pkg/api/pod"
    "k8s.io/kubernetes/pkg/api/resource"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/api/validation"
    "k8s.io/kubernetes/pkg/apis/componentconfig"
    kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/cloudprovider"
    "k8s.io/kubernetes/pkg/fieldpath"
    "k8s.io/kubernetes/pkg/fields"
    "k8s.io/kubernetes/pkg/kubelet/cadvisor"
    "k8s.io/kubernetes/pkg/kubelet/cm"
    "k8s.io/kubernetes/pkg/kubelet/config"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    "k8s.io/kubernetes/pkg/kubelet/dockertools"
    "k8s.io/kubernetes/pkg/kubelet/envvars"
    "k8s.io/kubernetes/pkg/kubelet/events"
    "k8s.io/kubernetes/pkg/kubelet/eviction"
    "k8s.io/kubernetes/pkg/kubelet/images"
    "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/pkg/kubelet/metrics"
    "k8s.io/kubernetes/pkg/kubelet/network"
    "k8s.io/kubernetes/pkg/kubelet/pleg"
    kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
    "k8s.io/kubernetes/pkg/kubelet/prober"
    proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
    "k8s.io/kubernetes/pkg/kubelet/remote"
    "k8s.io/kubernetes/pkg/kubelet/rkt"
    "k8s.io/kubernetes/pkg/kubelet/server"
    "k8s.io/kubernetes/pkg/kubelet/server/stats"
    "k8s.io/kubernetes/pkg/kubelet/status"
    "k8s.io/kubernetes/pkg/kubelet/sysctl"
    kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    "k8s.io/kubernetes/pkg/kubelet/util/format"
    "k8s.io/kubernetes/pkg/kubelet/util/ioutils"
    "k8s.io/kubernetes/pkg/kubelet/util/queue"
    "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
    "k8s.io/kubernetes/pkg/kubelet/volumemanager"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/security/apparmor"
    "k8s.io/kubernetes/pkg/types"
    "k8s.io/kubernetes/pkg/util/bandwidth"
    "k8s.io/kubernetes/pkg/util/clock"
    utilconfig "k8s.io/kubernetes/pkg/util/config"
    utildbus "k8s.io/kubernetes/pkg/util/dbus"
    utilexec "k8s.io/kubernetes/pkg/util/exec"
    "k8s.io/kubernetes/pkg/util/flowcontrol"
    "k8s.io/kubernetes/pkg/util/integer"
    kubeio "k8s.io/kubernetes/pkg/util/io"
    utilipt "k8s.io/kubernetes/pkg/util/iptables"
    "k8s.io/kubernetes/pkg/util/mount"
    nodeutil "k8s.io/kubernetes/pkg/util/node"
    "k8s.io/kubernetes/pkg/util/oom"
    "k8s.io/kubernetes/pkg/util/procfs"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/util/sets"
    "k8s.io/kubernetes/pkg/util/term"
    utilvalidation "k8s.io/kubernetes/pkg/util/validation"
    "k8s.io/kubernetes/pkg/util/validation/field"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/pkg/volume"
    "k8s.io/kubernetes/pkg/volume/util/volumehelper"
    "k8s.io/kubernetes/pkg/watch"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
    "k8s.io/kubernetes/third_party/forked/golang/expansion"
)
const (
    // Max amount of time to wait for the container runtime to come up.
    maxWaitForContainerRuntime = 5 * time.Minute

    // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
    nodeStatusUpdateRetry = 5

    // Location of container logs.
    containerLogsDir = "/var/log/containers"

    // max backoff period, exported for the e2e test
    MaxContainerBackOff = 300 * time.Second

    // Capacity of the channel for storing pods to kill. A small number should
    // suffice because a goroutine is dedicated to check the channel and does
    // not block on anything else.
    podKillingChannelCapacity = 50

    // Period for performing global cleanup tasks.
    housekeepingPeriod = time.Second * 2

    // Period for performing eviction monitoring.
    // TODO ensure this is in sync with internal cadvisor housekeeping.
    evictionMonitoringPeriod = time.Second * 10

    // The path in containers' filesystems where the hosts file is mounted.
    etcHostsPath = "/etc/hosts"

    // Capacity of the channel for receiving pod lifecycle events. This number
    // is a bit arbitrary and may be adjusted in the future.
    plegChannelCapacity = 1000

    // Generic PLEG relies on relisting for discovering container events.
    // A longer period means that kubelet will take longer to detect container
    // changes and to update pod status. On the other hand, a shorter period
    // will cause more frequent relisting (e.g., container runtime operations),
    // leading to higher cpu usage.
    // Note that even though we set the period to 1s, the relisting itself can
    // take more than 1s to finish if the container runtime responds slowly
    // and/or when there are many container changes in one cycle.
    plegRelistPeriod = time.Second * 1

    // backOffPeriod is the period to back off when pod syncing results in an
    // error. It is also used as the base period for exponential backoff of
    // container restarts and image pulls.
    backOffPeriod = time.Second * 10

    // Period for performing container garbage collection.
    ContainerGCPeriod = time.Minute

    // Period for performing image garbage collection.
    ImageGCPeriod = 5 * time.Minute

    // maxImagesInNodeStatus is the maximum number of images we report in the node status.
    maxImagesInNodeStatus = 50

    // Minimum number of dead containers to keep in a pod.
    minDeadContainerInPod = 1
)
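
// Note that backOffPeriod and MaxContainerBackOff together bound the
// exponential backoff used for failed container restarts and image pulls:
// delays start at backOffPeriod, roughly double on each consecutive failure,
// and are capped at MaxContainerBackOff (see the flowcontrol.NewBackOff calls
// in NewMainKubelet below).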

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
    HandlePodAdditions(pods []*api.Pod)
    HandlePodUpdates(pods []*api.Pod)
    HandlePodRemoves(pods []*api.Pod)
    HandlePodReconcile(pods []*api.Pod)
    HandlePodSyncs(pods []*api.Pod)
    HandlePodCleanups() error
}

// Option is a functional option type for Kubelet
type Option func(*Kubelet)
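
// An Option is applied to the Kubelet after construction, which lets embedders
// customize behavior (via KubeletDeps.Options) without widening the
// NewMainKubelet signature. As a hedged sketch only (this helper does not
// exist in this file), such an option might look like:
//
//    func withResyncInterval(d time.Duration) Option {
//        return func(k *Kubelet) { k.resyncInterval = d }
//    }
//
// NewMainKubelet applies every supplied Option in order near the end of
// construction (see the "apply functional Options" loop below).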

// bootstrapping interface for kubelet, targets the initialization protocol
type KubeletBootstrap interface {
    GetConfiguration() componentconfig.KubeletConfiguration
    BirthCry()
    StartGarbageCollection()
    ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
    ListenAndServeReadOnly(address net.IP, port uint)
    Run(<-chan kubetypes.PodUpdate)
    RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// create and initialize a Kubelet instance
type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error)

// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
// these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
type KubeletDeps struct {
    // TODO(mtaufen): KubeletBuilder:
    // Mesos currently uses this as a hook to let them make their own call to
    // let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with
    // their own KubeletBootstrap. It's a useful hook. I need to think about what
    // a nice home for it would be. There seems to be a trend, between this and
    // the Options fields below, of providing hooks where you can add extra functionality
    // to the Kubelet for your solution. Maybe we should centralize these sorts of things?
    Builder KubeletBuilder

    // TODO(mtaufen): ContainerRuntimeOptions and Options:
    // Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
    // seem like a difficult path to trace when it's time to debug something.
    // I'm leaving these fields here for now, but there is likely an easier-to-follow
    // way to support their intended use cases. E.g. ContainerRuntimeOptions
    // is used by Mesos to set an environment variable in containers which has
    // some connection to their container GC. It seems that Mesos intends to use
    // Options to add additional node conditions that are updated as part of the
    // Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
    // We should think about providing more explicit ways of doing these things.
    ContainerRuntimeOptions []kubecontainer.Option
    Options []Option

    // Injected Dependencies
    Auth server.AuthInterface
    CAdvisorInterface cadvisor.Interface
    Cloud cloudprovider.Interface
    ContainerManager cm.ContainerManager
    DockerClient dockertools.DockerInterface
    EventClient *clientset.Clientset
    KubeClient *clientset.Clientset
    Mounter mount.Interface
    NetworkPlugins []network.NetworkPlugin
    OOMAdjuster *oom.OOMAdjuster
    OSInterface kubecontainer.OSInterface
    PodConfig *config.PodConfig
    Recorder record.EventRecorder
    Writer kubeio.Writer
    VolumePlugins []volume.VolumePlugin
    TLSOptions *server.TLSOptions
}

func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName string) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if kubeCfg.ManifestURLHeader != "" {
        pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
        if len(pieces) != 2 {
            return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
        }
        manifestURLHeader.Set(pieces[0], pieces[1])
    }

    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.PodManifestPath != "" {
        glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
        config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.ManifestURL != "" {
        glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
        config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }

    if kubeDeps.KubeClient != nil {
        glog.Infof("Watching apiserver")
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
    }
    return cfg, nil
}
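
// As an illustration only (the values below are hypothetical, not taken from
// this file), a configuration that exercises all three pod sources above
// might set:
//
//    kubeCfg.PodManifestPath = "/etc/kubernetes/manifests"
//    kubeCfg.ManifestURL = "http://config.example.com/pods"
//    kubeCfg.ManifestURLHeader = "Accept:application/json"
//
// Note that ManifestURLHeader is split on ':' into exactly one key/value
// pair, so a value containing a second colon is rejected with an error.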

// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
    if kubeCfg.RootDirectory == "" {
        return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
    }
    if kubeCfg.SyncFrequency.Duration <= 0 {
        return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
    }
    if kubeCfg.MakeIPTablesUtilChains {
        if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
            return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
        }
        if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
            return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
        }
        if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
            return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
        }
    }

    hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
    // Query the cloud provider for our node name, default to hostname
    nodeName := hostname
    if kubeDeps.Cloud != nil {
        var err error
        instances, ok := kubeDeps.Cloud.Instances()
        if !ok {
            return nil, fmt.Errorf("failed to get instances from cloud provider")
        }
        nodeName, err = instances.CurrentNodeName(hostname)
        if err != nil {
            return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
        }
        glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
    }

    // TODO: KubeletDeps.KubeClient should be a client interface, but client interface misses certain methods
    // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
    // a nil pointer to it when what we really want is a nil interface.
    var kubeClient clientset.Interface
    if kubeDeps.KubeClient != nil {
        kubeClient = kubeDeps.KubeClient
        // TODO: remove this when we've refactored kubelet to only use clientset.
    }

    if kubeDeps.PodConfig == nil {
        var err error
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
        if err != nil {
            return nil, err
        }
    }

    containerGCPolicy := kubecontainer.ContainerGCPolicy{
        MinAge: kubeCfg.MinimumGCAge.Duration,
        MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
        MaxContainers: int(kubeCfg.MaxContainerCount),
    }

    daemonEndpoints := &api.NodeDaemonEndpoints{
        KubeletEndpoint: api.DaemonEndpoint{Port: kubeCfg.Port},
    }

    imageGCPolicy := images.ImageGCPolicy{
        MinAge: kubeCfg.ImageMinimumGCAge.Duration,
        HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
        LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
    }

    diskSpacePolicy := DiskSpacePolicy{
        DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
        RootFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
    }

    thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
    if err != nil {
        return nil, err
    }
    evictionConfig := eviction.Config{
        PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
        MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
        Thresholds: thresholds,
    }

    reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
    if err != nil {
        return nil, err
    }

    var dockerExecHandler dockertools.ExecHandler
    switch kubeCfg.DockerExecHandlerName {
    case "native":
        dockerExecHandler = &dockertools.NativeExecHandler{}
    case "nsenter":
        dockerExecHandler = &dockertools.NsenterExecHandler{}
    default:
        glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
        dockerExecHandler = &dockertools.NativeExecHandler{}
    }

    serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
        // than an interface. There is no way to construct a list+watcher using resource name.
        listWatch := &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return kubeClient.Core().Services(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return kubeClient.Core().Services(api.NamespaceAll).Watch(options)
            },
        }
        cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
    }
    serviceLister := &cache.StoreToServiceLister{Store: serviceStore}

    nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
        // than an interface. There is no way to construct a list+watcher using resource name.
        fieldSelector := fields.Set{api.ObjectNameField: nodeName}.AsSelector()
        listWatch := &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                options.FieldSelector = fieldSelector
                return kubeClient.Core().Nodes().List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                options.FieldSelector = fieldSelector
                return kubeClient.Core().Nodes().Watch(options)
            },
        }
        cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
    }
    nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
    nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}

    // TODO: get the real node object of ourself,
    // and use the real node name and UID.
    // TODO: what is namespace for node?
    nodeRef := &api.ObjectReference{
        Kind: "Node",
        Name: nodeName,
        UID: types.UID(nodeName),
        Namespace: "",
    }

    diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
    }
    containerRefManager := kubecontainer.NewRefManager()

    oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)

    // TODO(mtaufen): remove when internal cbr0 implementation gets removed in favor
    // of the kubenet network plugin
    var myConfigureCBR0 bool = kubeCfg.ConfigureCBR0
    var myFlannelExperimentalOverlay bool = kubeCfg.ExperimentalFlannelOverlay
    if kubeCfg.NetworkPluginName == "kubenet" {
        myConfigureCBR0 = false
        myFlannelExperimentalOverlay = false
    }

    klet := &Kubelet{
        hostname: hostname,
        nodeName: nodeName,
        dockerClient: kubeDeps.DockerClient,
        kubeClient: kubeClient,
        rootDirectory: kubeCfg.RootDirectory,
        resyncInterval: kubeCfg.SyncFrequency.Duration,
        containerRefManager: containerRefManager,
        httpClient: &http.Client{},
        sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
        registerNode: kubeCfg.RegisterNode,
        registerSchedulable: kubeCfg.RegisterSchedulable,
        standaloneMode: standaloneMode,
        clusterDomain: kubeCfg.ClusterDomain,
        clusterDNS: net.ParseIP(kubeCfg.ClusterDNS),
        serviceLister: serviceLister,
        nodeLister: nodeLister,
        nodeInfo: nodeInfo,
        masterServiceNamespace: kubeCfg.MasterServiceNamespace,
        streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
        recorder: kubeDeps.Recorder,
        cadvisor: kubeDeps.CAdvisorInterface,
        diskSpaceManager: diskSpaceManager,
        cloud: kubeDeps.Cloud,
        autoDetectCloudProvider: (kubeExternal.AutoDetectCloudProvider == kubeCfg.CloudProvider),
        nodeRef: nodeRef,
        nodeLabels: kubeCfg.NodeLabels,
        nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
        os: kubeDeps.OSInterface,
        oomWatcher: oomWatcher,
        cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
        cgroupRoot: kubeCfg.CgroupRoot,
        mounter: kubeDeps.Mounter,
        writer: kubeDeps.Writer,
        configureCBR0: myConfigureCBR0,
        nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
        reconcileCIDR: kubeCfg.ReconcileCIDR,
        maxPods: int(kubeCfg.MaxPods),
        podsPerCore: int(kubeCfg.PodsPerCore),
        nvidiaGPUs: int(kubeCfg.NvidiaGPUs),
        syncLoopMonitor: atomic.Value{},
        resolverConfig: kubeCfg.ResolverConfig,
        cpuCFSQuota: kubeCfg.CPUCFSQuota,
        daemonEndpoints: daemonEndpoints,
        containerManager: kubeDeps.ContainerManager,
        flannelExperimentalOverlay: myFlannelExperimentalOverlay,
        flannelHelper: nil,
        nodeIP: net.ParseIP(kubeCfg.NodeIP),
        clock: clock.RealClock{},
        outOfDiskTransitionFrequency: kubeCfg.OutOfDiskTransitionFrequency.Duration,
        reservation: *reservation,
        enableCustomMetrics: kubeCfg.EnableCustomMetrics,
        babysitDaemons: kubeCfg.BabysitDaemons,
        enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
        iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
        makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
        iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
        iptablesDropBit: int(kubeCfg.IPTablesDropBit),
    }

    if klet.flannelExperimentalOverlay {
        klet.flannelHelper = NewFlannelHelper()
        glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
    }
    if klet.nodeIP != nil {
        if err := klet.validateNodeIP(); err != nil {
            return nil, err
        }
        glog.Infof("Using node IP: %q", klet.nodeIP.String())
    }

    if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.ConfigureCBR0, kubeCfg.NetworkPluginName); err != nil {
        // This is a non-recoverable error. Returning it up the callstack will just
        // lead to retries of the same failure, so just fail hard.
        glog.Fatalf("Invalid hairpin mode: %v", err)
    } else {
        klet.hairpinMode = mode
    }
    glog.Infof("Hairpin mode set to %q", klet.hairpinMode)

    if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
        return nil, err
    } else {
        klet.networkPlugin = plug
    }

    machineInfo, err := klet.GetCachedMachineInfo()
    if err != nil {
        return nil, err
    }

    procFs := procfs.NewProcFS()
    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    klet.livenessManager = proberesults.NewManager()

    klet.podCache = kubecontainer.NewCache()
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))

    if kubeCfg.RemoteRuntimeEndpoint != "" {
        kubeCfg.ContainerRuntime = "remote"
        // kubeCfg.RemoteImageEndpoint is the same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
        if kubeCfg.RemoteImageEndpoint == "" {
            kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
        }
    }

    // Initialize the runtime.
    switch kubeCfg.ContainerRuntime {
    case "docker":
        // Only supported one for now, continue.
        klet.containerRuntime = dockertools.NewDockerManager(
            kubeDeps.DockerClient,
            kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
            klet.livenessManager,
            containerRefManager,
            klet.podManager,
            machineInfo,
            kubeCfg.PodInfraContainerImage,
            float32(kubeCfg.RegistryPullQPS),
            int(kubeCfg.RegistryBurst),
            containerLogsDir,
            kubeDeps.OSInterface,
            klet.networkPlugin,
            klet,
            klet.httpClient,
            dockerExecHandler,
            kubeDeps.OOMAdjuster,
            procFs,
            klet.cpuCFSQuota,
            imageBackOff,
            kubeCfg.SerializeImagePulls,
            kubeCfg.EnableCustomMetrics,
            // If using "kubenet", the Kubernetes network plugin that wraps
            // CNI's bridge plugin, it knows how to set the hairpin veth flag
            // so we tell the container runtime to back away from setting it.
            // If the kubelet is started with any other plugin we can't be
            // sure it handles the hairpin case so we instruct the docker
            // runtime to set the flag instead.
            klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
            kubeCfg.SeccompProfileRoot,
            kubeDeps.ContainerRuntimeOptions...,
        )
    case "rkt":
        // TODO: Include hairpin mode settings in rkt?
        conf := &rkt.Config{
            Path: kubeCfg.RktPath,
            Stage1Image: kubeCfg.RktStage1Image,
            InsecureOptions: "image,ondisk",
        }
        rktRuntime, err := rkt.New(
            kubeCfg.RktAPIEndpoint,
            conf,
            klet,
            kubeDeps.Recorder,
            containerRefManager,
            klet.podManager,
            klet.livenessManager,
            klet.httpClient,
            klet.networkPlugin,
            klet.hairpinMode == componentconfig.HairpinVeth,
            utilexec.New(),
            kubecontainer.RealOS{},
            imageBackOff,
            kubeCfg.SerializeImagePulls,
            kubeCfg.RuntimeRequestTimeout.Duration,
        )
        if err != nil {
            return nil, err
        }
        klet.containerRuntime = rktRuntime
    case "remote":
        remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
        if err != nil {
            return nil, err
        }
        remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
        if err != nil {
            return nil, err
        }
        klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
            kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
            klet.livenessManager,
            containerRefManager,
            kubeDeps.OSInterface,
            klet.networkPlugin,
            klet,
            klet.httpClient,
            imageBackOff,
            kubeCfg.SerializeImagePulls,
            klet.cpuCFSQuota,
            remoteRuntimeService,
            remoteImageService,
        )
        if err != nil {
            return nil, err
        }
    default:
        return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
    }

    // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
    klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)

    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    klet.updatePodCIDR(kubeCfg.PodCIDR)

    // setup containerGC
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    // setup imageManager
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    klet.runner = klet.containerRuntime
    klet.statusManager = status.NewManager(kubeClient, klet.podManager)

    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)

    klet.volumePluginMgr, err = NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
    if err != nil {
        return nil, err
    }

    // setup volumeManager
    klet.volumeManager, err = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder)

    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
    klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
    klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

    // setup eviction manager
    evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
    }
    klet.evictionManager = evictionManager
    klet.AddPodAdmitHandler(evictionAdmitHandler)

    // add sysctl admission
    runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    safeWhitelist, err := sysctl.NewWhitelist(sysctl.SafeSysctlWhitelist(), api.SysctlsPodAnnotationKey)
    if err != nil {
        return nil, err
    }
    // Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec.
    // Hence, we concatenate those two lists.
    safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
    unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, api.UnsafeSysctlsPodAnnotationKey)
    if err != nil {
        return nil, err
    }
    klet.AddPodAdmitHandler(runtimeSupport)
    klet.AddPodAdmitHandler(safeWhitelist)
    klet.AddPodAdmitHandler(unsafeWhitelist)

    // enable active deadline handler
    activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
    if err != nil {
        return nil, err
    }
    klet.AddPodSyncLoopHandler(activeDeadlineHandler)
    klet.AddPodSyncHandler(activeDeadlineHandler)

    klet.appArmorValidator = apparmor.NewValidator(kubeCfg.ContainerRuntime)
    klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))

    // apply functional Options
    for _, opt := range kubeDeps.Options {
        opt(klet)
    }

    // Finally, put the most recent version of the config on the Kubelet, so
    // people can see how it was configured.
    klet.kubeletConfiguration = *kubeCfg
    return klet, nil
}
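
// A minimal caller sketch, assumed rather than taken from this file: the
// kubelet command builds a KubeletConfiguration and a KubeletDeps, hands both
// to NewMainKubelet, and then starts the result in roughly this order:
//
//    klet, err := NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
//    if err != nil {
//        glog.Fatalf("failed to create kubelet: %v", err)
//    }
//    klet.BirthCry()
//    klet.StartGarbageCollection()
//    klet.Run(kubeDeps.PodConfig.Updates())
//
// The exact wiring lives outside this file; the calls above only illustrate
// the intended order: construct, announce, start garbage collection, then
// enter the sync loop via Run.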

type serviceLister interface {
    List() (api.ServiceList, error)
}

type nodeLister interface {
    List() (machines api.NodeList, err error)
}

// Kubelet is the main kubelet implementation.
type Kubelet struct {
    kubeletConfiguration componentconfig.KubeletConfiguration

    hostname string
    nodeName string
    dockerClient dockertools.DockerInterface
    runtimeCache kubecontainer.RuntimeCache
    kubeClient clientset.Interface
    iptClient utilipt.Interface
    rootDirectory string

    // podWorkers handle syncing Pods in response to events.
    podWorkers PodWorkers

    // resyncInterval is the interval between periodic full reconciliations of
    // pods on this node.
    resyncInterval time.Duration

    // sourcesReady records the sources seen by the kubelet, it is thread-safe.
    sourcesReady config.SourcesReady

    // podManager is a facade that abstracts away the various sources of pods
    // this Kubelet services.
    podManager kubepod.Manager

    // Needed to observe and respond to situations that could impact node stability
    evictionManager eviction.Manager

    // Needed to report events for containers belonging to deleted/modified pods.
    // Tracks references for reporting events
    containerRefManager *kubecontainer.RefManager

    // Optional, defaults to /logs/ from /var/log
    logServer http.Handler
    // Optional, defaults to simple Docker implementation
    runner kubecontainer.ContainerCommandRunner
    // Optional, client for http requests, defaults to empty client
    httpClient kubetypes.HttpGetter

    // cAdvisor used for container information.
    cadvisor cadvisor.Interface

    // Set to true to have the node register itself with the apiserver.
    registerNode bool
    // Set to true to have the node register itself as schedulable.
    registerSchedulable bool
    // for internal bookkeeping; access only from within registerWithApiserver
    registrationCompleted bool

    // Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
    standaloneMode bool

    // If non-empty, use this for container DNS search.
    clusterDomain string

    // If non-nil, use this for container DNS server.
    clusterDNS net.IP

    // masterServiceNamespace is the namespace that the master service is exposed in.
    masterServiceNamespace string
    // serviceLister knows how to list services
    serviceLister serviceLister
    // nodeLister knows how to list nodes
    nodeLister nodeLister
    // nodeInfo knows how to get information about the node for this kubelet.
    nodeInfo predicates.NodeInfo

    // a list of node labels to register
    nodeLabels map[string]string

    // Last timestamp when runtime responded on ping.
    // Mutex is used to protect this value.
    runtimeState *runtimeState

    // Volume plugins.
    volumePluginMgr *volume.VolumePluginMgr

    // Network plugin.
    networkPlugin network.NetworkPlugin

    // Handles container probing.
    probeManager prober.Manager
    // Manages container health check results.
    livenessManager proberesults.Manager

    // How long to keep idle streaming command execution/port forwarding
    // connections open before terminating them
    streamingConnectionIdleTimeout time.Duration

    // The EventRecorder to use
    recorder record.EventRecorder

    // Policy for handling garbage collection of dead containers.
    containerGC kubecontainer.ContainerGC

    // Manager for image garbage collection.
    imageManager images.ImageGCManager

    // Diskspace manager.
    diskSpaceManager diskSpaceManager

    // Cached MachineInfo returned by cadvisor.
    machineInfo *cadvisorapi.MachineInfo

    // Syncs pods statuses with apiserver; also used as a cache of statuses.
    statusManager status.Manager

    // VolumeManager runs a set of asynchronous loops that figure out which
    // volumes need to be attached/mounted/unmounted/detached based on the pods
    // scheduled on this node and makes it so.
    volumeManager volumemanager.VolumeManager

    // Cloud provider interface.
    cloud cloudprovider.Interface
    autoDetectCloudProvider bool

    // Reference to this node.
    nodeRef *api.ObjectReference

    // Container runtime.
    containerRuntime kubecontainer.Runtime

    // reasonCache caches the failure reason of the last creation of all containers, which is
    // used for generating ContainerStatus.
    reasonCache *ReasonCache

    // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
    // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
    // in nodecontroller. There are several constraints:
    // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
    //    N means number of retries allowed for kubelet to post node status. It is pointless
    //    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
    //    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
    //    The constant must be less than podEvictionTimeout.
    // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
    //    status. Kubelet may fail to update node status reliably if the value is too small,
    //    as it takes time to gather all necessary node information.
    nodeStatusUpdateFrequency time.Duration

    // Generates pod events.
    pleg pleg.PodLifecycleEventGenerator

    // Store kubecontainer.PodStatus for all pods.
    podCache kubecontainer.Cache

    // os is a facade for various syscalls that need to be mocked during testing.
    os kubecontainer.OSInterface

    // Watcher of out of memory events.
    oomWatcher OOMWatcher

    // Monitor resource usage
    resourceAnalyzer stats.ResourceAnalyzer

    // Whether or not we should have the QOS cgroup hierarchy for resource management
    cgroupsPerQOS bool

    // If non-empty, pass this to the container runtime as the root cgroup.
    cgroupRoot string

    // Mounter to use for volumes.
    mounter mount.Interface

    // Writer interface to use for volumes.
    writer kubeio.Writer

    // Manager of non-Runtime containers.
    containerManager cm.ContainerManager
    nodeConfig cm.NodeConfig

    // Whether or not kubelet should take responsibility for keeping cbr0 in
    // the correct state.
    configureCBR0 bool
    reconcileCIDR bool

    // Traffic to IPs outside this range will use IP masquerade.
    nonMasqueradeCIDR string

    // Maximum Number of Pods which can be run by this Kubelet
    maxPods int

    // Number of NVIDIA GPUs on this node
    nvidiaGPUs int

    // Monitor Kubelet's sync loop
    syncLoopMonitor atomic.Value

    // Container restart Backoff
    backOff *flowcontrol.Backoff

    // Channel for sending pods to kill.
    podKillingCh chan *kubecontainer.PodPair

    // The configuration file used as the base to generate the container's
    // DNS resolver configuration file. This can be used in conjunction with
    // clusterDomain and clusterDNS.
    resolverConfig string

    // Optionally shape the bandwidth of a pod
    // TODO: remove when kubenet plugin is ready
    shaper bandwidth.BandwidthShaper

    // True if container cpu limits should be enforced via cgroup CFS quota
    cpuCFSQuota bool

    // Information about the ports which are opened by daemons on Node running this Kubelet server.
    daemonEndpoints *api.NodeDaemonEndpoints

    // A queue used to trigger pod workers.
    workQueue queue.WorkQueue

    // oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
    oneTimeInitializer sync.Once

    // flannelExperimentalOverlay determines whether the experimental flannel
    // network overlay is active.
    flannelExperimentalOverlay bool
    // TODO: FlannelHelper doesn't store any state, we can instantiate it
    // on the fly if we're confident the dbus connections it opens don't
    // put the system under duress.
    flannelHelper *FlannelHelper

    // If non-nil, use this IP address for the node
    nodeIP net.IP

    // clock is an interface that provides time related functionality in a way that makes it
    // easy to test the code.
    clock clock.Clock

    // outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually
    // not out of disk before it can transition the node condition status from out-of-disk to
    // not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
    // getting rescheduled onto the node.
    outOfDiskTransitionFrequency time.Duration

    // reservation specifies resources which are reserved for non-pod usage, including kubernetes and
    // non-kubernetes system processes.
    reservation kubetypes.Reservation

    // support gathering custom metrics.
    enableCustomMetrics bool

    // How the Kubelet should setup hairpin NAT. Can take the values: "promiscuous-bridge"
    // (make cbr0 promiscuous), "hairpin-veth" (set the hairpin flag on veth interfaces)
    // or "none" (do nothing).
    hairpinMode componentconfig.HairpinMode

    // The node has a babysitter process monitoring docker and kubelet
    babysitDaemons bool

    // handlers called during the tryUpdateNodeStatus cycle
    setNodeStatusFuncs []func(*api.Node) error

    // TODO: think about moving this to be centralized in PodWorkers in follow-on.
    // the list of handlers to call during pod admission.
    lifecycle.PodAdmitHandlers

    // the list of handlers to call during pod sync loop.
    lifecycle.PodSyncLoopHandlers

    // the list of handlers to call during pod sync.
    lifecycle.PodSyncHandlers

    // the number of allowed pods per core
    podsPerCore int

    // enableControllerAttachDetach indicates the Attach/Detach controller
    // should manage attachment/detachment of volumes scheduled to this node,
    // and disable kubelet from executing any attach/detach operations
    enableControllerAttachDetach bool

    // trigger deleting containers in a pod
    containerDeletor *podContainerDeletor

    // config iptables util rules
    makeIPTablesUtilChains bool

    // The bit of the fwmark space to mark packets for SNAT.
    iptablesMasqueradeBit int

    // The bit of the fwmark space to mark packets for dropping.
    iptablesDropBit int

    // The AppArmor validator for checking whether AppArmor is supported.
    appArmorValidator apparmor.Validator
}

// setupDataDirs creates:
// 1. the root directory
// 2. the pods directory
// 3. the plugins directory
func (kl *Kubelet) setupDataDirs() error {
    kl.rootDirectory = path.Clean(kl.rootDirectory)
    if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
        return fmt.Errorf("error creating root directory: %v", err)
    }
    if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
        return fmt.Errorf("error creating pods directory: %v", err)
    }
    if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
        return fmt.Errorf("error creating plugins directory: %v", err)
    }
    return nil
}

// Get a list of pods that have data directories.
func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
    podInfos, err := ioutil.ReadDir(kl.getPodsDir())
    if err != nil {
        return nil, err
    }
    pods := []types.UID{}
    for i := range podInfos {
        if podInfos[i].IsDir() {
            pods = append(pods, types.UID(podInfos[i].Name()))
        }
    }
    return pods, nil
}

// Starts garbage collection threads.
func (kl *Kubelet) StartGarbageCollection() {
    go wait.Until(func() {
        if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
            glog.Errorf("Container garbage collection failed: %v", err)
        }
    }, ContainerGCPeriod, wait.NeverStop)

    go wait.Until(func() {
        if err := kl.imageManager.GarbageCollect(); err != nil {
            glog.Errorf("Image garbage collection failed: %v", err)
        }
    }, ImageGCPeriod, wait.NeverStop)
}
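
// wait.Until runs the supplied function and then repeats it every period
// until the stop channel is closed; with wait.NeverStop the two collection
// loops above therefore run for the lifetime of the process. A minimal,
// standalone sketch of the same pattern (the logging body is made up for
// illustration):
//
//    stopCh := make(chan struct{})
//    go wait.Until(func() { glog.V(4).Info("periodic housekeeping tick") }, time.Minute, stopCh)
//    // ... later, close(stopCh) ends the loop.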

// initializeModules will initialize internal modules that do not require the container runtime to be up.
// Note that the modules here must not depend on modules that are not initialized here.
func (kl *Kubelet) initializeModules() error {
    // Step 1: Prometheus metrics.
    metrics.Register(kl.runtimeCache)

    // Step 2: Setup filesystem directories.
    if err := kl.setupDataDirs(); err != nil {
        return err
    }

    // Step 3: If the container logs directory does not exist, create it.
    if _, err := os.Stat(containerLogsDir); err != nil {
        if err := kl.os.MkdirAll(containerLogsDir, 0755); err != nil {
            glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
        }
    }

    // Step 4: Start the image manager.
    if err := kl.imageManager.Start(); err != nil {
        return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
    }

    // Step 5: Start container manager.
    node, err := kl.getNodeAnyWay()
    if err != nil {
        glog.Errorf("Cannot get Node info: %v", err)
        return fmt.Errorf("Kubelet failed to get node info.")
    }
    if err := kl.containerManager.Start(node); err != nil {
        return fmt.Errorf("Failed to start ContainerManager %v", err)
    }

    // Step 6: Start out of memory watcher.
    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        return fmt.Errorf("Failed to start OOM watcher %v", err)
    }

    // Step 7: Start resource analyzer.
    kl.resourceAnalyzer.Start()

    return nil
}
  973. // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
  974. func (kl *Kubelet) initializeRuntimeDependentModules() {
  975. if err := kl.cadvisor.Start(); err != nil {
  976. // Fail kubelet and rely on the babysitter to retry starting kubelet.
  977. // TODO(random-liu): Add backoff logic in the babysitter
  978. glog.Fatalf("Failed to start cAdvisor %v", err)
  979. }
  980. // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
  981. if err := kl.evictionManager.Start(kl, kl.getActivePods, evictionMonitoringPeriod); err != nil {
  982. kl.runtimeState.setInternalError(fmt.Errorf("failed to start eviction manager %v", err))
  983. }
  984. }
  985. // Run starts the kubelet reacting to config updates
  986. func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
  987. if kl.logServer == nil {
  988. kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
  989. }
  990. if kl.kubeClient == nil {
  991. glog.Warning("No api server defined - no node status update will be sent.")
  992. }
  993. if err := kl.initializeModules(); err != nil {
  994. kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.KubeletSetupFailed, err.Error())
  995. glog.Error(err)
  996. kl.runtimeState.setInitError(err)
  997. }
  998. // Start volume manager
  999. go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
  1000. if kl.kubeClient != nil {
  1001. // Start syncing node status immediately, this may set up things the runtime needs to run.
  1002. go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
  1003. }
  1004. go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
  1005. go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
  1006. // Start loop to sync iptables util rules
  1007. if kl.makeIPTablesUtilChains {
  1008. go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
  1009. }
  1010. // Start a goroutine responsible for killing pods (that are not properly
  1011. // handled by pod workers).
  1012. go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
  1013. // Start component sync loops.
  1014. kl.statusManager.Start()
  1015. kl.probeManager.Start()
  1016. // Start the pod lifecycle event generator.
  1017. kl.pleg.Start()
  1018. kl.syncLoop(updates, kl)
  1019. }
  1020. // getActivePods returns non-terminal pods
  1021. func (kl *Kubelet) getActivePods() []*api.Pod {
  1022. allPods := kl.podManager.GetPods()
  1023. activePods := kl.filterOutTerminatedPods(allPods)
  1024. return activePods
  1025. }
  1026. // makeMounts determines the mount points for the given container.
  1027. func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
1028. // Kubernetes only mounts /etc/hosts if:
1029. // - the container does not use hostNetwork, and
1030. // - the container is not an infrastructure (pause) container, and
1031. // - the container is not already mounting /etc/hosts.
  1032. // When the pause container is being created, its IP is still unknown. Hence, PodIP will not have been set.
  1033. mountEtcHostsFile := (pod.Spec.SecurityContext == nil || !pod.Spec.SecurityContext.HostNetwork) && len(podIP) > 0
  1034. glog.V(3).Infof("container: %v/%v/%v podIP: %q creating hosts mount: %v", pod.Namespace, pod.Name, container.Name, podIP, mountEtcHostsFile)
  1035. mounts := []kubecontainer.Mount{}
  1036. for _, mount := range container.VolumeMounts {
  1037. mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
  1038. vol, ok := podVolumes[mount.Name]
  1039. if !ok {
  1040. glog.Warningf("Mount cannot be satisfied for container %q, because the volume is missing: %q", container.Name, mount)
  1041. continue
  1042. }
  1043. relabelVolume := false
  1044. // If the volume supports SELinux and it has not been
  1045. // relabeled already and it is not a read-only volume,
  1046. // relabel it and mark it as labeled
  1047. if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux && !vol.SELinuxLabeled {
  1048. vol.SELinuxLabeled = true
  1049. relabelVolume = true
  1050. }
  1051. hostPath, err := volume.GetPath(vol.Mounter)
  1052. if err != nil {
  1053. return nil, err
  1054. }
  1055. if mount.SubPath != "" {
  1056. hostPath = filepath.Join(hostPath, mount.SubPath)
  1057. }
  1058. mounts = append(mounts, kubecontainer.Mount{
  1059. Name: mount.Name,
  1060. ContainerPath: mount.MountPath,
  1061. HostPath: hostPath,
  1062. ReadOnly: mount.ReadOnly,
  1063. SELinuxRelabel: relabelVolume,
  1064. })
  1065. }
  1066. if mountEtcHostsFile {
  1067. hostsMount, err := makeHostsMount(podDir, podIP, hostName, hostDomain)
  1068. if err != nil {
  1069. return nil, err
  1070. }
  1071. mounts = append(mounts, *hostsMount)
  1072. }
  1073. return mounts, nil
  1074. }
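// As an illustrative sketch of the SubPath handling in makeMounts: a volume mount
// such as
//
//   volumeMounts:
//   - name: config
//     mountPath: /etc/app
//     subPath: prod
//
// keeps ContainerPath at /etc/app but resolves HostPath to
// filepath.Join(<volume path on the host>, "prod").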
  1075. // makeHostsMount makes the mountpoint for the hosts file that the containers
  1076. // in a pod are injected with.
  1077. func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontainer.Mount, error) {
  1078. hostsFilePath := path.Join(podDir, "etc-hosts")
  1079. if err := ensureHostsFile(hostsFilePath, podIP, hostName, hostDomainName); err != nil {
  1080. return nil, err
  1081. }
  1082. return &kubecontainer.Mount{
  1083. Name: "k8s-managed-etc-hosts",
  1084. ContainerPath: etcHostsPath,
  1085. HostPath: hostsFilePath,
  1086. ReadOnly: false,
  1087. }, nil
  1088. }
  1089. // ensureHostsFile ensures that the given host file has an up-to-date ip, host
  1090. // name, and domain name.
  1091. func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error {
1092. if _, err := os.Stat(fileName); err == nil {
1093. glog.V(4).Infof("kubernetes-managed etc-hosts file exists. Will not be recreated: %q", fileName)
  1094. return nil
  1095. }
  1096. var buffer bytes.Buffer
  1097. buffer.WriteString("# Kubernetes-managed hosts file.\n")
  1098. buffer.WriteString("127.0.0.1\tlocalhost\n") // ipv4 localhost
  1099. buffer.WriteString("::1\tlocalhost ip6-localhost ip6-loopback\n") // ipv6 localhost
  1100. buffer.WriteString("fe00::0\tip6-localnet\n")
  1101. buffer.WriteString("fe00::0\tip6-mcastprefix\n")
  1102. buffer.WriteString("fe00::1\tip6-allnodes\n")
  1103. buffer.WriteString("fe00::2\tip6-allrouters\n")
  1104. if len(hostDomainName) > 0 {
  1105. buffer.WriteString(fmt.Sprintf("%s\t%s.%s\t%s\n", hostIP, hostName, hostDomainName, hostName))
  1106. } else {
  1107. buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostIP, hostName))
  1108. }
  1109. return ioutil.WriteFile(fileName, buffer.Bytes(), 0644)
  1110. }
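// A hosts file written by ensureHostsFile for podIP "10.0.0.5", hostName "web-0"
// and hostDomainName "web.default.svc.cluster.local" would look roughly like:
//
//   # Kubernetes-managed hosts file.
//   127.0.0.1   localhost
//   ::1         localhost ip6-localhost ip6-loopback
//   fe00::0     ip6-localnet
//   fe00::0     ip6-mcastprefix
//   fe00::1     ip6-allnodes
//   fe00::2     ip6-allrouters
//   10.0.0.5    web-0.web.default.svc.cluster.local   web-0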
  1111. func makePortMappings(container *api.Container) (ports []kubecontainer.PortMapping) {
  1112. names := make(map[string]struct{})
  1113. for _, p := range container.Ports {
  1114. pm := kubecontainer.PortMapping{
  1115. HostPort: int(p.HostPort),
  1116. ContainerPort: int(p.ContainerPort),
  1117. Protocol: p.Protocol,
  1118. HostIP: p.HostIP,
  1119. }
  1120. // We need to create some default port name if it's not specified, since
  1121. // this is necessary for rkt.
  1122. // http://issue.k8s.io/7710
  1123. if p.Name == "" {
  1124. pm.Name = fmt.Sprintf("%s-%s:%d", container.Name, p.Protocol, p.ContainerPort)
  1125. } else {
  1126. pm.Name = fmt.Sprintf("%s-%s", container.Name, p.Name)
  1127. }
  1128. // Protect against exposing the same protocol-port more than once in a container.
  1129. if _, ok := names[pm.Name]; ok {
  1130. glog.Warningf("Port name conflicted, %q is defined more than once", pm.Name)
  1131. continue
  1132. }
  1133. ports = append(ports, pm)
  1134. names[pm.Name] = struct{}{}
  1135. }
  1136. return
  1137. }
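// For example (hypothetical values): a container named "web" exposing an unnamed
// TCP port 80 gets the generated mapping name "web-TCP:80", while a port named
// "http" in the same container gets "web-http".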
  1138. func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *api.Pod) (string, string, error) {
  1139. // TODO(vmarmol): Handle better.
1140. // Cap hostname at 63 chars (the specification allows 64 bytes, i.e. 63 chars plus the terminating null).
  1141. clusterDomain := kl.clusterDomain
  1142. const hostnameMaxLen = 63
  1143. podAnnotations := pod.Annotations
  1144. if podAnnotations == nil {
  1145. podAnnotations = make(map[string]string)
  1146. }
  1147. hostname := pod.Name
  1148. if len(pod.Spec.Hostname) > 0 {
  1149. if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Hostname); len(msgs) != 0 {
  1150. return "", "", fmt.Errorf("Pod Hostname %q is not a valid DNS label: %s", pod.Spec.Hostname, strings.Join(msgs, ";"))
  1151. }
  1152. hostname = pod.Spec.Hostname
  1153. } else {
  1154. hostnameCandidate := podAnnotations[utilpod.PodHostnameAnnotation]
  1155. if len(utilvalidation.IsDNS1123Label(hostnameCandidate)) == 0 {
  1156. // use hostname annotation, if specified.
  1157. hostname = hostnameCandidate
  1158. }
  1159. }
  1160. if len(hostname) > hostnameMaxLen {
  1161. hostname = hostname[:hostnameMaxLen]
  1162. glog.Errorf("hostname for pod:%q was longer than %d. Truncated hostname to :%q", pod.Name, hostnameMaxLen, hostname)
  1163. }
  1164. hostDomain := ""
  1165. if len(pod.Spec.Subdomain) > 0 {
  1166. if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Subdomain); len(msgs) != 0 {
  1167. return "", "", fmt.Errorf("Pod Subdomain %q is not a valid DNS label: %s", pod.Spec.Subdomain, strings.Join(msgs, ";"))
  1168. }
  1169. hostDomain = fmt.Sprintf("%s.%s.svc.%s", pod.Spec.Subdomain, pod.Namespace, clusterDomain)
  1170. } else {
  1171. subdomainCandidate := pod.Annotations[utilpod.PodSubdomainAnnotation]
  1172. if len(utilvalidation.IsDNS1123Label(subdomainCandidate)) == 0 {
  1173. hostDomain = fmt.Sprintf("%s.%s.svc.%s", subdomainCandidate, pod.Namespace, clusterDomain)
  1174. }
  1175. }
  1176. return hostname, hostDomain, nil
  1177. }
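// Example (hypothetical values): a pod named "web-0" with Spec.Hostname "web-0" and
// Spec.Subdomain "web" in namespace "default", on a cluster whose domain is
// "cluster.local", yields:
//
//   hostname   = "web-0"
//   hostDomain = "web.default.svc.cluster.local"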
  1178. // GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
  1179. // the container runtime to set parameters for launching a container.
  1180. func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
  1181. var err error
  1182. opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}
  1183. hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
  1184. if err != nil {
  1185. return nil, err
  1186. }
  1187. opts.Hostname = hostname
  1188. podName := volumehelper.GetUniquePodName(pod)
  1189. volumes := kl.volumeManager.GetMountedVolumesForPod(podName)
  1190. opts.PortMappings = makePortMappings(container)
  1191. // Docker does not relabel volumes if the container is running
  1192. // in the host pid or ipc namespaces so the kubelet must
  1193. // relabel the volumes
  1194. if pod.Spec.SecurityContext != nil && (pod.Spec.SecurityContext.HostIPC || pod.Spec.SecurityContext.HostPID) {
  1195. err = kl.relabelVolumes(pod, volumes)
  1196. if err != nil {
  1197. return nil, err
  1198. }
  1199. }
  1200. opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
  1201. if err != nil {
  1202. return nil, err
  1203. }
  1204. opts.Envs, err = kl.makeEnvironmentVariables(pod, container, podIP)
  1205. if err != nil {
  1206. return nil, err
  1207. }
  1208. if len(container.TerminationMessagePath) != 0 {
  1209. p := kl.getPodContainerDir(pod.UID, container.Name)
  1210. if err := os.MkdirAll(p, 0750); err != nil {
  1211. glog.Errorf("Error on creating %q: %v", p, err)
  1212. } else {
  1213. opts.PodContainerDir = p
  1214. }
  1215. }
  1216. opts.DNS, opts.DNSSearch, err = kl.GetClusterDNS(pod)
  1217. if err != nil {
  1218. return nil, err
  1219. }
  1220. return opts, nil
  1221. }
  1222. var masterServices = sets.NewString("kubernetes")
  1223. // getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
  1224. func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
  1225. var (
  1226. serviceMap = make(map[string]api.Service)
  1227. m = make(map[string]string)
  1228. )
  1229. // Get all service resources from the master (via a cache),
  1230. // and populate them into service environment variables.
  1231. if kl.serviceLister == nil {
  1232. // Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
  1233. return m, nil
  1234. }
  1235. services, err := kl.serviceLister.List()
  1236. if err != nil {
  1237. return m, fmt.Errorf("failed to list services when setting up env vars.")
  1238. }
  1239. // project the services in namespace ns onto the master services
  1240. for _, service := range services.Items {
  1241. // ignore services where ClusterIP is "None" or empty
  1242. if !api.IsServiceIPSet(&service) {
  1243. continue
  1244. }
  1245. serviceName := service.Name
  1246. switch service.Namespace {
1247. // for the case where the master service namespace is the namespace the pod
1248. // is in, the pod should receive all the services in that namespace.
  1249. //
  1250. // ordering of the case clauses below enforces this
  1251. case ns:
  1252. serviceMap[serviceName] = service
  1253. case kl.masterServiceNamespace:
  1254. if masterServices.Has(serviceName) {
  1255. if _, exists := serviceMap[serviceName]; !exists {
  1256. serviceMap[serviceName] = service
  1257. }
  1258. }
  1259. }
  1260. }
  1261. services.Items = []api.Service{}
  1262. for _, service := range serviceMap {
  1263. services.Items = append(services.Items, service)
  1264. }
  1265. for _, e := range envvars.FromServices(&services) {
  1266. m[e.Name] = e.Value
  1267. }
  1268. return m, nil
  1269. }
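// The variables returned by getServiceEnvVarMap follow the usual Kubernetes service
// conventions; for instance, a service named "redis-master" with ClusterIP 10.0.0.11
// and port 6379 would produce entries along the lines of
//
//   REDIS_MASTER_SERVICE_HOST=10.0.0.11
//   REDIS_MASTER_SERVICE_PORT=6379
//
// (illustrative values; the exact set comes from envvars.FromServices).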
  1270. // Make the environment variables for a pod in the given namespace.
  1271. func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container, podIP string) ([]kubecontainer.EnvVar, error) {
  1272. var result []kubecontainer.EnvVar
  1273. // Note: These are added to the docker Config, but are not included in the checksum computed
  1274. // by dockertools.BuildDockerName(...). That way, we can still determine whether an
  1275. // api.Container is already running by its hash. (We don't want to restart a container just
  1276. // because some service changed.)
  1277. //
  1278. // Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
1279. // To avoid this users can: (1) wait between starting a service and starting the pods that use it; or (2) detect
  1280. // missing service env var and exit and be restarted; or (3) use DNS instead of env vars
  1281. // and keep trying to resolve the DNS name of the service (recommended).
  1282. serviceEnv, err := kl.getServiceEnvVarMap(pod.Namespace)
  1283. if err != nil {
  1284. return result, err
  1285. }
  1286. // Determine the final values of variables:
  1287. //
  1288. // 1. Determine the final value of each variable:
  1289. // a. If the variable's Value is set, expand the `$(var)` references to other
  1290. // variables in the .Value field; the sources of variables are the declared
  1291. // variables of the container and the service environment variables
  1292. // b. If a source is defined for an environment variable, resolve the source
  1293. // 2. Create the container's environment in the order variables are declared
  1294. // 3. Add remaining service environment vars
  1295. var (
  1296. tmpEnv = make(map[string]string)
  1297. configMaps = make(map[string]*api.ConfigMap)
  1298. secrets = make(map[string]*api.Secret)
  1299. mappingFunc = expansion.MappingFuncFor(tmpEnv, serviceEnv)
  1300. )
  1301. for _, envVar := range container.Env {
  1302. // Accesses apiserver+Pods.
  1303. // So, the master may set service env vars, or kubelet may. In case both are doing
  1304. // it, we delete the key from the kubelet-generated ones so we don't have duplicate
  1305. // env vars.
1306. // TODO: remove this next line once all platforms use apiserver+Pods.
  1307. delete(serviceEnv, envVar.Name)
  1308. runtimeVal := envVar.Value
  1309. if runtimeVal != "" {
  1310. // Step 1a: expand variable references
  1311. runtimeVal = expansion.Expand(runtimeVal, mappingFunc)
  1312. } else if envVar.ValueFrom != nil {
  1313. // Step 1b: resolve alternate env var sources
  1314. switch {
  1315. case envVar.ValueFrom.FieldRef != nil:
  1316. runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod, podIP)
  1317. if err != nil {
  1318. return result, err
  1319. }
  1320. case envVar.ValueFrom.ResourceFieldRef != nil:
  1321. defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardApi(pod, container)
  1322. if err != nil {
  1323. return result, err
  1324. }
  1325. runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, defaultedContainer)
  1326. if err != nil {
  1327. return result, err
  1328. }
  1329. case envVar.ValueFrom.ConfigMapKeyRef != nil:
  1330. name := envVar.ValueFrom.ConfigMapKeyRef.Name
  1331. key := envVar.ValueFrom.ConfigMapKeyRef.Key
  1332. configMap, ok := configMaps[name]
  1333. if !ok {
  1334. configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name)
  1335. if err != nil {
  1336. return result, err
  1337. }
  1338. }
  1339. runtimeVal, ok = configMap.Data[key]
  1340. if !ok {
  1341. return result, fmt.Errorf("Couldn't find key %v in ConfigMap %v/%v", key, pod.Namespace, name)
  1342. }
  1343. case envVar.ValueFrom.SecretKeyRef != nil:
  1344. name := envVar.ValueFrom.SecretKeyRef.Name
  1345. key := envVar.ValueFrom.SecretKeyRef.Key
  1346. secret, ok := secrets[name]
  1347. if !ok {
  1348. secret, err = kl.kubeClient.Core().Secrets(pod.Namespace).Get(name)
  1349. if err != nil {
  1350. return result, err
  1351. }
  1352. }
  1353. runtimeValBytes, ok := secret.Data[key]
  1354. if !ok {
  1355. return result, fmt.Errorf("Couldn't find key %v in Secret %v/%v", key, pod.Namespace, name)
  1356. }
  1357. runtimeVal = string(runtimeValBytes)
  1358. }
  1359. }
  1360. tmpEnv[envVar.Name] = runtimeVal
  1361. result = append(result, kubecontainer.EnvVar{Name: envVar.Name, Value: tmpEnv[envVar.Name]})
  1362. }
  1363. // Append remaining service env vars.
  1364. for k, v := range serviceEnv {
  1365. result = append(result, kubecontainer.EnvVar{Name: k, Value: v})
  1366. }
  1367. return result, nil
  1368. }
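// A small illustration of the $(var) expansion performed in step 1a, using a
// hypothetical container env:
//
//   env:
//   - name: GREETING
//     value: hello
//   - name: MESSAGE
//     value: $(GREETING) world
//
// MESSAGE resolves to "hello world", because variables declared earlier in the
// container (and the service env vars) feed the mapping function.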
  1369. // podFieldSelectorRuntimeValue returns the runtime value of the given
  1370. // selector for a pod.
  1371. func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod, podIP string) (string, error) {
  1372. internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
  1373. if err != nil {
  1374. return "", err
  1375. }
  1376. switch internalFieldPath {
  1377. case "spec.nodeName":
  1378. return pod.Spec.NodeName, nil
  1379. case "spec.serviceAccountName":
  1380. return pod.Spec.ServiceAccountName, nil
  1381. case "status.podIP":
  1382. return podIP, nil
  1383. }
  1384. return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath)
  1385. }
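// For instance, a downward-API env var whose fieldRef.fieldPath is "spec.nodeName"
// resolves to the node the pod is scheduled on, and "status.podIP" resolves to the
// podIP argument supplied by the caller; other field paths such as "metadata.name"
// or "metadata.namespace" fall through to fieldpath.ExtractFieldPathAsString.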
  1386. // containerResourceRuntimeValue returns the value of the provided container resource
  1387. func containerResourceRuntimeValue(fs *api.ResourceFieldSelector, pod *api.Pod, container *api.Container) (string, error) {
  1388. containerName := fs.ContainerName
  1389. if len(containerName) == 0 {
  1390. return fieldpath.ExtractContainerResourceValue(fs, container)
  1391. } else {
  1392. return fieldpath.ExtractResourceValueByContainerName(fs, pod, containerName)
  1393. }
  1394. }
  1395. // GetClusterDNS returns a list of the DNS servers and a list of the DNS search
  1396. // domains of the cluster.
  1397. func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
  1398. var hostDNS, hostSearch []string
  1399. // Get host DNS settings
  1400. if kl.resolverConfig != "" {
  1401. f, err := os.Open(kl.resolverConfig)
  1402. if err != nil {
  1403. return nil, nil, err
  1404. }
  1405. defer f.Close()
  1406. hostDNS, hostSearch, err = kl.parseResolvConf(f)
  1407. if err != nil {
  1408. return nil, nil, err
  1409. }
  1410. }
  1411. useClusterFirstPolicy := pod.Spec.DNSPolicy == api.DNSClusterFirst
  1412. if useClusterFirstPolicy && kl.clusterDNS == nil {
  1413. // clusterDNS is not known.
1414. // a pod with the ClusterFirst DNS policy cannot be created
  1415. kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
  1416. log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
  1417. kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)
  1418. // fallback to DNSDefault
  1419. useClusterFirstPolicy = false
  1420. }
  1421. if !useClusterFirstPolicy {
  1422. // When the kubelet --resolv-conf flag is set to the empty string, use
  1423. // DNS settings that override the docker default (which is to use
  1424. // /etc/resolv.conf) and effectively disable DNS lookups. According to
  1425. // the bind documentation, the behavior of the DNS client library when
  1426. // "nameservers" are not specified is to "use the nameserver on the
  1427. // local machine". A nameserver setting of localhost is equivalent to
  1428. // this documented behavior.
  1429. if kl.resolverConfig == "" {
  1430. hostDNS = []string{"127.0.0.1"}
  1431. hostSearch = []string{"."}
  1432. }
  1433. return hostDNS, hostSearch, nil
  1434. }
  1435. // for a pod with DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
1436. // the pod. The cluster DNS server itself will forward queries to other nameservers that it is configured to use,
  1437. // in case the cluster DNS server cannot resolve the DNS query itself
  1438. dns := []string{kl.clusterDNS.String()}
  1439. var dnsSearch []string
  1440. if kl.clusterDomain != "" {
  1441. nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
  1442. svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
  1443. dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
  1444. } else {
  1445. dnsSearch = hostSearch
  1446. }
  1447. return dns, dnsSearch, nil
  1448. }
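// Sketch of the resulting resolver configuration for a pod in namespace "default"
// with the ClusterFirst policy, a cluster DNS of 10.0.0.10 and a cluster domain of
// "cluster.local" (illustrative values):
//
//   nameserver 10.0.0.10
//   search default.svc.cluster.local svc.cluster.local cluster.local <host search domains>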
  1449. // One of the following arguments must be non-nil: runningPod, status.
  1450. // TODO: Modify containerRuntime.KillPod() to accept the right arguments.
  1451. func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error {
  1452. var p kubecontainer.Pod
  1453. if runningPod != nil {
  1454. p = *runningPod
  1455. } else if status != nil {
  1456. p = kubecontainer.ConvertPodStatusToRunningPod(status)
  1457. }
  1458. return kl.containerRuntime.KillPod(pod, p, gracePeriodOverride)
  1459. }
1460. // makePodDataDirs creates the data directories for the pod.
  1461. func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
  1462. uid := pod.UID
  1463. if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
  1464. return err
  1465. }
  1466. if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
  1467. return err
  1468. }
  1469. if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
  1470. return err
  1471. }
  1472. return nil
  1473. }
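// The per-pod layout created by makePodDataDirs mirrors the top-level data dirs,
// e.g. for a pod with UID <uid> (illustrative paths):
//
//   <rootDirectory>/pods/<uid>/          <- getPodDir(uid)
//   <rootDirectory>/pods/<uid>/volumes/  <- getPodVolumesDir(uid)
//   <rootDirectory>/pods/<uid>/plugins/  <- getPodPluginsDir(uid)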
  1474. // syncPod is the transaction script for the sync of a single pod.
  1475. //
  1476. // Arguments:
  1477. //
  1478. // o - the SyncPodOptions for this invocation
  1479. //
  1480. // The workflow is:
  1481. // * If the pod is being created, record pod worker start latency
  1482. // * Call generateAPIPodStatus to prepare an api.PodStatus for the pod
  1483. // * If the pod is being seen as running for the first time, record pod
  1484. // start latency
  1485. // * Update the status of the pod in the status manager
  1486. // * Kill the pod if it should not be running
  1487. // * Create a mirror pod if the pod is a static pod, and does not
  1488. // already have a mirror pod
  1489. // * Create the data directories for the pod if they do not exist
  1490. // * Wait for volumes to attach/mount
  1491. // * Fetch the pull secrets for the pod
  1492. // * Call the container runtime's SyncPod callback
  1493. // * Update the traffic shaping for the pod's ingress and egress limits
  1494. //
1495. // If any step of this workflow errors, the error is returned, and is repeated
  1496. // on the next syncPod call.
  1497. func (kl *Kubelet) syncPod(o syncPodOptions) error {
  1498. // pull out the required options
  1499. pod := o.pod
  1500. mirrorPod := o.mirrorPod
  1501. podStatus := o.podStatus
  1502. updateType := o.updateType
  1503. // if we want to kill a pod, do it now!
  1504. if updateType == kubetypes.SyncPodKill {
  1505. killPodOptions := o.killPodOptions
  1506. if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
  1507. return fmt.Errorf("kill pod options are required if update type is kill")
  1508. }
  1509. apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
  1510. kl.statusManager.SetPodStatus(pod, apiPodStatus)
  1511. // we kill the pod with the specified grace period since this is a termination
  1512. if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
  1513. // there was an error killing the pod, so we return that error directly
  1514. utilruntime.HandleError(err)
  1515. return err
  1516. }
  1517. return nil
  1518. }
  1519. // Latency measurements for the main workflow are relative to the
  1520. // first time the pod was seen by the API server.
  1521. var firstSeenTime time.Time
  1522. if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
  1523. firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
  1524. }
  1525. // Record pod worker start latency if being created
  1526. // TODO: make pod workers record their own latencies
  1527. if updateType == kubetypes.SyncPodCreate {
  1528. if !firstSeenTime.IsZero() {
  1529. // This is the first time we are syncing the pod. Record the latency
  1530. // since kubelet first saw the pod if firstSeenTime is set.
  1531. metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
  1532. } else {
  1533. glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
  1534. }
  1535. }
  1536. // Generate final API pod status with pod and status manager status
  1537. apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
  1538. // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
  1539. // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
  1540. // set pod IP to hostIP directly in runtime.GetPodStatus
  1541. podStatus.IP = apiPodStatus.PodIP
  1542. // Record the time it takes for the pod to become running.
  1543. existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
  1544. if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
  1545. !firstSeenTime.IsZero() {
  1546. metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
  1547. }
  1548. // Update status in the status manager
  1549. kl.statusManager.SetPodStatus(pod, apiPodStatus)
  1550. // Kill pod if it should not be running
  1551. if errOuter := canRunPod(pod); errOuter != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
  1552. if errInner := kl.killPod(pod, nil, podStatus, nil); errInner != nil {
  1553. errOuter = fmt.Errorf("error killing pod: %v", errInner)
  1554. utilruntime.HandleError(errOuter)
  1555. }
  1556. // there was no error killing the pod, but the pod cannot be run, so we return that err (if any)
  1557. return errOuter
  1558. }
  1559. // Create Mirror Pod for Static Pod if it doesn't already exist
  1560. if kubepod.IsStaticPod(pod) {
  1561. podFullName := kubecontainer.GetPodFullName(pod)
  1562. deleted := false
  1563. if mirrorPod != nil {
  1564. if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
  1565. // The mirror pod is semantically different from the static pod. Remove
  1566. // it. The mirror pod will get recreated later.
  1567. glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
  1568. if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
  1569. glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
  1570. } else {
  1571. deleted = true
  1572. }
  1573. }
  1574. }
  1575. if mirrorPod == nil || deleted {
  1576. glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
  1577. if err := kl.podManager.CreateMirrorPod(pod); err != nil {
  1578. glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
  1579. }
  1580. }
  1581. }
  1582. // Make data directories for the pod
  1583. if err := kl.makePodDataDirs(pod); err != nil {
  1584. glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
  1585. return err
  1586. }
  1587. // Wait for volumes to attach/mount
  1588. if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
  1589. kl.recorder.Eventf(pod, api.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
  1590. glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
  1591. return err
  1592. }
  1593. // Fetch the pull secrets for the pod
  1594. pullSecrets, err := kl.getPullSecretsForPod(pod)
  1595. if err != nil {
  1596. glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
  1597. return err
  1598. }
  1599. // Call the container runtime's SyncPod callback
  1600. result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
  1601. kl.reasonCache.Update(pod.UID, result)
  1602. if err = result.Error(); err != nil {
  1603. return err
  1604. }
  1605. // early successful exit if pod is not bandwidth-constrained
  1606. if !kl.shapingEnabled() {
  1607. return nil
  1608. }
  1609. // Update the traffic shaping for the pod's ingress and egress limits
  1610. ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
  1611. if err != nil {
  1612. return err
  1613. }
  1614. if egress != nil || ingress != nil {
  1615. if podUsesHostNetwork(pod) {
  1616. kl.recorder.Event(pod, api.EventTypeWarning, events.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network")
  1617. } else if kl.shaper != nil {
  1618. if len(apiPodStatus.PodIP) > 0 {
  1619. err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress)
  1620. }
  1621. } else {
  1622. kl.recorder.Event(pod, api.EventTypeWarning, events.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined")
  1623. }
  1624. }
  1625. return nil
  1626. }
  1627. // returns whether the pod uses the host network namespace.
  1628. func podUsesHostNetwork(pod *api.Pod) bool {
  1629. return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
  1630. }
  1631. // getPullSecretsForPod inspects the Pod and retrieves the referenced pull
  1632. // secrets.
  1633. // TODO: duplicate secrets are being retrieved multiple times and there
  1634. // is no cache. Creating and using a secret manager interface will make this
  1635. // easier to address.
  1636. func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
  1637. pullSecrets := []api.Secret{}
  1638. for _, secretRef := range pod.Spec.ImagePullSecrets {
  1639. secret, err := kl.kubeClient.Core().Secrets(pod.Namespace).Get(secretRef.Name)
  1640. if err != nil {
  1641. glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
  1642. continue
  1643. }
  1644. pullSecrets = append(pullSecrets, *secret)
  1645. }
  1646. return pullSecrets, nil
  1647. }
1648. // Get pods which should be resynchronized. Currently, the following pods should be resynchronized:
1649. // * pods whose work is ready.
1650. // * pods for which an internal module has requested a sync.
  1651. func (kl *Kubelet) getPodsToSync() []*api.Pod {
  1652. allPods := kl.podManager.GetPods()
  1653. podUIDs := kl.workQueue.GetWork()
  1654. podUIDSet := sets.NewString()
  1655. for _, podUID := range podUIDs {
  1656. podUIDSet.Insert(string(podUID))
  1657. }
  1658. var podsToSync []*api.Pod
  1659. for _, pod := range allPods {
  1660. if podUIDSet.Has(string(pod.UID)) {
  1661. // The work of the pod is ready
  1662. podsToSync = append(podsToSync, pod)
  1663. continue
  1664. }
  1665. for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
  1666. if podSyncLoopHandler.ShouldSync(pod) {
  1667. podsToSync = append(podsToSync, pod)
  1668. break
  1669. }
  1670. }
  1671. }
  1672. return podsToSync
  1673. }
  1674. // Returns true if pod is in the terminated state ("Failed" or "Succeeded").
  1675. func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
  1676. var status api.PodStatus
  1677. // Check the cached pod status which was set after the last sync.
  1678. status, ok := kl.statusManager.GetPodStatus(pod.UID)
  1679. if !ok {
  1680. // If there is no cached status, use the status from the
  1681. // apiserver. This is useful if kubelet has recently been
  1682. // restarted.
  1683. status = pod.Status
  1684. }
  1685. if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
  1686. return true
  1687. }
  1688. return false
  1689. }
  1690. // filterOutTerminatedPods returns the given pods which the status manager
  1691. // does not consider failed or succeeded.
  1692. func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod {
  1693. var filteredPods []*api.Pod
  1694. for _, p := range pods {
  1695. if kl.podIsTerminated(p) {
  1696. continue
  1697. }
  1698. filteredPods = append(filteredPods, p)
  1699. }
  1700. return filteredPods
  1701. }
  1702. // removeOrphanedPodStatuses removes obsolete entries in podStatus where
  1703. // the pod is no longer considered bound to this node.
  1704. func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) {
  1705. podUIDs := make(map[types.UID]bool)
  1706. for _, pod := range pods {
  1707. podUIDs[pod.UID] = true
  1708. }
  1709. for _, pod := range mirrorPods {
  1710. podUIDs[pod.UID] = true
  1711. }
  1712. kl.statusManager.RemoveOrphanedStatuses(podUIDs)
  1713. }
  1714. // deletePod deletes the pod from the internal state of the kubelet by:
  1715. // 1. stopping the associated pod worker asynchronously
  1716. // 2. signaling to kill the pod by sending on the podKillingCh channel
  1717. //
  1718. // deletePod returns an error if not all sources are ready or the pod is not
  1719. // found in the runtime cache.
  1720. func (kl *Kubelet) deletePod(pod *api.Pod) error {
  1721. if pod == nil {
  1722. return fmt.Errorf("deletePod does not allow nil pod")
  1723. }
  1724. if !kl.sourcesReady.AllReady() {
  1725. // If the sources aren't ready, skip deletion, as we may accidentally delete pods
  1726. // for sources that haven't reported yet.
  1727. return fmt.Errorf("skipping delete because sources aren't ready yet")
  1728. }
  1729. kl.podWorkers.ForgetWorker(pod.UID)
1730. // The runtime cache may not have been updated with the pod yet, but that's okay
  1731. // because the periodic cleanup routine will attempt to delete again later.
  1732. runningPods, err := kl.runtimeCache.GetPods()
  1733. if err != nil {
  1734. return fmt.Errorf("error listing containers: %v", err)
  1735. }
  1736. runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
  1737. if runningPod.IsEmpty() {
  1738. return fmt.Errorf("pod not found")
  1739. }
  1740. podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
  1741. kl.podKillingCh <- &podPair
  1742. // TODO: delete the mirror pod here?
  1743. // We leave the volume/directory cleanup to the periodic cleanup routine.
  1744. return nil
  1745. }
  1746. // HandlePodCleanups performs a series of cleanup work, including terminating
  1747. // pod workers, killing unwanted pods, and removing orphaned volumes/pod
  1748. // directories.
  1749. // NOTE: This function is executed by the main sync loop, so it
  1750. // should not contain any blocking calls.
  1751. func (kl *Kubelet) HandlePodCleanups() error {
  1752. allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
  1753. // Pod phase progresses monotonically. Once a pod has reached a final state,
  1754. // it should never leave regardless of the restart policy. The statuses
  1755. // of such pods should not be changed, and there is no need to sync them.
  1756. // TODO: the logic here does not handle two cases:
  1757. // 1. If the containers were removed immediately after they died, kubelet
  1758. // may fail to generate correct statuses, let alone filtering correctly.
  1759. // 2. If kubelet restarted before writing the terminated status for a pod
  1760. // to the apiserver, it could still restart the terminated pod (even
  1761. // though the pod was not considered terminated by the apiserver).
  1762. // These two conditions could be alleviated by checkpointing kubelet.
  1763. activePods := kl.filterOutTerminatedPods(allPods)
  1764. desiredPods := make(map[types.UID]empty)
  1765. for _, pod := range activePods {
  1766. desiredPods[pod.UID] = empty{}
  1767. }
  1768. // Stop the workers for no-longer existing pods.
  1769. // TODO: is here the best place to forget pod workers?
  1770. kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
  1771. kl.probeManager.CleanupPods(activePods)
  1772. runningPods, err := kl.runtimeCache.GetPods()
  1773. if err != nil {
  1774. glog.Errorf("Error listing containers: %#v", err)
  1775. return err
  1776. }
  1777. for _, pod := range runningPods {
  1778. if _, found := desiredPods[pod.ID]; !found {
  1779. kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
  1780. }
  1781. }
  1782. kl.removeOrphanedPodStatuses(allPods, mirrorPods)
  1783. // Note that we just killed the unwanted pods. This may not have reflected
  1784. // in the cache. We need to bypass the cache to get the latest set of
  1785. // running pods to clean up the volumes.
  1786. // TODO: Evaluate the performance impact of bypassing the runtime cache.
  1787. runningPods, err = kl.containerRuntime.GetPods(false)
  1788. if err != nil {
  1789. glog.Errorf("Error listing containers: %#v", err)
  1790. return err
  1791. }
  1792. // Remove any orphaned volumes.
  1793. // Note that we pass all pods (including terminated pods) to the function,
  1794. // so that we don't remove volumes associated with terminated but not yet
  1795. // deleted pods.
  1796. err = kl.cleanupOrphanedPodDirs(allPods, runningPods)
  1797. if err != nil {
  1798. // We want all cleanup tasks to be run even if one of them failed. So
  1799. // we just log an error here and continue other cleanup tasks.
  1800. // This also applies to the other clean up tasks.
  1801. glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
  1802. }
  1803. // Remove any orphaned mirror pods.
  1804. kl.podManager.DeleteOrphanedMirrorPods()
  1805. // Clear out any old bandwidth rules
  1806. err = kl.cleanupBandwidthLimits(allPods)
  1807. if err != nil {
  1808. glog.Errorf("Failed cleaning up bandwidth limits: %v", err)
  1809. }
  1810. kl.backOff.GC()
  1811. return nil
  1812. }
  1813. // podKiller launches a goroutine to kill a pod received from the channel if
  1814. // another goroutine isn't already in action.
  1815. func (kl *Kubelet) podKiller() {
  1816. killing := sets.NewString()
  1817. resultCh := make(chan types.UID)
  1818. defer close(resultCh)
  1819. for {
  1820. select {
  1821. case podPair, ok := <-kl.podKillingCh:
  1822. if !ok {
  1823. return
  1824. }
  1825. runningPod := podPair.RunningPod
  1826. apiPod := podPair.APIPod
  1827. if killing.Has(string(runningPod.ID)) {
  1828. // The pod is already being killed.
  1829. break
  1830. }
  1831. killing.Insert(string(runningPod.ID))
  1832. go func(apiPod *api.Pod, runningPod *kubecontainer.Pod, ch chan types.UID) {
  1833. defer func() {
  1834. ch <- runningPod.ID
  1835. }()
  1836. glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
  1837. err := kl.killPod(apiPod, runningPod, nil, nil)
  1838. if err != nil {
  1839. glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
  1840. }
  1841. }(apiPod, runningPod, resultCh)
  1842. case podID := <-resultCh:
  1843. killing.Delete(string(podID))
  1844. }
  1845. }
  1846. }
1847. // hasHostPortConflicts detects pods with conflicting host ports.
  1848. func hasHostPortConflicts(pods []*api.Pod) bool {
  1849. ports := sets.String{}
  1850. for _, pod := range pods {
  1851. if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
  1852. glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
  1853. return true
  1854. }
  1855. if errs := validation.AccumulateUniqueHostPorts(pod.Spec.InitContainers, &ports, field.NewPath("spec", "initContainers")); len(errs) > 0 {
  1856. glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
  1857. return true
  1858. }
  1859. }
  1860. return false
  1861. }
1862. // isOutOfDisk detects if the node is out of disk space for either the runtime or the root partition.
  1863. func (kl *Kubelet) isOutOfDisk() bool {
  1864. // Check disk space once globally and reject or accept all new pods.
  1865. withinBounds, err := kl.diskSpaceManager.IsRuntimeDiskSpaceAvailable()
  1866. // Assume enough space in case of errors.
  1867. if err != nil {
  1868. glog.Errorf("Failed to check if disk space is available for the runtime: %v", err)
  1869. } else if !withinBounds {
  1870. return true
  1871. }
  1872. withinBounds, err = kl.diskSpaceManager.IsRootDiskSpaceAvailable()
  1873. // Assume enough space in case of errors.
  1874. if err != nil {
  1875. glog.Errorf("Failed to check if disk space is available on the root partition: %v", err)
  1876. } else if !withinBounds {
  1877. return true
  1878. }
  1879. return false
  1880. }
  1881. // rejectPod records an event about the pod with the given reason and message,
1882. // and updates the pod to the failed phase in the status manager.
  1883. func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
  1884. kl.recorder.Eventf(pod, api.EventTypeWarning, reason, message)
  1885. kl.statusManager.SetPodStatus(pod, api.PodStatus{
  1886. Phase: api.PodFailed,
  1887. Reason: reason,
  1888. Message: "Pod " + message})
  1889. }
  1890. // canAdmitPod determines if a pod can be admitted, and gives a reason if it
1891. // cannot. "pod" is the new pod, while "pods" are all admitted pods.
  1892. // The function returns a boolean value indicating whether the pod
  1893. // can be admitted, a brief single-word reason and a message explaining why
  1894. // the pod cannot be admitted.
  1895. func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
  1896. node, err := kl.getNodeAnyWay()
  1897. if err != nil {
  1898. glog.Errorf("Cannot get Node info: %v", err)
  1899. return false, "InvalidNodeInfo", "Kubelet cannot get node info."
  1900. }
  1901. // the kubelet will invoke each pod admit handler in sequence
  1902. // if any handler rejects, the pod is rejected.
  1903. // TODO: move predicate check into a pod admitter
  1904. // TODO: move out of disk check into a pod admitter
  1905. // TODO: out of resource eviction should have a pod admitter call-out
  1906. attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
  1907. for _, podAdmitHandler := range kl.PodAdmitHandlers {
  1908. if result := podAdmitHandler.Admit(attrs); !result.Admit {
  1909. return false, result.Reason, result.Message
  1910. }
  1911. }
  1912. nodeInfo := schedulercache.NewNodeInfo(pods...)
  1913. nodeInfo.SetNode(node)
  1914. fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
  1915. if err != nil {
  1916. message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
  1917. glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
  1918. return fit, "UnexpectedError", message
  1919. }
  1920. if !fit {
  1921. var reason string
  1922. var message string
  1923. if len(reasons) == 0 {
  1924. message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
  1925. glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
  1926. return fit, "UnknownReason", message
  1927. }
  1928. // If there are failed predicates, we only return the first one as a reason.
  1929. r := reasons[0]
  1930. switch re := r.(type) {
  1931. case *predicates.PredicateFailureError:
  1932. reason = re.PredicateName
  1933. message = re.Error()
  1934. glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
  1935. case *predicates.InsufficientResourceError:
  1936. reason = fmt.Sprintf("OutOf%s", re.ResourceName)
1937. message = re.Error()
  1938. glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
  1939. case *predicates.FailureReason:
  1940. reason = re.GetReason()
  1941. message = fmt.Sprintf("Failure: %s", re.GetReason())
  1942. glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
  1943. default:
  1944. reason = "UnexpectedPredicateFailureType"
1945. message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
  1946. glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
  1947. }
  1948. return fit, reason, message
  1949. }
  1950. // TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and
  1951. // add the disk space predicate to predicates.GeneralPredicates.
  1952. if kl.isOutOfDisk() {
  1953. glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to OutOfDisk")
  1954. return false, "OutOfDisk", "cannot be started due to lack of disk space."
  1955. }
  1956. return true, "", ""
  1957. }
  1958. // syncLoop is the main loop for processing changes. It watches for changes from
  1959. // three channels (file, apiserver, and http) and creates a union of them. For
1960. // any new change seen, it will run a sync against desired state and running state. If
1961. // no changes are seen to the configuration, it will synchronize the last known desired
  1962. // state every sync-frequency seconds. Never returns.
  1963. func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  1964. glog.Info("Starting kubelet main sync loop.")
1965. // The syncTicker wakes up the kubelet to check if there are any pod workers
  1966. // that need to be sync'd. A one-second period is sufficient because the
  1967. // sync interval is defaulted to 10s.
  1968. syncTicker := time.NewTicker(time.Second)
  1969. defer syncTicker.Stop()
  1970. housekeepingTicker := time.NewTicker(housekeepingPeriod)
  1971. defer housekeepingTicker.Stop()
  1972. plegCh := kl.pleg.Watch()
  1973. for {
  1974. if rs := kl.runtimeState.errors(); len(rs) != 0 {
  1975. glog.Infof("skipping pod synchronization - %v", rs)
  1976. time.Sleep(5 * time.Second)
  1977. continue
  1978. }
  1979. if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
  1980. break
  1981. }
  1982. }
  1983. }
  1984. // syncLoopIteration reads from various channels and dispatches pods to the
  1985. // given handler.
  1986. //
  1987. // Arguments:
  1988. // 1. configCh: a channel to read config events from
  1989. // 2. handler: the SyncHandler to dispatch pods to
  1990. // 3. syncCh: a channel to read periodic sync events from
  1991. // 4. houseKeepingCh: a channel to read housekeeping events from
  1992. // 5. plegCh: a channel to read PLEG updates from
  1993. //
  1994. // Events are also read from the kubelet liveness manager's update channel.
  1995. //
  1996. // The workflow is to read from one of the channels, handle that event, and
  1997. // update the timestamp in the sync loop monitor.
  1998. //
  1999. // Here is an appropriate place to note that despite the syntactical
  2000. // similarity to the switch statement, the case statements in a select are
  2001. // evaluated in a pseudorandom order if there are multiple channels ready to
  2002. // read from when the select is evaluated. In other words, case statements
  2003. // are evaluated in random order, and you can not assume that the case
  2004. // statements evaluate in order if multiple channels have events.
  2005. //
  2006. // With that in mind, in truly no particular order, the different channels
  2007. // are handled as follows:
  2008. //
  2009. // * configCh: dispatch the pods for the config change to the appropriate
  2010. // handler callback for the event type
  2011. // * plegCh: update the runtime cache; sync pod
  2012. // * syncCh: sync all pods waiting for sync
  2013. // * houseKeepingCh: trigger cleanup of pods
  2014. // * liveness manager: sync pods that have failed or in which one or more
  2015. // containers have failed liveness checks
  2016. func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
  2017. syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
  2018. kl.syncLoopMonitor.Store(kl.clock.Now())
  2019. select {
  2020. case u, open := <-configCh:
  2021. // Update from a config source; dispatch it to the right handler
  2022. // callback.
  2023. if !open {
  2024. glog.Errorf("Update channel is closed. Exiting the sync loop.")
  2025. return false
  2026. }
  2027. switch u.Op {
  2028. case kubetypes.ADD:
  2029. glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
  2030. // After restarting, kubelet will get all existing pods through
  2031. // ADD as if they are new pods. These pods will then go through the
2032. // admission process and *may* be rejected. This can be resolved
  2033. // once we have checkpointing.
  2034. handler.HandlePodAdditions(u.Pods)
  2035. case kubetypes.UPDATE:
  2036. glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
  2037. handler.HandlePodUpdates(u.Pods)
  2038. case kubetypes.REMOVE:
  2039. glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
  2040. handler.HandlePodRemoves(u.Pods)
  2041. case kubetypes.RECONCILE:
  2042. glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
  2043. handler.HandlePodReconcile(u.Pods)
  2044. case kubetypes.DELETE:
  2045. glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
2046. // DELETE is treated as an UPDATE because of graceful deletion.
  2047. handler.HandlePodUpdates(u.Pods)
  2048. case kubetypes.SET:
  2049. // TODO: Do we want to support this?
  2050. glog.Errorf("Kubelet does not support snapshot update")
  2051. }
  2052. // Mark the source ready after receiving at least one update from the
  2053. // source. Once all the sources are marked ready, various cleanup
  2054. // routines will start reclaiming resources. It is important that this
  2055. // takes place only after kubelet calls the update handler to process
  2056. // the update to ensure the internal pod cache is up-to-date.
  2057. kl.sourcesReady.AddSource(u.Source)
  2058. case e := <-plegCh:
  2059. if isSyncPodWorthy(e) {
  2060. // PLEG event for a pod; sync it.
  2061. if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
  2062. glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
  2063. handler.HandlePodSyncs([]*api.Pod{pod})
  2064. } else {
  2065. // If the pod no longer exists, ignore the event.
  2066. glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
  2067. }
  2068. }
  2069. if e.Type == pleg.ContainerDied {
  2070. if containerID, ok := e.Data.(string); ok {
  2071. kl.cleanUpContainersInPod(e.ID, containerID)
  2072. }
  2073. }
  2074. case <-syncCh:
  2075. // Sync pods waiting for sync
  2076. podsToSync := kl.getPodsToSync()
  2077. if len(podsToSync) == 0 {
  2078. break
  2079. }
  2080. glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
  2081. kl.HandlePodSyncs(podsToSync)
  2082. case update := <-kl.livenessManager.Updates():
  2083. if update.Result == proberesults.Failure {
  2084. // The liveness manager detected a failure; sync the pod.
  2085. // We should not use the pod from livenessManager, because it is never updated after
  2086. // initialization.
  2087. pod, ok := kl.podManager.GetPodByUID(update.PodUID)
  2088. if !ok {
  2089. // If the pod no longer exists, ignore the update.
  2090. glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
  2091. break
  2092. }
  2093. glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
  2094. handler.HandlePodSyncs([]*api.Pod{pod})
  2095. }
  2096. case <-housekeepingCh:
  2097. if !kl.sourcesReady.AllReady() {
  2098. // If the sources aren't ready, skip housekeeping, as we may
  2099. // accidentally delete pods from unready sources.
  2100. glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
  2101. } else {
  2102. glog.V(4).Infof("SyncLoop (housekeeping)")
  2103. if err := handler.HandlePodCleanups(); err != nil {
  2104. glog.Errorf("Failed cleaning pods: %v", err)
  2105. }
  2106. }
  2107. }
  2108. kl.syncLoopMonitor.Store(kl.clock.Now())
  2109. return true
  2110. }
  2111. // dispatchWork starts the asynchronous sync of the pod in a pod worker.
2112. // If the pod is terminated, dispatchWork does not start a pod worker; it only asks the status manager to terminate the pod if a deletion timestamp is set.
  2113. func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
  2114. if kl.podIsTerminated(pod) {
  2115. if pod.DeletionTimestamp != nil {
  2116. // If the pod is in a terminated state, there is no pod worker to
  2117. // handle the work item. Check if the DeletionTimestamp has been
  2118. // set, and force a status update to trigger a pod deletion request
  2119. // to the apiserver.
  2120. kl.statusManager.TerminatePod(pod)
  2121. }
  2122. return
  2123. }
  2124. // Run the sync in an async worker.
  2125. kl.podWorkers.UpdatePod(&UpdatePodOptions{
  2126. Pod: pod,
  2127. MirrorPod: mirrorPod,
  2128. UpdateType: syncType,
  2129. OnCompleteFunc: func(err error) {
  2130. if err != nil {
  2131. metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
  2132. }
  2133. },
  2134. })
  2135. // Note the number of containers for new pods.
  2136. if syncType == kubetypes.SyncPodCreate {
  2137. metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
  2138. }
  2139. }
  2140. // TODO: handle mirror pods in a separate component (issue #17251)
  2141. func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
  2142. // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
  2143. // corresponding static pod. Send update to the pod worker if the static
  2144. // pod exists.
  2145. if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
  2146. kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
  2147. }
  2148. }
  2149. // HandlePodAdditions is the callback in SyncHandler for pods being added from
  2150. // a config source.
  2151. func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
  2152. start := kl.clock.Now()
  2153. sort.Sort(sliceutils.PodsByCreationTime(pods))
  2154. for _, pod := range pods {
  2155. if kubepod.IsMirrorPod(pod) {
  2156. kl.podManager.AddPod(pod)
  2157. kl.handleMirrorPod(pod, start)
  2158. continue
  2159. }
  2160. // Note that allPods excludes the new pod.
  2161. allPods := kl.podManager.GetPods()
2162. // Pods that we rejected have already been failed, so activePods includes all admitted
2163. // pods that are still alive.
  2164. activePods := kl.filterOutTerminatedPods(allPods)
  2165. // Check if we can admit the pod; if not, reject it.
  2166. if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
  2167. kl.rejectPod(pod, reason, message)
  2168. continue
  2169. }
  2170. kl.podManager.AddPod(pod)
  2171. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  2172. kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
  2173. kl.probeManager.AddPod(pod)
  2174. }
  2175. }
  2176. // HandlePodUpdates is the callback in the SyncHandler interface for pods
  2177. // being updated from a config source.
  2178. func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
  2179. start := kl.clock.Now()
  2180. for _, pod := range pods {
  2181. kl.podManager.UpdatePod(pod)
  2182. if kubepod.IsMirrorPod(pod) {
  2183. kl.handleMirrorPod(pod, start)
  2184. continue
  2185. }
  2186. // TODO: Evaluate if we need to validate and reject updates.
  2187. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  2188. kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
  2189. }
  2190. }
  2191. // HandlePodRemoves is the callback in the SyncHandler interface for pods
  2192. // being removed from a config source.
  2193. func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
  2194. start := kl.clock.Now()
  2195. for _, pod := range pods {
  2196. kl.podManager.DeletePod(pod)
  2197. if kubepod.IsMirrorPod(pod) {
  2198. kl.handleMirrorPod(pod, start)
  2199. continue
  2200. }
  2201. // Deletion is allowed to fail because the periodic cleanup routine
  2202. // will trigger deletion again.
  2203. if err := kl.deletePod(pod); err != nil {
  2204. glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
  2205. }
  2206. kl.probeManager.RemovePod(pod)
  2207. }
  2208. }
  2209. // HandlePodReconcile is the callback in the SyncHandler interface for pods
  2210. // that should be reconciled.
  2211. func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
  2212. for _, pod := range pods {
2213. // Update the pod in the pod manager; the status manager periodically reconciles
2214. // against the pod manager's state.
  2215. kl.podManager.UpdatePod(pod)
  2216. // After an evicted pod is synced, all dead containers in the pod can be removed.
  2217. if eviction.PodIsEvicted(pod.Status) {
  2218. if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
  2219. kl.containerDeletor.deleteContainersInPod("", podStatus, true)
  2220. }
  2221. }
  2222. }
  2223. }
  2224. // HandlePodSyncs is the callback in the syncHandler interface for pods
  2225. // that should be dispatched to pod workers for sync.
  2226. func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
  2227. start := kl.clock.Now()
  2228. for _, pod := range pods {
  2229. mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
  2230. kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
  2231. }
  2232. }
  2233. // LatestLoopEntryTime returns the last time in the sync loop monitor.
  2234. func (kl *Kubelet) LatestLoopEntryTime() time.Time {
  2235. val := kl.syncLoopMonitor.Load()
  2236. if val == nil {
  2237. return time.Time{}
  2238. }
  2239. return val.(time.Time)
  2240. }
2241. // PLEGHealthCheck returns whether the PLEG is healthy.
  2242. func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
  2243. return kl.pleg.Healthy()
  2244. }
  2245. // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
2246. // of the container. If the previous flag is set, only the last terminated container's logs are returned; otherwise, the currently
2247. // running container is preferred over a previous termination. If info about the container is not available then a specific
  2248. // error is returned to the end user.
  2249. func (kl *Kubelet) validateContainerLogStatus(podName string, podStatus *api.PodStatus, containerName string, previous bool) (containerID kubecontainer.ContainerID, err error) {
  2250. var cID string
  2251. cStatus, found := api.GetContainerStatus(podStatus.ContainerStatuses, containerName)
  2252. // if not found, check the init containers
  2253. if !found {
  2254. cStatus, found = api.GetContainerStatus(podStatus.InitContainerStatuses, containerName)
  2255. }
  2256. if !found {
  2257. return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is not available", containerName, podName)
  2258. }
  2259. lastState := cStatus.LastTerminationState
  2260. waiting, running, terminated := cStatus.State.Waiting, cStatus.State.Running, cStatus.State.Terminated
  2261. switch {
  2262. case previous:
  2263. if lastState.Terminated == nil {
  2264. return kubecontainer.ContainerID{}, fmt.Errorf("previous terminated container %q in pod %q not found", containerName, podName)
  2265. }
  2266. cID = lastState.Terminated.ContainerID
  2267. case running != nil:
  2268. cID = cStatus.ContainerID
  2269. case terminated != nil:
  2270. cID = terminated.ContainerID
  2271. case lastState.Terminated != nil:
  2272. cID = lastState.Terminated.ContainerID
  2273. case waiting != nil:
  2274. // output some info for the most common pending failures
  2275. switch reason := waiting.Reason; reason {
  2276. case images.ErrImagePull.Error():
  2277. return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: image can't be pulled", containerName, podName)
  2278. case images.ErrImagePullBackOff.Error():
  2279. return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: trying and failing to pull image", containerName, podName)
  2280. default:
  2281. return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: %v", containerName, podName, reason)
  2282. }
  2283. default:
  2284. // unrecognized state
  2285. return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start - no logs yet", containerName, podName)
  2286. }
  2287. return kubecontainer.ParseContainerID(cID), nil
  2288. }
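// Illustrative sketch (editorial, not part of this file): how the state-based
// resolution above plays out for a hypothetical container that is running but
// also has a previous termination recorded. With previous=false the live
// container's ID wins; with previous=true the last terminated ID is returned.
// All names below ("app", "docker://live", "docker://dead") are made up.
//
//	st := api.PodStatus{ContainerStatuses: []api.ContainerStatus{{
//		Name:        "app",
//		ContainerID: "docker://live",
//		State:       api.ContainerState{Running: &api.ContainerStateRunning{}},
//		LastTerminationState: api.ContainerState{
//			Terminated: &api.ContainerStateTerminated{ContainerID: "docker://dead"},
//		},
//	}}}
//	id, _ := kl.validateContainerLogStatus("mypod", &st, "app", false) // -> "docker://live"
//	id, _ = kl.validateContainerLogStatus("mypod", &st, "app", true)  // -> "docker://dead"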
  2289. // GetKubeletContainerLogs returns logs from the container
  2290. // TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
  2291. // or all of them.
  2292. func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
  2293. // Pod workers periodically write status to statusManager. If status is not
  2294. // cached there, something is wrong (or kubelet just restarted and hasn't
  2295. // caught up yet). Just assume the pod is not ready yet.
  2296. name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
  2297. if err != nil {
  2298. return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
  2299. }
  2300. pod, ok := kl.GetPodByName(namespace, name)
  2301. if !ok {
  2302. return fmt.Errorf("pod %q cannot be found - no logs available", name)
  2303. }
  2304. podUID := pod.UID
  2305. if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
  2306. podUID = mirrorPod.UID
  2307. }
  2308. podStatus, found := kl.statusManager.GetPodStatus(podUID)
  2309. if !found {
  2310. // If there is no cached status, use the status from the
  2311. // apiserver. This is useful if kubelet has recently been
  2312. // restarted.
  2313. podStatus = pod.Status
  2314. }
  2315. containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous)
  2316. if err != nil {
  2317. return err
  2318. }
  2319. // Do a zero-byte write to stdout before handing off to the container runtime.
  2320. // This ensures at least one Write call is made to the writer when copying starts,
  2321. // even if we then block waiting for log output from the container.
  2322. if _, err := stdout.Write([]byte{}); err != nil {
  2323. return err
  2324. }
  2325. return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
  2326. }
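// Illustrative sketch (editorial, not part of this file): a hypothetical caller
// streaming the previous attempt's logs for one container. The pod full name
// format "name_namespace" matches what kubecontainer.ParsePodFullName expects;
// the concrete names are invented.
//
//	opts := &api.PodLogOptions{Previous: true}
//	if err := kl.GetKubeletContainerLogs("mypod_default", "app", opts, os.Stdout, os.Stderr); err != nil {
//		glog.Errorf("failed to get container logs: %v", err)
//	}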
  2327. // updateRuntimeUp calls the container runtime status callback, initializing
  2328. // the runtime dependent modules when the container runtime first comes up,
2329. // and logs an error if the status check fails. If the status check is OK,
2330. // it updates the container runtime uptime in the kubelet runtimeState.
  2331. func (kl *Kubelet) updateRuntimeUp() {
  2332. if err := kl.containerRuntime.Status(); err != nil {
  2333. glog.Errorf("Container runtime sanity check failed: %v", err)
  2334. return
  2335. }
  2336. kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
  2337. kl.runtimeState.setRuntimeSync(kl.clock.Now())
  2338. }
  2339. func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadvisorapi.MachineInfo) {
  2340. if info.CloudProvider != cadvisorapi.UnknownProvider &&
  2341. info.CloudProvider != cadvisorapi.Baremetal {
  2342. // The cloud providers from pkg/cloudprovider/providers/* that update ProviderID
  2343. // will use the format of cloudprovider://project/availability_zone/instance_name
  2344. // here we only have the cloudprovider and the instance name so we leave project
  2345. // and availability zone empty for compatibility.
  2346. node.Spec.ProviderID = strings.ToLower(string(info.CloudProvider)) +
  2347. ":////" + string(info.InstanceID)
  2348. }
  2349. }
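// Illustrative sketch (editorial, not part of this file): assuming cAdvisor
// reports a GCE machine with a (hypothetical) instance ID, the ProviderID set
// above keeps the project and availability-zone segments empty:
//
//	info := &cadvisorapi.MachineInfo{CloudProvider: cadvisorapi.GCE, InstanceID: "instance-1"}
//	kl.updateCloudProviderFromMachineInfo(node, info)
//	// node.Spec.ProviderID == "gce:////instance-1"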
  2350. // GetPhase returns the phase of a pod given its container info.
2351. // This func is exported to simplify integration for third-party kubelet
2352. // integrations such as kubernetes-mesos.
  2353. func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
  2354. initialized := 0
  2355. pendingInitialization := 0
  2356. failedInitialization := 0
  2357. for _, container := range spec.InitContainers {
  2358. containerStatus, ok := api.GetContainerStatus(info, container.Name)
  2359. if !ok {
  2360. pendingInitialization++
  2361. continue
  2362. }
  2363. switch {
  2364. case containerStatus.State.Running != nil:
  2365. pendingInitialization++
  2366. case containerStatus.State.Terminated != nil:
  2367. if containerStatus.State.Terminated.ExitCode == 0 {
  2368. initialized++
  2369. } else {
  2370. failedInitialization++
  2371. }
  2372. case containerStatus.State.Waiting != nil:
  2373. if containerStatus.LastTerminationState.Terminated != nil {
  2374. if containerStatus.LastTerminationState.Terminated.ExitCode == 0 {
  2375. initialized++
  2376. } else {
  2377. failedInitialization++
  2378. }
  2379. } else {
  2380. pendingInitialization++
  2381. }
  2382. default:
  2383. pendingInitialization++
  2384. }
  2385. }
  2386. unknown := 0
  2387. running := 0
  2388. waiting := 0
  2389. stopped := 0
  2390. failed := 0
  2391. succeeded := 0
  2392. for _, container := range spec.Containers {
  2393. containerStatus, ok := api.GetContainerStatus(info, container.Name)
  2394. if !ok {
  2395. unknown++
  2396. continue
  2397. }
  2398. switch {
  2399. case containerStatus.State.Running != nil:
  2400. running++
  2401. case containerStatus.State.Terminated != nil:
  2402. stopped++
  2403. if containerStatus.State.Terminated.ExitCode == 0 {
  2404. succeeded++
  2405. } else {
  2406. failed++
  2407. }
  2408. case containerStatus.State.Waiting != nil:
  2409. if containerStatus.LastTerminationState.Terminated != nil {
  2410. stopped++
  2411. } else {
  2412. waiting++
  2413. }
  2414. default:
  2415. unknown++
  2416. }
  2417. }
  2418. if failedInitialization > 0 && spec.RestartPolicy == api.RestartPolicyNever {
  2419. return api.PodFailed
  2420. }
  2421. switch {
  2422. case pendingInitialization > 0:
  2423. fallthrough
  2424. case waiting > 0:
  2425. glog.V(5).Infof("pod waiting > 0, pending")
2426. // One or more containers have not been started
  2427. return api.PodPending
  2428. case running > 0 && unknown == 0:
  2429. // All containers have been started, and at least
  2430. // one container is running
  2431. return api.PodRunning
  2432. case running == 0 && stopped > 0 && unknown == 0:
  2433. // All containers are terminated
  2434. if spec.RestartPolicy == api.RestartPolicyAlways {
  2435. // All containers are in the process of restarting
  2436. return api.PodRunning
  2437. }
  2438. if stopped == succeeded {
  2439. // RestartPolicy is not Always, and all
2440. // containers terminated successfully
  2441. return api.PodSucceeded
  2442. }
  2443. if spec.RestartPolicy == api.RestartPolicyNever {
  2444. // RestartPolicy is Never, and all containers are
2445. // terminated, with at least one having failed
  2446. return api.PodFailed
  2447. }
2448. // RestartPolicy is OnFailure, and at least one container failed
2449. // and is in the process of restarting
  2450. return api.PodRunning
  2451. default:
  2452. glog.V(5).Infof("pod default case, pending")
  2453. return api.PodPending
  2454. }
  2455. }
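// Illustrative sketch (editorial, not part of this file): a minimal call to
// GetPhase as a third-party integration might make it. The spec and statuses
// are hypothetical; with no init containers and the single container running,
// the phase computed above is PodRunning.
//
//	spec := &api.PodSpec{
//		RestartPolicy: api.RestartPolicyAlways,
//		Containers:    []api.Container{{Name: "app"}},
//	}
//	statuses := []api.ContainerStatus{{
//		Name:  "app",
//		State: api.ContainerState{Running: &api.ContainerStateRunning{}},
//	}}
//	phase := GetPhase(spec, statuses) // api.PodRunning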
  2456. // generateAPIPodStatus creates the final API pod status for a pod, given the
  2457. // internal pod status.
  2458. func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
  2459. glog.V(3).Infof("Generating status for %q", format.Pod(pod))
2460. // Check if an internal module has requested that the pod be evicted.
  2461. for _, podSyncHandler := range kl.PodSyncHandlers {
  2462. if result := podSyncHandler.ShouldEvict(pod); result.Evict {
  2463. return api.PodStatus{
  2464. Phase: api.PodFailed,
  2465. Reason: result.Reason,
  2466. Message: result.Message,
  2467. }
  2468. }
  2469. }
  2470. s := kl.convertStatusToAPIStatus(pod, podStatus)
  2471. // Assume info is ready to process
  2472. spec := &pod.Spec
  2473. allStatus := append(append([]api.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
  2474. s.Phase = GetPhase(spec, allStatus)
  2475. kl.probeManager.UpdatePodStatus(pod.UID, s)
  2476. s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
  2477. s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
2478. // s (the PodStatus we are creating) will not have a PodScheduled condition yet, because convertStatusToAPIStatus()
  2479. // does not create one. If the existing PodStatus has a PodScheduled condition, then copy it into s and make sure
  2480. // it is set to true. If the existing PodStatus does not have a PodScheduled condition, then create one that is set to true.
  2481. if _, oldPodScheduled := api.GetPodCondition(&pod.Status, api.PodScheduled); oldPodScheduled != nil {
  2482. s.Conditions = append(s.Conditions, *oldPodScheduled)
  2483. }
  2484. api.UpdatePodCondition(&pod.Status, &api.PodCondition{
  2485. Type: api.PodScheduled,
  2486. Status: api.ConditionTrue,
  2487. })
  2488. if !kl.standaloneMode {
  2489. hostIP, err := kl.getHostIPAnyWay()
  2490. if err != nil {
  2491. glog.V(4).Infof("Cannot get host IP: %v", err)
  2492. } else {
  2493. s.HostIP = hostIP.String()
  2494. if podUsesHostNetwork(pod) && s.PodIP == "" {
  2495. s.PodIP = hostIP.String()
  2496. }
  2497. }
  2498. }
  2499. return *s
  2500. }
  2501. // convertStatusToAPIStatus creates an api PodStatus for the given pod from
  2502. // the given internal pod status. It is purely transformative and does not
  2503. // alter the kubelet state at all.
  2504. func (kl *Kubelet) convertStatusToAPIStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) *api.PodStatus {
  2505. var apiPodStatus api.PodStatus
  2506. apiPodStatus.PodIP = podStatus.IP
  2507. apiPodStatus.ContainerStatuses = kl.convertToAPIContainerStatuses(
  2508. pod, podStatus,
  2509. pod.Status.ContainerStatuses,
  2510. pod.Spec.Containers,
  2511. len(pod.Spec.InitContainers) > 0,
  2512. false,
  2513. )
  2514. apiPodStatus.InitContainerStatuses = kl.convertToAPIContainerStatuses(
  2515. pod, podStatus,
  2516. pod.Status.InitContainerStatuses,
  2517. pod.Spec.InitContainers,
  2518. len(pod.Spec.InitContainers) > 0,
  2519. true,
  2520. )
  2521. return &apiPodStatus
  2522. }
  2523. func (kl *Kubelet) convertToAPIContainerStatuses(pod *api.Pod, podStatus *kubecontainer.PodStatus, previousStatus []api.ContainerStatus, containers []api.Container, hasInitContainers, isInitContainer bool) []api.ContainerStatus {
  2524. convertContainerStatus := func(cs *kubecontainer.ContainerStatus) *api.ContainerStatus {
  2525. cid := cs.ID.String()
  2526. status := &api.ContainerStatus{
  2527. Name: cs.Name,
  2528. RestartCount: int32(cs.RestartCount),
  2529. Image: cs.Image,
  2530. ImageID: cs.ImageID,
  2531. ContainerID: cid,
  2532. }
  2533. switch cs.State {
  2534. case kubecontainer.ContainerStateRunning:
  2535. status.State.Running = &api.ContainerStateRunning{StartedAt: unversioned.NewTime(cs.StartedAt)}
  2536. case kubecontainer.ContainerStateExited:
  2537. status.State.Terminated = &api.ContainerStateTerminated{
  2538. ExitCode: int32(cs.ExitCode),
  2539. Reason: cs.Reason,
  2540. Message: cs.Message,
  2541. StartedAt: unversioned.NewTime(cs.StartedAt),
  2542. FinishedAt: unversioned.NewTime(cs.FinishedAt),
  2543. ContainerID: cid,
  2544. }
  2545. default:
  2546. status.State.Waiting = &api.ContainerStateWaiting{}
  2547. }
  2548. return status
  2549. }
2550. // Fetch the old container statuses from the old pod status.
  2551. oldStatuses := make(map[string]api.ContainerStatus, len(containers))
  2552. for _, status := range previousStatus {
  2553. oldStatuses[status.Name] = status
  2554. }
2555. // Set all container statuses to the default waiting state
  2556. statuses := make(map[string]*api.ContainerStatus, len(containers))
  2557. defaultWaitingState := api.ContainerState{Waiting: &api.ContainerStateWaiting{Reason: "ContainerCreating"}}
  2558. if hasInitContainers {
  2559. defaultWaitingState = api.ContainerState{Waiting: &api.ContainerStateWaiting{Reason: "PodInitializing"}}
  2560. }
  2561. for _, container := range containers {
  2562. status := &api.ContainerStatus{
  2563. Name: container.Name,
  2564. Image: container.Image,
  2565. State: defaultWaitingState,
  2566. }
  2567. // Apply some values from the old statuses as the default values.
  2568. if oldStatus, found := oldStatuses[container.Name]; found {
  2569. status.RestartCount = oldStatus.RestartCount
  2570. status.LastTerminationState = oldStatus.LastTerminationState
  2571. }
  2572. statuses[container.Name] = status
  2573. }
2574. // Make the latest container status come first.
  2575. sort.Sort(sort.Reverse(kubecontainer.SortContainerStatusesByCreationTime(podStatus.ContainerStatuses)))
  2576. // Set container statuses according to the statuses seen in pod status
  2577. containerSeen := map[string]int{}
  2578. for _, cStatus := range podStatus.ContainerStatuses {
  2579. cName := cStatus.Name
  2580. if _, ok := statuses[cName]; !ok {
  2581. // This would also ignore the infra container.
  2582. continue
  2583. }
  2584. if containerSeen[cName] >= 2 {
  2585. continue
  2586. }
  2587. status := convertContainerStatus(cStatus)
  2588. if containerSeen[cName] == 0 {
  2589. statuses[cName] = status
  2590. } else {
  2591. statuses[cName].LastTerminationState = status.State
  2592. }
  2593. containerSeen[cName] = containerSeen[cName] + 1
  2594. }
2595. // Handle the containers that failed to start; they should be in the Waiting state.
  2596. for _, container := range containers {
  2597. if isInitContainer {
  2598. // If the init container is terminated with exit code 0, it won't be restarted.
  2599. // TODO(random-liu): Handle this in a cleaner way.
  2600. s := podStatus.FindContainerStatusByName(container.Name)
  2601. if s != nil && s.State == kubecontainer.ContainerStateExited && s.ExitCode == 0 {
  2602. continue
  2603. }
  2604. }
2605. // If a container should be restarted in the next syncpod, it is *Waiting*.
  2606. if !kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
  2607. continue
  2608. }
  2609. status := statuses[container.Name]
  2610. reason, message, ok := kl.reasonCache.Get(pod.UID, container.Name)
  2611. if !ok {
  2612. // In fact, we could also apply Waiting state here, but it is less informative,
  2613. // and the container will be restarted soon, so we prefer the original state here.
  2614. // Note that with the current implementation of ShouldContainerBeRestarted the original state here
  2615. // could be:
  2616. // * Waiting: There is no associated historical container and start failure reason record.
  2617. // * Terminated: The container is terminated.
  2618. continue
  2619. }
  2620. if status.State.Terminated != nil {
  2621. status.LastTerminationState = status.State
  2622. }
  2623. status.State = api.ContainerState{
  2624. Waiting: &api.ContainerStateWaiting{
  2625. Reason: reason.Error(),
  2626. Message: message,
  2627. },
  2628. }
  2629. statuses[container.Name] = status
  2630. }
  2631. var containerStatuses []api.ContainerStatus
  2632. for _, status := range statuses {
  2633. containerStatuses = append(containerStatuses, *status)
  2634. }
2635. // Sort the container statuses, since clients of this interface expect the list
2636. // of containers in a pod to have a deterministic order.
  2637. if isInitContainer {
  2638. kubetypes.SortInitContainerStatuses(pod, containerStatuses)
  2639. } else {
  2640. sort.Sort(kubetypes.SortedContainerStatuses(containerStatuses))
  2641. }
  2642. return containerStatuses
  2643. }
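// Illustrative sketch (editorial, not part of this file): containers the runtime
// has not reported yet keep the default waiting state computed above. Assuming a
// pod with no init containers and an empty runtime status, every entry comes back
// Waiting with reason "ContainerCreating" (it would be "PodInitializing" if
// hasInitContainers were true).
//
//	statuses := kl.convertToAPIContainerStatuses(pod, runtimeStatus, nil, pod.Spec.Containers, false, false)
//	// statuses[i].State.Waiting.Reason == "ContainerCreating" for every unseen container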
2644. // ServeLogs returns the logs of the current machine.
  2645. func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
  2646. // TODO: whitelist logs we are willing to serve
  2647. kl.logServer.ServeHTTP(w, req)
  2648. }
  2649. // findContainer finds and returns the container with the given pod ID, full name, and container name.
  2650. // It returns nil if not found.
  2651. func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) {
  2652. pods, err := kl.containerRuntime.GetPods(false)
  2653. if err != nil {
  2654. return nil, err
  2655. }
  2656. pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
  2657. return pod.FindContainerByName(containerName), nil
  2658. }
2659. // RunInContainer runs a command in a container and returns the combined stdout and stderr as a byte slice.
  2660. func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containerName string, cmd []string) ([]byte, error) {
  2661. podUID = kl.podManager.TranslatePodUID(podUID)
  2662. container, err := kl.findContainer(podFullName, podUID, containerName)
  2663. if err != nil {
  2664. return nil, err
  2665. }
  2666. if container == nil {
  2667. return nil, fmt.Errorf("container not found (%q)", containerName)
  2668. }
  2669. var buffer bytes.Buffer
  2670. output := ioutils.WriteCloserWrapper(&buffer)
  2671. err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil)
  2672. // Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
  2673. // the command returned a nonzero exit code). Therefore, always return the output along with the
  2674. // error.
  2675. return buffer.Bytes(), err
  2676. }
  2677. // ExecInContainer executes a command in a container, connecting the supplied
  2678. // stdin/stdout/stderr to the command's IO streams.
  2679. func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
  2680. podUID = kl.podManager.TranslatePodUID(podUID)
  2681. container, err := kl.findContainer(podFullName, podUID, containerName)
  2682. if err != nil {
  2683. return err
  2684. }
  2685. if container == nil {
  2686. return fmt.Errorf("container not found (%q)", containerName)
  2687. }
  2688. return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
  2689. }
  2690. // AttachContainer uses the container runtime to attach the given streams to
  2691. // the given container.
  2692. func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
  2693. podUID = kl.podManager.TranslatePodUID(podUID)
  2694. container, err := kl.findContainer(podFullName, podUID, containerName)
  2695. if err != nil {
  2696. return err
  2697. }
  2698. if container == nil {
  2699. return fmt.Errorf("container not found (%q)", containerName)
  2700. }
  2701. return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
  2702. }
  2703. // PortForward connects to the pod's port and copies data between the port
  2704. // and the stream.
  2705. func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
  2706. podUID = kl.podManager.TranslatePodUID(podUID)
  2707. pods, err := kl.containerRuntime.GetPods(false)
  2708. if err != nil {
  2709. return err
  2710. }
  2711. pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
  2712. if pod.IsEmpty() {
  2713. return fmt.Errorf("pod not found (%q)", podFullName)
  2714. }
  2715. return kl.runner.PortForward(&pod, port, stream)
  2716. }
  2717. // GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
  2718. func (kl *Kubelet) GetConfiguration() componentconfig.KubeletConfiguration {
  2719. return kl.kubeletConfiguration
  2720. }
  2721. // BirthCry sends an event that the kubelet has started up.
  2722. func (kl *Kubelet) BirthCry() {
  2723. // Make an event that kubelet restarted.
  2724. kl.recorder.Eventf(kl.nodeRef, api.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
  2725. }
  2726. // StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
  2727. func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
  2728. return kl.streamingConnectionIdleTimeout
  2729. }
  2730. // ResyncInterval returns the interval used for periodic syncs.
  2731. func (kl *Kubelet) ResyncInterval() time.Duration {
  2732. return kl.resyncInterval
  2733. }
  2734. // ListenAndServe runs the kubelet HTTP server.
  2735. func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
  2736. server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
  2737. }
  2738. // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
  2739. func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
  2740. server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
  2741. }
2742. // cleanUpContainersInPod deletes the eligible dead container instances in a pod. Depending on the configuration, the most recent dead containers may be kept around.
  2743. func (kl *Kubelet) cleanUpContainersInPod(podId types.UID, exitedContainerID string) {
  2744. if podStatus, err := kl.podCache.Get(podId); err == nil {
  2745. removeAll := false
  2746. if syncedPod, ok := kl.podManager.GetPodByUID(podId); ok {
  2747. // When an evicted pod has already synced, all containers can be removed.
  2748. removeAll = eviction.PodIsEvicted(syncedPod.Status)
  2749. }
  2750. kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
  2751. }
  2752. }
  2753. // isSyncPodWorthy filters out events that are not worthy of pod syncing
  2754. func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
2755. // ContainerRemoved doesn't affect pod state
  2756. return event.Type != pleg.ContainerRemoved
  2757. }
  2758. func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) {
  2759. rl := make(api.ResourceList)
  2760. for k, v := range m {
  2761. switch api.ResourceName(k) {
  2762. // Only CPU and memory resources are supported.
  2763. case api.ResourceCPU, api.ResourceMemory:
  2764. q, err := resource.ParseQuantity(v)
  2765. if err != nil {
  2766. return nil, err
  2767. }
  2768. if q.Sign() == -1 {
  2769. return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
  2770. }
  2771. rl[api.ResourceName(k)] = q
  2772. default:
  2773. return nil, fmt.Errorf("cannot reserve %q resource", k)
  2774. }
  2775. }
  2776. return rl, nil
  2777. }
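// Illustrative sketch (editorial, not part of this file): parseResourceList only
// accepts cpu and memory keys, so a hypothetical reservation map parses cleanly
// while any other key is rejected.
//
//	rl, err := parseResourceList(utilconfig.ConfigurationMap{"cpu": "100m", "memory": "256Mi"})
//	// err == nil; rl holds api.ResourceCPU=100m and api.ResourceMemory=256Mi
//
//	_, err = parseResourceList(utilconfig.ConfigurationMap{"storage": "1Gi"})
//	// err: cannot reserve "storage" resource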
  2778. func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
  2779. reservation := new(kubetypes.Reservation)
  2780. if rl, err := parseResourceList(kubeReserved); err != nil {
  2781. return nil, err
  2782. } else {
  2783. reservation.Kubernetes = rl
  2784. }
  2785. if rl, err := parseResourceList(systemReserved); err != nil {
  2786. return nil, err
  2787. } else {
  2788. reservation.System = rl
  2789. }
  2790. return reservation, nil
  2791. }