- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package kubelet
- import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- "os"
- "path"
- "path/filepath"
- "sort"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- "github.com/golang/glog"
- cadvisorapi "github.com/google/cadvisor/info/v1"
- "k8s.io/kubernetes/pkg/api"
- utilpod "k8s.io/kubernetes/pkg/api/pod"
- "k8s.io/kubernetes/pkg/api/resource"
- "k8s.io/kubernetes/pkg/api/unversioned"
- "k8s.io/kubernetes/pkg/api/validation"
- "k8s.io/kubernetes/pkg/apis/componentconfig"
- kubeExternal "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1"
- "k8s.io/kubernetes/pkg/client/cache"
- clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
- "k8s.io/kubernetes/pkg/client/record"
- "k8s.io/kubernetes/pkg/cloudprovider"
- "k8s.io/kubernetes/pkg/fieldpath"
- "k8s.io/kubernetes/pkg/fields"
- "k8s.io/kubernetes/pkg/kubelet/cadvisor"
- "k8s.io/kubernetes/pkg/kubelet/cm"
- "k8s.io/kubernetes/pkg/kubelet/config"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/dockertools"
- "k8s.io/kubernetes/pkg/kubelet/envvars"
- "k8s.io/kubernetes/pkg/kubelet/events"
- "k8s.io/kubernetes/pkg/kubelet/eviction"
- "k8s.io/kubernetes/pkg/kubelet/images"
- "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
- "k8s.io/kubernetes/pkg/kubelet/lifecycle"
- "k8s.io/kubernetes/pkg/kubelet/metrics"
- "k8s.io/kubernetes/pkg/kubelet/network"
- "k8s.io/kubernetes/pkg/kubelet/pleg"
- kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
- "k8s.io/kubernetes/pkg/kubelet/prober"
- proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
- "k8s.io/kubernetes/pkg/kubelet/remote"
- "k8s.io/kubernetes/pkg/kubelet/rkt"
- "k8s.io/kubernetes/pkg/kubelet/server"
- "k8s.io/kubernetes/pkg/kubelet/server/stats"
- "k8s.io/kubernetes/pkg/kubelet/status"
- "k8s.io/kubernetes/pkg/kubelet/sysctl"
- kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
- "k8s.io/kubernetes/pkg/kubelet/util/format"
- "k8s.io/kubernetes/pkg/kubelet/util/ioutils"
- "k8s.io/kubernetes/pkg/kubelet/util/queue"
- "k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager"
- "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/security/apparmor"
- "k8s.io/kubernetes/pkg/types"
- "k8s.io/kubernetes/pkg/util/bandwidth"
- "k8s.io/kubernetes/pkg/util/clock"
- utilconfig "k8s.io/kubernetes/pkg/util/config"
- utildbus "k8s.io/kubernetes/pkg/util/dbus"
- utilexec "k8s.io/kubernetes/pkg/util/exec"
- "k8s.io/kubernetes/pkg/util/flowcontrol"
- "k8s.io/kubernetes/pkg/util/integer"
- kubeio "k8s.io/kubernetes/pkg/util/io"
- utilipt "k8s.io/kubernetes/pkg/util/iptables"
- "k8s.io/kubernetes/pkg/util/mount"
- nodeutil "k8s.io/kubernetes/pkg/util/node"
- "k8s.io/kubernetes/pkg/util/oom"
- "k8s.io/kubernetes/pkg/util/procfs"
- utilruntime "k8s.io/kubernetes/pkg/util/runtime"
- "k8s.io/kubernetes/pkg/util/sets"
- "k8s.io/kubernetes/pkg/util/term"
- utilvalidation "k8s.io/kubernetes/pkg/util/validation"
- "k8s.io/kubernetes/pkg/util/validation/field"
- "k8s.io/kubernetes/pkg/util/wait"
- "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/util/volumehelper"
- "k8s.io/kubernetes/pkg/watch"
- "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
- "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
- "k8s.io/kubernetes/third_party/forked/golang/expansion"
- )
- const (
- // Max amount of time to wait for the container runtime to come up.
- maxWaitForContainerRuntime = 5 * time.Minute
- // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
- nodeStatusUpdateRetry = 5
- // Location of container logs.
- containerLogsDir = "/var/log/containers"
- // max backoff period, exported for the e2e test
- MaxContainerBackOff = 300 * time.Second
- // Capacity of the channel for storing pods to kill. A small number should
- // suffice because a goroutine is dedicated to check the channel and does
- // not block on anything else.
- podKillingChannelCapacity = 50
- // Period for performing global cleanup tasks.
- housekeepingPeriod = time.Second * 2
- // Period for performing eviction monitoring.
- // TODO ensure this is in sync with internal cadvisor housekeeping.
- evictionMonitoringPeriod = time.Second * 10
- // The path in containers' filesystems where the hosts file is mounted.
- etcHostsPath = "/etc/hosts"
- // Capacity of the channel for receiving pod lifecycle events. This number
- // is a bit arbitrary and may be adjusted in the future.
- plegChannelCapacity = 1000
- // Generic PLEG relies on relisting for discovering container events.
- // A longer period means that kubelet will take longer to detect container
- // changes and to update pod status. On the other hand, a shorter period
- // will cause more frequent relisting (e.g., container runtime operations),
- // leading to higher cpu usage.
- // Note that even though we set the period to 1s, the relisting itself can
- // take more than 1s to finish if the container runtime responds slowly
- // and/or when there are many container changes in one cycle.
- plegRelistPeriod = time.Second * 1
- // backOffPeriod is the period to back off when pod syncing results in an
- // error. It is also used as the base period for the exponential backoff
- // container restarts and image pulls.
- backOffPeriod = time.Second * 10
- // Period for performing container garbage collection.
- ContainerGCPeriod = time.Minute
- // Period for performing image garbage collection.
- ImageGCPeriod = 5 * time.Minute
- // maxImagesInNodeStatus is the max number of images we store in the node status.
- maxImagesInNodeStatus = 50
- // Minimum number of dead containers to keep in a pod
- minDeadContainerInPod = 1
- )
- // SyncHandler is an interface implemented by Kubelet, for testability
- type SyncHandler interface {
- HandlePodAdditions(pods []*api.Pod)
- HandlePodUpdates(pods []*api.Pod)
- HandlePodRemoves(pods []*api.Pod)
- HandlePodReconcile(pods []*api.Pod)
- HandlePodSyncs(pods []*api.Pod)
- HandlePodCleanups() error
- }
- // Option is a functional option type for Kubelet
- type Option func(*Kubelet)
- // KubeletBootstrap is a bootstrapping interface for the kubelet, targeting the initialization protocol.
- type KubeletBootstrap interface {
- GetConfiguration() componentconfig.KubeletConfiguration
- BirthCry()
- StartGarbageCollection()
- ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool)
- ListenAndServeReadOnly(address net.IP, port uint)
- Run(<-chan kubetypes.PodUpdate)
- RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
- }
- // KubeletBuilder creates and initializes a Kubelet instance.
- type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error)
- // KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
- // at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
- // these objects while we figure out a more comprehensive dependency injection story for the Kubelet.
- type KubeletDeps struct {
- // TODO(mtaufen): KubeletBuilder:
- // Mesos currently uses this as a hook to let them make their own call to
- // let them wrap the KubeletBootstrap that CreateAndInitKubelet returns with
- // their own KubeletBootstrap. It's a useful hook. I need to think about what
- // a nice home for it would be. There seems to be a trend, between this and
- // the Options fields below, of providing hooks where you can add extra functionality
- // to the Kubelet for your solution. Maybe we should centralize these sorts of things?
- Builder KubeletBuilder
- // TODO(mtaufen): ContainerRuntimeOptions and Options:
- // Arrays of functions that can do arbitrary things to the Kubelet and the Runtime
- // seem like a difficult path to trace when it's time to debug something.
- // I'm leaving these fields here for now, but there is likely an easier-to-follow
- // way to support their intended use cases. E.g. ContainerRuntimeOptions
- // is used by Mesos to set an environment variable in containers which has
- // some connection to their container GC. It seems that Mesos intends to use
- // Options to add additional node conditions that are updated as part of the
- // Kubelet lifecycle (see https://github.com/kubernetes/kubernetes/pull/21521).
- // We should think about providing more explicit ways of doing these things.
- ContainerRuntimeOptions []kubecontainer.Option
- Options []Option
- // Injected Dependencies
- Auth server.AuthInterface
- CAdvisorInterface cadvisor.Interface
- Cloud cloudprovider.Interface
- ContainerManager cm.ContainerManager
- DockerClient dockertools.DockerInterface
- EventClient *clientset.Clientset
- KubeClient *clientset.Clientset
- Mounter mount.Interface
- NetworkPlugins []network.NetworkPlugin
- OOMAdjuster *oom.OOMAdjuster
- OSInterface kubecontainer.OSInterface
- PodConfig *config.PodConfig
- Recorder record.EventRecorder
- Writer kubeio.Writer
- VolumePlugins []volume.VolumePlugin
- TLSOptions *server.TLSOptions
- }
- func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName string) (*config.PodConfig, error) {
- manifestURLHeader := make(http.Header)
- if kubeCfg.ManifestURLHeader != "" {
- pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
- if len(pieces) != 2 {
- return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
- }
- manifestURLHeader.Set(pieces[0], pieces[1])
- }
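- // Illustrative note (the header value below is hypothetical): a
- // ManifestURLHeader of "X-Kubelet-Token:abc123" is split on ":" into the key
- // "X-Kubelet-Token" and the value "abc123", which is then set on the
- // http.Header used for manifest URL requests; values containing additional
- // ":" characters are rejected above.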
- // source of all configuration
- cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
- // define file config source
- if kubeCfg.PodManifestPath != "" {
- glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
- config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
- }
- // define url config source
- if kubeCfg.ManifestURL != "" {
- glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
- config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
- }
- if kubeDeps.KubeClient != nil {
- glog.Infof("Watching apiserver")
- config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
- }
- return cfg, nil
- }
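- // Note on makePodSourceConfig usage (illustrative sketch, not code from this
- // file): the merged config is typically consumed by ranging over its update
- // channel; the loop body below is hypothetical.
- //
- //   cfg, err := makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
- //   if err != nil {
- //       return err
- //   }
- //   for update := range cfg.Updates() {
- //       // update.Pods carries pods from the file, HTTP or apiserver source
- //       // identified by update.Source; update.Op describes the operation.
- //   }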
- // NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
- // No initialization of Kubelet and its modules should happen here.
- func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
- if kubeCfg.RootDirectory == "" {
- return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
- }
- if kubeCfg.SyncFrequency.Duration <= 0 {
- return nil, fmt.Errorf("invalid sync frequency %d", kubeCfg.SyncFrequency.Duration)
- }
- if kubeCfg.MakeIPTablesUtilChains {
- if kubeCfg.IPTablesMasqueradeBit > 31 || kubeCfg.IPTablesMasqueradeBit < 0 {
- return nil, fmt.Errorf("iptables-masquerade-bit is not valid. Must be within [0, 31]")
- }
- if kubeCfg.IPTablesDropBit > 31 || kubeCfg.IPTablesDropBit < 0 {
- return nil, fmt.Errorf("iptables-drop-bit is not valid. Must be within [0, 31]")
- }
- if kubeCfg.IPTablesDropBit == kubeCfg.IPTablesMasqueradeBit {
- return nil, fmt.Errorf("iptables-masquerade-bit and iptables-drop-bit must be different")
- }
- }
- hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
- // Query the cloud provider for our node name, default to hostname
- nodeName := hostname
- if kubeDeps.Cloud != nil {
- var err error
- instances, ok := kubeDeps.Cloud.Instances()
- if !ok {
- return nil, fmt.Errorf("failed to get instances from cloud provider")
- }
- nodeName, err = instances.CurrentNodeName(hostname)
- if err != nil {
- return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
- }
- glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
- }
- // TODO: KubeletDeps.KubeClient should be a client interface, but client interface misses certain methods
- // used by kubelet. Since NewMainKubelet expects a client interface, we need to make sure we are not passing
- // a nil pointer to it when what we really want is a nil interface.
- var kubeClient clientset.Interface
- if kubeDeps.KubeClient != nil {
- kubeClient = kubeDeps.KubeClient
- // TODO: remove this when we've refactored kubelet to only use clientset.
- }
- if kubeDeps.PodConfig == nil {
- var err error
- kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
- if err != nil {
- return nil, err
- }
- }
- containerGCPolicy := kubecontainer.ContainerGCPolicy{
- MinAge: kubeCfg.MinimumGCAge.Duration,
- MaxPerPodContainer: int(kubeCfg.MaxPerPodContainerCount),
- MaxContainers: int(kubeCfg.MaxContainerCount),
- }
- daemonEndpoints := &api.NodeDaemonEndpoints{
- KubeletEndpoint: api.DaemonEndpoint{Port: kubeCfg.Port},
- }
- imageGCPolicy := images.ImageGCPolicy{
- MinAge: kubeCfg.ImageMinimumGCAge.Duration,
- HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
- LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
- }
- diskSpacePolicy := DiskSpacePolicy{
- DockerFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
- RootFreeDiskMB: int(kubeCfg.LowDiskSpaceThresholdMB),
- }
- thresholds, err := eviction.ParseThresholdConfig(kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
- if err != nil {
- return nil, err
- }
- evictionConfig := eviction.Config{
- PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
- MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
- Thresholds: thresholds,
- }
- reservation, err := ParseReservation(kubeCfg.KubeReserved, kubeCfg.SystemReserved)
- if err != nil {
- return nil, err
- }
- var dockerExecHandler dockertools.ExecHandler
- switch kubeCfg.DockerExecHandlerName {
- case "native":
- dockerExecHandler = &dockertools.NativeExecHandler{}
- case "nsenter":
- dockerExecHandler = &dockertools.NsenterExecHandler{}
- default:
- glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
- dockerExecHandler = &dockertools.NativeExecHandler{}
- }
- serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
- if kubeClient != nil {
- // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
- // than an interface. There is no way to construct a list+watcher using resource name.
- listWatch := &cache.ListWatch{
- ListFunc: func(options api.ListOptions) (runtime.Object, error) {
- return kubeClient.Core().Services(api.NamespaceAll).List(options)
- },
- WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
- return kubeClient.Core().Services(api.NamespaceAll).Watch(options)
- },
- }
- cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
- }
- serviceLister := &cache.StoreToServiceLister{Store: serviceStore}
- nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
- if kubeClient != nil {
- // TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
- // than an interface. There is no way to construct a list+watcher using resource name.
- fieldSelector := fields.Set{api.ObjectNameField: nodeName}.AsSelector()
- listWatch := &cache.ListWatch{
- ListFunc: func(options api.ListOptions) (runtime.Object, error) {
- options.FieldSelector = fieldSelector
- return kubeClient.Core().Nodes().List(options)
- },
- WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
- options.FieldSelector = fieldSelector
- return kubeClient.Core().Nodes().Watch(options)
- },
- }
- cache.NewReflector(listWatch, &api.Node{}, nodeStore, 0).Run()
- }
- nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
- nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}
- // TODO: get the real node object for this node,
- // and use the real node name and UID.
- // TODO: what is namespace for node?
- nodeRef := &api.ObjectReference{
- Kind: "Node",
- Name: nodeName,
- UID: types.UID(nodeName),
- Namespace: "",
- }
- diskSpaceManager, err := newDiskSpaceManager(kubeDeps.CAdvisorInterface, diskSpacePolicy)
- if err != nil {
- return nil, fmt.Errorf("failed to initialize disk manager: %v", err)
- }
- containerRefManager := kubecontainer.NewRefManager()
- oomWatcher := NewOOMWatcher(kubeDeps.CAdvisorInterface, kubeDeps.Recorder)
- // TODO(mtaufen): remove when internal cbr0 implementation gets removed in favor
- // of the kubenet network plugin
- var myConfigureCBR0 bool = kubeCfg.ConfigureCBR0
- var myFlannelExperimentalOverlay bool = kubeCfg.ExperimentalFlannelOverlay
- if kubeCfg.NetworkPluginName == "kubenet" {
- myConfigureCBR0 = false
- myFlannelExperimentalOverlay = false
- }
- klet := &Kubelet{
- hostname: hostname,
- nodeName: nodeName,
- dockerClient: kubeDeps.DockerClient,
- kubeClient: kubeClient,
- rootDirectory: kubeCfg.RootDirectory,
- resyncInterval: kubeCfg.SyncFrequency.Duration,
- containerRefManager: containerRefManager,
- httpClient: &http.Client{},
- sourcesReady: config.NewSourcesReady(kubeDeps.PodConfig.SeenAllSources),
- registerNode: kubeCfg.RegisterNode,
- registerSchedulable: kubeCfg.RegisterSchedulable,
- standaloneMode: standaloneMode,
- clusterDomain: kubeCfg.ClusterDomain,
- clusterDNS: net.ParseIP(kubeCfg.ClusterDNS),
- serviceLister: serviceLister,
- nodeLister: nodeLister,
- nodeInfo: nodeInfo,
- masterServiceNamespace: kubeCfg.MasterServiceNamespace,
- streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
- recorder: kubeDeps.Recorder,
- cadvisor: kubeDeps.CAdvisorInterface,
- diskSpaceManager: diskSpaceManager,
- cloud: kubeDeps.Cloud,
- autoDetectCloudProvider: (kubeExternal.AutoDetectCloudProvider == kubeCfg.CloudProvider),
- nodeRef: nodeRef,
- nodeLabels: kubeCfg.NodeLabels,
- nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
- os: kubeDeps.OSInterface,
- oomWatcher: oomWatcher,
- cgroupsPerQOS: kubeCfg.CgroupsPerQOS,
- cgroupRoot: kubeCfg.CgroupRoot,
- mounter: kubeDeps.Mounter,
- writer: kubeDeps.Writer,
- configureCBR0: myConfigureCBR0,
- nonMasqueradeCIDR: kubeCfg.NonMasqueradeCIDR,
- reconcileCIDR: kubeCfg.ReconcileCIDR,
- maxPods: int(kubeCfg.MaxPods),
- podsPerCore: int(kubeCfg.PodsPerCore),
- nvidiaGPUs: int(kubeCfg.NvidiaGPUs),
- syncLoopMonitor: atomic.Value{},
- resolverConfig: kubeCfg.ResolverConfig,
- cpuCFSQuota: kubeCfg.CPUCFSQuota,
- daemonEndpoints: daemonEndpoints,
- containerManager: kubeDeps.ContainerManager,
- flannelExperimentalOverlay: myFlannelExperimentalOverlay,
- flannelHelper: nil,
- nodeIP: net.ParseIP(kubeCfg.NodeIP),
- clock: clock.RealClock{},
- outOfDiskTransitionFrequency: kubeCfg.OutOfDiskTransitionFrequency.Duration,
- reservation: *reservation,
- enableCustomMetrics: kubeCfg.EnableCustomMetrics,
- babysitDaemons: kubeCfg.BabysitDaemons,
- enableControllerAttachDetach: kubeCfg.EnableControllerAttachDetach,
- iptClient: utilipt.New(utilexec.New(), utildbus.New(), utilipt.ProtocolIpv4),
- makeIPTablesUtilChains: kubeCfg.MakeIPTablesUtilChains,
- iptablesMasqueradeBit: int(kubeCfg.IPTablesMasqueradeBit),
- iptablesDropBit: int(kubeCfg.IPTablesDropBit),
- }
- if klet.flannelExperimentalOverlay {
- klet.flannelHelper = NewFlannelHelper()
- glog.Infof("Flannel is in charge of podCIDR and overlay networking.")
- }
- if klet.nodeIP != nil {
- if err := klet.validateNodeIP(); err != nil {
- return nil, err
- }
- glog.Infof("Using node IP: %q", klet.nodeIP.String())
- }
- if mode, err := effectiveHairpinMode(componentconfig.HairpinMode(kubeCfg.HairpinMode), kubeCfg.ContainerRuntime, kubeCfg.ConfigureCBR0, kubeCfg.NetworkPluginName); err != nil {
- // This is a non-recoverable error. Returning it up the callstack will just
- // lead to retries of the same failure, so just fail hard.
- glog.Fatalf("Invalid hairpin mode: %v", err)
- } else {
- klet.hairpinMode = mode
- }
- glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
- if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
- return nil, err
- } else {
- klet.networkPlugin = plug
- }
- machineInfo, err := klet.GetCachedMachineInfo()
- if err != nil {
- return nil, err
- }
- procFs := procfs.NewProcFS()
- imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
- klet.livenessManager = proberesults.NewManager()
- klet.podCache = kubecontainer.NewCache()
- klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
- if kubeCfg.RemoteRuntimeEndpoint != "" {
- kubeCfg.ContainerRuntime = "remote"
- // kubeCfg.RemoteImageEndpoint is the same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
- if kubeCfg.RemoteImageEndpoint == "" {
- kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
- }
- }
- // Initialize the runtime.
- switch kubeCfg.ContainerRuntime {
- case "docker":
- // Only supported one for now, continue.
- klet.containerRuntime = dockertools.NewDockerManager(
- kubeDeps.DockerClient,
- kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
- klet.livenessManager,
- containerRefManager,
- klet.podManager,
- machineInfo,
- kubeCfg.PodInfraContainerImage,
- float32(kubeCfg.RegistryPullQPS),
- int(kubeCfg.RegistryBurst),
- containerLogsDir,
- kubeDeps.OSInterface,
- klet.networkPlugin,
- klet,
- klet.httpClient,
- dockerExecHandler,
- kubeDeps.OOMAdjuster,
- procFs,
- klet.cpuCFSQuota,
- imageBackOff,
- kubeCfg.SerializeImagePulls,
- kubeCfg.EnableCustomMetrics,
- // If using "kubenet", the Kubernetes network plugin that wraps
- // CNI's bridge plugin, it knows how to set the hairpin veth flag
- // so we tell the container runtime to back away from setting it.
- // If the kubelet is started with any other plugin we can't be
- // sure it handles the hairpin case so we instruct the docker
- // runtime to set the flag instead.
- klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
- kubeCfg.SeccompProfileRoot,
- kubeDeps.ContainerRuntimeOptions...,
- )
- case "rkt":
- // TODO: Include hairpin mode settings in rkt?
- conf := &rkt.Config{
- Path: kubeCfg.RktPath,
- Stage1Image: kubeCfg.RktStage1Image,
- InsecureOptions: "image,ondisk",
- }
- rktRuntime, err := rkt.New(
- kubeCfg.RktAPIEndpoint,
- conf,
- klet,
- kubeDeps.Recorder,
- containerRefManager,
- klet.podManager,
- klet.livenessManager,
- klet.httpClient,
- klet.networkPlugin,
- klet.hairpinMode == componentconfig.HairpinVeth,
- utilexec.New(),
- kubecontainer.RealOS{},
- imageBackOff,
- kubeCfg.SerializeImagePulls,
- kubeCfg.RuntimeRequestTimeout.Duration,
- )
- if err != nil {
- return nil, err
- }
- klet.containerRuntime = rktRuntime
- case "remote":
- remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
- if err != nil {
- return nil, err
- }
- remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
- if err != nil {
- return nil, err
- }
- klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
- kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
- klet.livenessManager,
- containerRefManager,
- kubeDeps.OSInterface,
- klet.networkPlugin,
- klet,
- klet.httpClient,
- imageBackOff,
- kubeCfg.SerializeImagePulls,
- klet.cpuCFSQuota,
- remoteRuntimeService,
- remoteImageService,
- )
- if err != nil {
- return nil, err
- }
- default:
- return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
- }
- // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency
- klet.resourceAnalyzer = stats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, klet.containerRuntime)
- klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
- klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
- klet.updatePodCIDR(kubeCfg.PodCIDR)
- // setup containerGC
- containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
- if err != nil {
- return nil, err
- }
- klet.containerGC = containerGC
- klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
- // setup imageManager
- imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
- if err != nil {
- return nil, fmt.Errorf("failed to initialize image manager: %v", err)
- }
- klet.imageManager = imageManager
- klet.runner = klet.containerRuntime
- klet.statusManager = status.NewManager(kubeClient, klet.podManager)
- klet.probeManager = prober.NewManager(
- klet.statusManager,
- klet.livenessManager,
- klet.runner,
- containerRefManager,
- kubeDeps.Recorder)
- klet.volumePluginMgr, err =
- NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
- if err != nil {
- return nil, err
- }
- // setup volumeManager
- klet.volumeManager, err = volumemanager.NewVolumeManager(
- kubeCfg.EnableControllerAttachDetach,
- nodeName,
- klet.podManager,
- klet.kubeClient,
- klet.volumePluginMgr,
- klet.containerRuntime,
- kubeDeps.Mounter,
- klet.getPodsDir(),
- kubeDeps.Recorder)
- runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
- if err != nil {
- return nil, err
- }
- klet.runtimeCache = runtimeCache
- klet.reasonCache = NewReasonCache()
- klet.workQueue = queue.NewBasicWorkQueue(klet.clock)
- klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
- klet.backOff = flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
- klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity)
- klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
- // setup eviction manager
- evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), klet.imageManager, kubeDeps.Recorder, nodeRef, klet.clock)
- if err != nil {
- return nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
- }
- klet.evictionManager = evictionManager
- klet.AddPodAdmitHandler(evictionAdmitHandler)
- // add sysctl admission
- runtimeSupport, err := sysctl.NewRuntimeAdmitHandler(klet.containerRuntime)
- if err != nil {
- return nil, err
- }
- safeWhitelist, err := sysctl.NewWhitelist(sysctl.SafeSysctlWhitelist(), api.SysctlsPodAnnotationKey)
- if err != nil {
- return nil, err
- }
- // Safe, whitelisted sysctls can always be used as unsafe sysctls in the spec
- // Hence, we concatenate those two lists.
- safeAndUnsafeSysctls := append(sysctl.SafeSysctlWhitelist(), kubeCfg.AllowedUnsafeSysctls...)
- unsafeWhitelist, err := sysctl.NewWhitelist(safeAndUnsafeSysctls, api.UnsafeSysctlsPodAnnotationKey)
- if err != nil {
- return nil, err
- }
- klet.AddPodAdmitHandler(runtimeSupport)
- klet.AddPodAdmitHandler(safeWhitelist)
- klet.AddPodAdmitHandler(unsafeWhitelist)
- // enable active deadline handler
- activeDeadlineHandler, err := newActiveDeadlineHandler(klet.statusManager, kubeDeps.Recorder, klet.clock)
- if err != nil {
- return nil, err
- }
- klet.AddPodSyncLoopHandler(activeDeadlineHandler)
- klet.AddPodSyncHandler(activeDeadlineHandler)
- klet.appArmorValidator = apparmor.NewValidator(kubeCfg.ContainerRuntime)
- klet.AddPodAdmitHandler(lifecycle.NewAppArmorAdmitHandler(klet.appArmorValidator))
- // apply functional Option's
- for _, opt := range kubeDeps.Options {
- opt(klet)
- }
- // Finally, put the most recent version of the config on the Kubelet, so
- // people can see how it was configured.
- klet.kubeletConfiguration = *kubeCfg
- return klet, nil
- }
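- // Note on NewMainKubelet and KubeletBuilder (illustrative sketch): a minimal
- // KubeletBuilder that performs no extra wrapping could simply delegate to
- // NewMainKubelet; "defaultBuilder" is a hypothetical name used only here.
- //
- //   var defaultBuilder KubeletBuilder = func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (KubeletBootstrap, error) {
- //       return NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
- //   }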
- type serviceLister interface {
- List() (api.ServiceList, error)
- }
- type nodeLister interface {
- List() (machines api.NodeList, err error)
- }
- // Kubelet is the main kubelet implementation.
- type Kubelet struct {
- kubeletConfiguration componentconfig.KubeletConfiguration
- hostname string
- nodeName string
- dockerClient dockertools.DockerInterface
- runtimeCache kubecontainer.RuntimeCache
- kubeClient clientset.Interface
- iptClient utilipt.Interface
- rootDirectory string
- // podWorkers handle syncing Pods in response to events.
- podWorkers PodWorkers
- // resyncInterval is the interval between periodic full reconciliations of
- // pods on this node.
- resyncInterval time.Duration
- // sourcesReady records the sources seen by the kubelet; it is thread-safe.
- sourcesReady config.SourcesReady
- // podManager is a facade that abstracts away the various sources of pods
- // this Kubelet services.
- podManager kubepod.Manager
- // Needed to observe and respond to situations that could impact node stability
- evictionManager eviction.Manager
- // Needed to report events for containers belonging to deleted/modified pods.
- // Tracks references for reporting events
- containerRefManager *kubecontainer.RefManager
- // Optional, defaults to /logs/ from /var/log
- logServer http.Handler
- // Optional, defaults to simple Docker implementation
- runner kubecontainer.ContainerCommandRunner
- // Optional, client for http requests, defaults to empty client
- httpClient kubetypes.HttpGetter
- // cAdvisor used for container information.
- cadvisor cadvisor.Interface
- // Set to true to have the node register itself with the apiserver.
- registerNode bool
- // Set to true to have the node register itself as schedulable.
- registerSchedulable bool
- // for internal bookkeeping; access only from within registerWithApiserver
- registrationCompleted bool
- // Set to true if the kubelet is in standalone mode (i.e. setup without an apiserver)
- standaloneMode bool
- // If non-empty, use this for container DNS search.
- clusterDomain string
- // If non-nil, use this for container DNS server.
- clusterDNS net.IP
- // masterServiceNamespace is the namespace that the master service is exposed in.
- masterServiceNamespace string
- // serviceLister knows how to list services
- serviceLister serviceLister
- // nodeLister knows how to list nodes
- nodeLister nodeLister
- // nodeInfo knows how to get information about the node for this kubelet.
- nodeInfo predicates.NodeInfo
- // a list of node labels to register
- nodeLabels map[string]string
- // Last timestamp when runtime responded on ping.
- // Mutex is used to protect this value.
- runtimeState *runtimeState
- // Volume plugins.
- volumePluginMgr *volume.VolumePluginMgr
- // Network plugin.
- networkPlugin network.NetworkPlugin
- // Handles container probing.
- probeManager prober.Manager
- // Manages container health check results.
- livenessManager proberesults.Manager
- // How long to keep idle streaming command execution/port forwarding
- // connections open before terminating them
- streamingConnectionIdleTimeout time.Duration
- // The EventRecorder to use
- recorder record.EventRecorder
- // Policy for handling garbage collection of dead containers.
- containerGC kubecontainer.ContainerGC
- // Manager for image garbage collection.
- imageManager images.ImageGCManager
- // Diskspace manager.
- diskSpaceManager diskSpaceManager
- // Cached MachineInfo returned by cadvisor.
- machineInfo *cadvisorapi.MachineInfo
- // Syncs pods statuses with apiserver; also used as a cache of statuses.
- statusManager status.Manager
- // VolumeManager runs a set of asynchronous loops that figure out which
- // volumes need to be attached/mounted/unmounted/detached based on the pods
- // scheduled on this node and makes it so.
- volumeManager volumemanager.VolumeManager
- // Cloud provider interface.
- cloud cloudprovider.Interface
- autoDetectCloudProvider bool
- // Reference to this node.
- nodeRef *api.ObjectReference
- // Container runtime.
- containerRuntime kubecontainer.Runtime
- // reasonCache caches the failure reason of the last creation of all containers, which is
- // used for generating ContainerStatus.
- reasonCache *ReasonCache
- // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
- // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
- // in nodecontroller. There are several constraints:
- // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
- // N means number of retries allowed for kubelet to post node status. It is pointless
- // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
- // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
- // The constant must be less than podEvictionTimeout.
- // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
- // status. Kubelet may fail to update node status reliably if the value is too small,
- // as it takes time to gather all necessary node information.
- nodeStatusUpdateFrequency time.Duration
- // Generates pod events.
- pleg pleg.PodLifecycleEventGenerator
- // Store kubecontainer.PodStatus for all pods.
- podCache kubecontainer.Cache
- // os is a facade for various syscalls that need to be mocked during testing.
- os kubecontainer.OSInterface
- // Watcher of out of memory events.
- oomWatcher OOMWatcher
- // Monitor resource usage
- resourceAnalyzer stats.ResourceAnalyzer
- // Whether or not we should have the QOS cgroup hierarchy for resource management
- cgroupsPerQOS bool
- // If non-empty, pass this to the container runtime as the root cgroup.
- cgroupRoot string
- // Mounter to use for volumes.
- mounter mount.Interface
- // Writer interface to use for volumes.
- writer kubeio.Writer
- // Manager of non-Runtime containers.
- containerManager cm.ContainerManager
- nodeConfig cm.NodeConfig
- // Whether or not kubelet should take responsibility for keeping cbr0 in
- // the correct state.
- configureCBR0 bool
- reconcileCIDR bool
- // Traffic to IPs outside this range will use IP masquerade.
- nonMasqueradeCIDR string
- // Maximum Number of Pods which can be run by this Kubelet
- maxPods int
- // Number of NVIDIA GPUs on this node
- nvidiaGPUs int
- // Monitor Kubelet's sync loop
- syncLoopMonitor atomic.Value
- // Container restart Backoff
- backOff *flowcontrol.Backoff
- // Channel for sending pods to kill.
- podKillingCh chan *kubecontainer.PodPair
- // The configuration file used as the base to generate the container's
- // DNS resolver configuration file. This can be used in conjunction with
- // clusterDomain and clusterDNS.
- resolverConfig string
- // Optionally shape the bandwidth of a pod
- // TODO: remove when kubenet plugin is ready
- shaper bandwidth.BandwidthShaper
- // True if container cpu limits should be enforced via cgroup CFS quota
- cpuCFSQuota bool
- // Information about the ports which are opened by daemons on Node running this Kubelet server.
- daemonEndpoints *api.NodeDaemonEndpoints
- // A queue used to trigger pod workers.
- workQueue queue.WorkQueue
- // oneTimeInitializer is used to initialize modules that are dependent on the runtime to be up.
- oneTimeInitializer sync.Once
- // flannelExperimentalOverlay determines whether the experimental flannel
- // network overlay is active.
- flannelExperimentalOverlay bool
- // TODO: FlannelHelper doesn't store any state, we can instantiate it
- // on the fly if we're confident the dbus connections it opens don't
- // put the system under duress.
- flannelHelper *FlannelHelper
- // If non-nil, use this IP address for the node
- nodeIP net.IP
- // clock is an interface that provides time related functionality in a way that makes it
- // easy to test the code.
- clock clock.Clock
- // outOfDiskTransitionFrequency specifies the amount of time the kubelet has to be actually
- // not out of disk before it can transition the node condition status from out-of-disk to
- // not-out-of-disk. This prevents a pod that causes out-of-disk condition from repeatedly
- // getting rescheduled onto the node.
- outOfDiskTransitionFrequency time.Duration
- // reservation specifies resources which are reserved for non-pod usage, including kubernetes and
- // non-kubernetes system processes.
- reservation kubetypes.Reservation
- // support gathering custom metrics.
- enableCustomMetrics bool
- // How the Kubelet should setup hairpin NAT. Can take the values: "promiscuous-bridge"
- // (make cbr0 promiscuous), "hairpin-veth" (set the hairpin flag on veth interfaces)
- // or "none" (do nothing).
- hairpinMode componentconfig.HairpinMode
- // The node has babysitter process monitoring docker and kubelet
- babysitDaemons bool
- // handlers called during the tryUpdateNodeStatus cycle
- setNodeStatusFuncs []func(*api.Node) error
- // TODO: think about moving this to be centralized in PodWorkers in follow-on.
- // the list of handlers to call during pod admission.
- lifecycle.PodAdmitHandlers
- // the list of handlers to call during pod sync loop.
- lifecycle.PodSyncLoopHandlers
- // the list of handlers to call during pod sync.
- lifecycle.PodSyncHandlers
- // the number of allowed pods per core
- podsPerCore int
- // enableControllerAttachDetach indicates the Attach/Detach controller
- // should manage attachment/detachment of volumes scheduled to this node,
- // and disable kubelet from executing any attach/detach operations
- enableControllerAttachDetach bool
- // trigger deleting containers in a pod
- containerDeletor *podContainerDeletor
- // config iptables util rules
- makeIPTablesUtilChains bool
- // The bit of the fwmark space to mark packets for SNAT.
- iptablesMasqueradeBit int
- // The bit of the fwmark space to mark packets for dropping.
- iptablesDropBit int
- // The AppArmor validator for checking whether AppArmor is supported.
- appArmorValidator apparmor.Validator
- }
- // setupDataDirs creates:
- // 1. the root directory
- // 2. the pods directory
- // 3. the plugins directory
- func (kl *Kubelet) setupDataDirs() error {
- kl.rootDirectory = path.Clean(kl.rootDirectory)
- if err := os.MkdirAll(kl.getRootDir(), 0750); err != nil {
- return fmt.Errorf("error creating root directory: %v", err)
- }
- if err := os.MkdirAll(kl.getPodsDir(), 0750); err != nil {
- return fmt.Errorf("error creating pods directory: %v", err)
- }
- if err := os.MkdirAll(kl.getPluginsDir(), 0750); err != nil {
- return fmt.Errorf("error creating plugins directory: %v", err)
- }
- return nil
- }
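- // Note on setupDataDirs (illustrative): with the default root directory of
- // /var/lib/kubelet, this typically results in
- //
- //   /var/lib/kubelet          (root directory,    mode 0750)
- //   /var/lib/kubelet/pods     (pods directory,    mode 0750)
- //   /var/lib/kubelet/plugins  (plugins directory, mode 0750)
- //
- // where the exact subdirectory names come from the getPodsDir/getPluginsDir
- // getters.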
- // Get a list of pods that have data directories.
- func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
- podInfos, err := ioutil.ReadDir(kl.getPodsDir())
- if err != nil {
- return nil, err
- }
- pods := []types.UID{}
- for i := range podInfos {
- if podInfos[i].IsDir() {
- pods = append(pods, types.UID(podInfos[i].Name()))
- }
- }
- return pods, nil
- }
- // Starts garbage collection threads.
- func (kl *Kubelet) StartGarbageCollection() {
- go wait.Until(func() {
- if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
- glog.Errorf("Container garbage collection failed: %v", err)
- }
- }, ContainerGCPeriod, wait.NeverStop)
- go wait.Until(func() {
- if err := kl.imageManager.GarbageCollect(); err != nil {
- glog.Errorf("Image garbage collection failed: %v", err)
- }
- }, ImageGCPeriod, wait.NeverStop)
- }
- // initializeModules will initialize internal modules that do not require the container runtime to be up.
- // Note that the modules here must not depend on modules that are not initialized here.
- func (kl *Kubelet) initializeModules() error {
- // Step 1: Prometheus metrics.
- metrics.Register(kl.runtimeCache)
- // Step 2: Setup filesystem directories.
- if err := kl.setupDataDirs(); err != nil {
- return err
- }
- // Step 3: If the container logs directory does not exist, create it.
- if _, err := os.Stat(containerLogsDir); err != nil {
- if err := kl.os.MkdirAll(containerLogsDir, 0755); err != nil {
- glog.Errorf("Failed to create directory %q: %v", containerLogsDir, err)
- }
- }
- // Step 4: Start the image manager.
- if err := kl.imageManager.Start(); err != nil {
- return fmt.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
- }
- // Step 5: Start container manager.
- node, err := kl.getNodeAnyWay()
- if err != nil {
- glog.Errorf("Cannot get Node info: %v", err)
- return fmt.Errorf("Kubelet failed to get node info.")
- }
- if err := kl.containerManager.Start(node); err != nil {
- return fmt.Errorf("Failed to start ContainerManager %v", err)
- }
- // Step 6: Start out of memory watcher.
- if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
- return fmt.Errorf("Failed to start OOM watcher %v", err)
- }
- // Step 7: Start resource analyzer
- kl.resourceAnalyzer.Start()
- return nil
- }
- // initializeRuntimeDependentModules will initialize internal modules that require the container runtime to be up.
- func (kl *Kubelet) initializeRuntimeDependentModules() {
- if err := kl.cadvisor.Start(); err != nil {
- // Fail kubelet and rely on the babysitter to retry starting kubelet.
- // TODO(random-liu): Add backoff logic in the babysitter
- glog.Fatalf("Failed to start cAdvisor %v", err)
- }
- // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs
- if err := kl.evictionManager.Start(kl, kl.getActivePods, evictionMonitoringPeriod); err != nil {
- kl.runtimeState.setInternalError(fmt.Errorf("failed to start eviction manager %v", err))
- }
- }
- // Run starts the kubelet reacting to config updates
- func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
- if kl.logServer == nil {
- kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
- }
- if kl.kubeClient == nil {
- glog.Warning("No api server defined - no node status update will be sent.")
- }
- if err := kl.initializeModules(); err != nil {
- kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.KubeletSetupFailed, err.Error())
- glog.Error(err)
- kl.runtimeState.setInitError(err)
- }
- // Start volume manager
- go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
- if kl.kubeClient != nil {
- // Start syncing node status immediately, this may set up things the runtime needs to run.
- go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
- }
- go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
- go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
- // Start loop to sync iptables util rules
- if kl.makeIPTablesUtilChains {
- go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
- }
- // Start a goroutine responsible for killing pods (that are not properly
- // handled by pod workers).
- go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)
- // Start component sync loops.
- kl.statusManager.Start()
- kl.probeManager.Start()
- // Start the pod lifecycle event generator.
- kl.pleg.Start()
- kl.syncLoop(updates, kl)
- }
- // getActivePods returns non-terminal pods
- func (kl *Kubelet) getActivePods() []*api.Pod {
- allPods := kl.podManager.GetPods()
- activePods := kl.filterOutTerminatedPods(allPods)
- return activePods
- }
- // makeMounts determines the mount points for the given container.
- func makeMounts(pod *api.Pod, podDir string, container *api.Container, hostName, hostDomain, podIP string, podVolumes kubecontainer.VolumeMap) ([]kubecontainer.Mount, error) {
- // Kubernetes mounts a managed /etc/hosts file only if:
- // - the container does not use hostNetwork, and
- // - the container is not an infrastructure (pause) container, and
- // - the container is not already mounting /etc/hosts.
- // When the pause container is being created, its IP is still unknown. Hence, PodIP will not have been set.
- mountEtcHostsFile := (pod.Spec.SecurityContext == nil || !pod.Spec.SecurityContext.HostNetwork) && len(podIP) > 0
- glog.V(3).Infof("container: %v/%v/%v podIP: %q creating hosts mount: %v", pod.Namespace, pod.Name, container.Name, podIP, mountEtcHostsFile)
- mounts := []kubecontainer.Mount{}
- for _, mount := range container.VolumeMounts {
- mountEtcHostsFile = mountEtcHostsFile && (mount.MountPath != etcHostsPath)
- vol, ok := podVolumes[mount.Name]
- if !ok {
- glog.Warningf("Mount cannot be satisfied for container %q, because the volume is missing: %q", container.Name, mount)
- continue
- }
- relabelVolume := false
- // If the volume supports SELinux and it has not been
- // relabeled already and it is not a read-only volume,
- // relabel it and mark it as labeled
- if vol.Mounter.GetAttributes().Managed && vol.Mounter.GetAttributes().SupportsSELinux && !vol.SELinuxLabeled {
- vol.SELinuxLabeled = true
- relabelVolume = true
- }
- hostPath, err := volume.GetPath(vol.Mounter)
- if err != nil {
- return nil, err
- }
- if mount.SubPath != "" {
- hostPath = filepath.Join(hostPath, mount.SubPath)
- }
- mounts = append(mounts, kubecontainer.Mount{
- Name: mount.Name,
- ContainerPath: mount.MountPath,
- HostPath: hostPath,
- ReadOnly: mount.ReadOnly,
- SELinuxRelabel: relabelVolume,
- })
- }
- if mountEtcHostsFile {
- hostsMount, err := makeHostsMount(podDir, podIP, hostName, hostDomain)
- if err != nil {
- return nil, err
- }
- mounts = append(mounts, *hostsMount)
- }
- return mounts, nil
- }
- // makeHostsMount makes the mountpoint for the hosts file that the containers
- // in a pod are injected with.
- func makeHostsMount(podDir, podIP, hostName, hostDomainName string) (*kubecontainer.Mount, error) {
- hostsFilePath := path.Join(podDir, "etc-hosts")
- if err := ensureHostsFile(hostsFilePath, podIP, hostName, hostDomainName); err != nil {
- return nil, err
- }
- return &kubecontainer.Mount{
- Name: "k8s-managed-etc-hosts",
- ContainerPath: etcHostsPath,
- HostPath: hostsFilePath,
- ReadOnly: false,
- }, nil
- }
- // ensureHostsFile ensures that the given host file has an up-to-date ip, host
- // name, and domain name.
- func ensureHostsFile(fileName, hostIP, hostName, hostDomainName string) error {
- if _, err := os.Stat(fileName); err == nil {
- glog.V(4).Infof("kubernetes-managed etc-hosts file exits. Will not be recreated: %q", fileName)
- return nil
- }
- var buffer bytes.Buffer
- buffer.WriteString("# Kubernetes-managed hosts file.\n")
- buffer.WriteString("127.0.0.1\tlocalhost\n") // ipv4 localhost
- buffer.WriteString("::1\tlocalhost ip6-localhost ip6-loopback\n") // ipv6 localhost
- buffer.WriteString("fe00::0\tip6-localnet\n")
- buffer.WriteString("fe00::0\tip6-mcastprefix\n")
- buffer.WriteString("fe00::1\tip6-allnodes\n")
- buffer.WriteString("fe00::2\tip6-allrouters\n")
- if len(hostDomainName) > 0 {
- buffer.WriteString(fmt.Sprintf("%s\t%s.%s\t%s\n", hostIP, hostName, hostDomainName, hostName))
- } else {
- buffer.WriteString(fmt.Sprintf("%s\t%s\n", hostIP, hostName))
- }
- return ioutil.WriteFile(fileName, buffer.Bytes(), 0644)
- }
- func makePortMappings(container *api.Container) (ports []kubecontainer.PortMapping) {
- names := make(map[string]struct{})
- for _, p := range container.Ports {
- pm := kubecontainer.PortMapping{
- HostPort: int(p.HostPort),
- ContainerPort: int(p.ContainerPort),
- Protocol: p.Protocol,
- HostIP: p.HostIP,
- }
- // We need to create some default port name if it's not specified, since
- // this is necessary for rkt.
- // http://issue.k8s.io/7710
- if p.Name == "" {
- pm.Name = fmt.Sprintf("%s-%s:%d", container.Name, p.Protocol, p.ContainerPort)
- } else {
- pm.Name = fmt.Sprintf("%s-%s", container.Name, p.Name)
- }
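- // For example (illustrative), an unnamed 8080/TCP port in container "web" becomes
- // "web-TCP:8080", while a port named "metrics" in the same container becomes "web-metrics".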
- // Protect against exposing the same protocol-port more than once in a container.
- if _, ok := names[pm.Name]; ok {
- glog.Warningf("Port name conflicted, %q is defined more than once", pm.Name)
- continue
- }
- ports = append(ports, pm)
- names[pm.Name] = struct{}{}
- }
- return
- }
- func (kl *Kubelet) GeneratePodHostNameAndDomain(pod *api.Pod) (string, string, error) {
- // TODO(vmarmol): Handle better.
- // Cap hostname at 63 chars (the specification is 64 bytes, which is 63 chars plus the terminating null).
- clusterDomain := kl.clusterDomain
- const hostnameMaxLen = 63
- podAnnotations := pod.Annotations
- if podAnnotations == nil {
- podAnnotations = make(map[string]string)
- }
- hostname := pod.Name
- if len(pod.Spec.Hostname) > 0 {
- if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Hostname); len(msgs) != 0 {
- return "", "", fmt.Errorf("Pod Hostname %q is not a valid DNS label: %s", pod.Spec.Hostname, strings.Join(msgs, ";"))
- }
- hostname = pod.Spec.Hostname
- } else {
- hostnameCandidate := podAnnotations[utilpod.PodHostnameAnnotation]
- if len(utilvalidation.IsDNS1123Label(hostnameCandidate)) == 0 {
- // use hostname annotation, if specified.
- hostname = hostnameCandidate
- }
- }
- if len(hostname) > hostnameMaxLen {
- hostname = hostname[:hostnameMaxLen]
- glog.Errorf("hostname for pod:%q was longer than %d. Truncated hostname to :%q", pod.Name, hostnameMaxLen, hostname)
- }
- hostDomain := ""
- if len(pod.Spec.Subdomain) > 0 {
- if msgs := utilvalidation.IsDNS1123Label(pod.Spec.Subdomain); len(msgs) != 0 {
- return "", "", fmt.Errorf("Pod Subdomain %q is not a valid DNS label: %s", pod.Spec.Subdomain, strings.Join(msgs, ";"))
- }
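- // The pod's domain takes the form <subdomain>.<namespace>.svc.<clusterDomain>,
- // e.g. "web.default.svc.cluster.local" for subdomain "web" in the "default" namespace.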
- hostDomain = fmt.Sprintf("%s.%s.svc.%s", pod.Spec.Subdomain, pod.Namespace, clusterDomain)
- } else {
- subdomainCandidate := pod.Annotations[utilpod.PodSubdomainAnnotation]
- if len(utilvalidation.IsDNS1123Label(subdomainCandidate)) == 0 {
- hostDomain = fmt.Sprintf("%s.%s.svc.%s", subdomainCandidate, pod.Namespace, clusterDomain)
- }
- }
- return hostname, hostDomain, nil
- }
- // GenerateRunContainerOptions generates the RunContainerOptions, which can be used by
- // the container runtime to set parameters for launching a container.
- func (kl *Kubelet) GenerateRunContainerOptions(pod *api.Pod, container *api.Container, podIP string) (*kubecontainer.RunContainerOptions, error) {
- var err error
- opts := &kubecontainer.RunContainerOptions{CgroupParent: kl.cgroupRoot}
- hostname, hostDomainName, err := kl.GeneratePodHostNameAndDomain(pod)
- if err != nil {
- return nil, err
- }
- opts.Hostname = hostname
- podName := volumehelper.GetUniquePodName(pod)
- volumes := kl.volumeManager.GetMountedVolumesForPod(podName)
- opts.PortMappings = makePortMappings(container)
- // Docker does not relabel volumes if the container is running
- // in the host pid or ipc namespaces so the kubelet must
- // relabel the volumes
- if pod.Spec.SecurityContext != nil && (pod.Spec.SecurityContext.HostIPC || pod.Spec.SecurityContext.HostPID) {
- err = kl.relabelVolumes(pod, volumes)
- if err != nil {
- return nil, err
- }
- }
- opts.Mounts, err = makeMounts(pod, kl.getPodDir(pod.UID), container, hostname, hostDomainName, podIP, volumes)
- if err != nil {
- return nil, err
- }
- opts.Envs, err = kl.makeEnvironmentVariables(pod, container, podIP)
- if err != nil {
- return nil, err
- }
- if len(container.TerminationMessagePath) != 0 {
- p := kl.getPodContainerDir(pod.UID, container.Name)
- if err := os.MkdirAll(p, 0750); err != nil {
- glog.Errorf("Error on creating %q: %v", p, err)
- } else {
- opts.PodContainerDir = p
- }
- }
- opts.DNS, opts.DNSSearch, err = kl.GetClusterDNS(pod)
- if err != nil {
- return nil, err
- }
- return opts, nil
- }
- var masterServices = sets.NewString("kubernetes")
- // getServiceEnvVarMap makes a map[string]string of env vars for services a pod in namespace ns should see
- func (kl *Kubelet) getServiceEnvVarMap(ns string) (map[string]string, error) {
- var (
- serviceMap = make(map[string]api.Service)
- m = make(map[string]string)
- )
- // Get all service resources from the master (via a cache),
- // and populate them into service environment variables.
- if kl.serviceLister == nil {
- // Kubelets without masters (e.g. plain GCE ContainerVM) don't set env vars.
- return m, nil
- }
- services, err := kl.serviceLister.List()
- if err != nil {
- return m, fmt.Errorf("failed to list services when setting up env vars.")
- }
- // project the services in namespace ns onto the master services
- for _, service := range services.Items {
- // ignore services where ClusterIP is "None" or empty
- if !api.IsServiceIPSet(&service) {
- continue
- }
- serviceName := service.Name
- switch service.Namespace {
- // In the case where the master service namespace is also the namespace the pod
- // is in, the pod should receive all the services in that namespace.
- //
- // The ordering of the case clauses below enforces this.
- case ns:
- serviceMap[serviceName] = service
- case kl.masterServiceNamespace:
- if masterServices.Has(serviceName) {
- if _, exists := serviceMap[serviceName]; !exists {
- serviceMap[serviceName] = service
- }
- }
- }
- }
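- // Rebuild the service list from the de-duplicated map before generating the env vars.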
- services.Items = []api.Service{}
- for _, service := range serviceMap {
- services.Items = append(services.Items, service)
- }
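- // envvars.FromServices produces the docker-links-style variables for each service,
- // e.g. REDIS_MASTER_SERVICE_HOST and REDIS_MASTER_SERVICE_PORT for a service named "redis-master".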
- for _, e := range envvars.FromServices(&services) {
- m[e.Name] = e.Value
- }
- return m, nil
- }
- // Make the environment variables for a pod in the given namespace.
- func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Container, podIP string) ([]kubecontainer.EnvVar, error) {
- var result []kubecontainer.EnvVar
- // Note: These are added to the docker Config, but are not included in the checksum computed
- // by dockertools.BuildDockerName(...). That way, we can still determine whether an
- // api.Container is already running by its hash. (We don't want to restart a container just
- // because some service changed.)
- //
- // Note that there is a race between Kubelet seeing the pod and kubelet seeing the service.
- // To avoid this users can: (1) wait between starting a service and starting the pods that use it; or (2) detect
- // missing service env var and exit and be restarted; or (3) use DNS instead of env vars
- // and keep trying to resolve the DNS name of the service (recommended).
- serviceEnv, err := kl.getServiceEnvVarMap(pod.Namespace)
- if err != nil {
- return result, err
- }
- // Determine the final values of variables:
- //
- // 1. Determine the final value of each variable:
- // a. If the variable's Value is set, expand the `$(var)` references to other
- // variables in the .Value field; the sources of variables are the declared
- // variables of the container and the service environment variables
- // b. If a source is defined for an environment variable, resolve the source
- // 2. Create the container's environment in the order variables are declared
- // 3. Add remaining service environment vars
- var (
- tmpEnv = make(map[string]string)
- configMaps = make(map[string]*api.ConfigMap)
- secrets = make(map[string]*api.Secret)
- mappingFunc = expansion.MappingFuncFor(tmpEnv, serviceEnv)
- )
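- // tmpEnv accumulates the variables declared so far, so that later $(VAR) references in this
- // container can resolve to earlier container variables as well as to the service env vars.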
- for _, envVar := range container.Env {
- // Accesses apiserver+Pods.
- // So, the master may set service env vars, or kubelet may. In case both are doing
- // it, we delete the key from the kubelet-generated ones so we don't have duplicate
- // env vars.
- // TODO: remove this next line once all platforms use apiserver+Pods.
- delete(serviceEnv, envVar.Name)
- runtimeVal := envVar.Value
- if runtimeVal != "" {
- // Step 1a: expand variable references
- runtimeVal = expansion.Expand(runtimeVal, mappingFunc)
- } else if envVar.ValueFrom != nil {
- // Step 1b: resolve alternate env var sources
- switch {
- case envVar.ValueFrom.FieldRef != nil:
- runtimeVal, err = kl.podFieldSelectorRuntimeValue(envVar.ValueFrom.FieldRef, pod, podIP)
- if err != nil {
- return result, err
- }
- case envVar.ValueFrom.ResourceFieldRef != nil:
- defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardApi(pod, container)
- if err != nil {
- return result, err
- }
- runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, defaultedContainer)
- if err != nil {
- return result, err
- }
- case envVar.ValueFrom.ConfigMapKeyRef != nil:
- name := envVar.ValueFrom.ConfigMapKeyRef.Name
- key := envVar.ValueFrom.ConfigMapKeyRef.Key
- configMap, ok := configMaps[name]
- if !ok {
- configMap, err = kl.kubeClient.Core().ConfigMaps(pod.Namespace).Get(name)
- if err != nil {
- return result, err
- }
- // Cache the fetched ConfigMap so later env vars referencing it do not re-fetch it.
- configMaps[name] = configMap
- }
- runtimeVal, ok = configMap.Data[key]
- if !ok {
- return result, fmt.Errorf("Couldn't find key %v in ConfigMap %v/%v", key, pod.Namespace, name)
- }
- case envVar.ValueFrom.SecretKeyRef != nil:
- name := envVar.ValueFrom.SecretKeyRef.Name
- key := envVar.ValueFrom.SecretKeyRef.Key
- secret, ok := secrets[name]
- if !ok {
- secret, err = kl.kubeClient.Core().Secrets(pod.Namespace).Get(name)
- if err != nil {
- return result, err
- }
- // Cache the fetched Secret so later env vars referencing it do not re-fetch it.
- secrets[name] = secret
- }
- runtimeValBytes, ok := secret.Data[key]
- if !ok {
- return result, fmt.Errorf("Couldn't find key %v in Secret %v/%v", key, pod.Namespace, name)
- }
- runtimeVal = string(runtimeValBytes)
- }
- }
- tmpEnv[envVar.Name] = runtimeVal
- result = append(result, kubecontainer.EnvVar{Name: envVar.Name, Value: tmpEnv[envVar.Name]})
- }
- // Append remaining service env vars.
- for k, v := range serviceEnv {
- result = append(result, kubecontainer.EnvVar{Name: k, Value: v})
- }
- return result, nil
- }
- // podFieldSelectorRuntimeValue returns the runtime value of the given
- // selector for a pod.
- func (kl *Kubelet) podFieldSelectorRuntimeValue(fs *api.ObjectFieldSelector, pod *api.Pod, podIP string) (string, error) {
- internalFieldPath, _, err := api.Scheme.ConvertFieldLabel(fs.APIVersion, "Pod", fs.FieldPath, "")
- if err != nil {
- return "", err
- }
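- // A few fields are answered from the kubelet's local view of the pod; everything else
- // falls through to the generic field path extractor.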
- switch internalFieldPath {
- case "spec.nodeName":
- return pod.Spec.NodeName, nil
- case "spec.serviceAccountName":
- return pod.Spec.ServiceAccountName, nil
- case "status.podIP":
- return podIP, nil
- }
- return fieldpath.ExtractFieldPathAsString(pod, internalFieldPath)
- }
- // containerResourceRuntimeValue returns the value of the provided container resource
- func containerResourceRuntimeValue(fs *api.ResourceFieldSelector, pod *api.Pod, container *api.Container) (string, error) {
- containerName := fs.ContainerName
- if len(containerName) == 0 {
- return fieldpath.ExtractContainerResourceValue(fs, container)
- } else {
- return fieldpath.ExtractResourceValueByContainerName(fs, pod, containerName)
- }
- }
- // GetClusterDNS returns a list of the DNS servers and a list of the DNS search
- // domains of the cluster.
- func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
- var hostDNS, hostSearch []string
- // Get host DNS settings
- if kl.resolverConfig != "" {
- f, err := os.Open(kl.resolverConfig)
- if err != nil {
- return nil, nil, err
- }
- defer f.Close()
- hostDNS, hostSearch, err = kl.parseResolvConf(f)
- if err != nil {
- return nil, nil, err
- }
- }
- useClusterFirstPolicy := pod.Spec.DNSPolicy == api.DNSClusterFirst
- if useClusterFirstPolicy && kl.clusterDNS == nil {
- // clusterDNS is not known.
- // A pod with the ClusterFirst DNS policy cannot use cluster DNS.
- kl.recorder.Eventf(pod, api.EventTypeWarning, "MissingClusterDNS", "kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy)
- log := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. pod: %q. Falling back to DNSDefault policy.", pod.Spec.DNSPolicy, format.Pod(pod))
- kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, "MissingClusterDNS", log)
- // fallback to DNSDefault
- useClusterFirstPolicy = false
- }
- if !useClusterFirstPolicy {
- // When the kubelet --resolv-conf flag is set to the empty string, use
- // DNS settings that override the docker default (which is to use
- // /etc/resolv.conf) and effectively disable DNS lookups. According to
- // the bind documentation, the behavior of the DNS client library when
- // "nameservers" are not specified is to "use the nameserver on the
- // local machine". A nameserver setting of localhost is equivalent to
- // this documented behavior.
- if kl.resolverConfig == "" {
- hostDNS = []string{"127.0.0.1"}
- hostSearch = []string{"."}
- }
- return hostDNS, hostSearch, nil
- }
- // For a pod with the DNSClusterFirst policy, the cluster DNS server is the only nameserver configured for
- // the pod. The cluster DNS server itself will forward queries to the other nameservers it is configured to use,
- // in case it cannot resolve the query itself.
- dns := []string{kl.clusterDNS.String()}
- var dnsSearch []string
- if kl.clusterDomain != "" {
- nsSvcDomain := fmt.Sprintf("%s.svc.%s", pod.Namespace, kl.clusterDomain)
- svcDomain := fmt.Sprintf("svc.%s", kl.clusterDomain)
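- // For example (illustrative), a pod in namespace "default" with cluster domain "cluster.local"
- // searches default.svc.cluster.local, svc.cluster.local, cluster.local, then the host's own search domains.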
- dnsSearch = append([]string{nsSvcDomain, svcDomain, kl.clusterDomain}, hostSearch...)
- } else {
- dnsSearch = hostSearch
- }
- return dns, dnsSearch, nil
- }
- // One of the following arguments must be non-nil: runningPod, status.
- // TODO: Modify containerRuntime.KillPod() to accept the right arguments.
- func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus, gracePeriodOverride *int64) error {
- var p kubecontainer.Pod
- if runningPod != nil {
- p = *runningPod
- } else if status != nil {
- p = kubecontainer.ConvertPodStatusToRunningPod(status)
- }
- return kl.containerRuntime.KillPod(pod, p, gracePeriodOverride)
- }
- // makePodDataDirs creates the data directories for the pod.
- func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
- uid := pod.UID
- if err := os.MkdirAll(kl.getPodDir(uid), 0750); err != nil && !os.IsExist(err) {
- return err
- }
- if err := os.MkdirAll(kl.getPodVolumesDir(uid), 0750); err != nil && !os.IsExist(err) {
- return err
- }
- if err := os.MkdirAll(kl.getPodPluginsDir(uid), 0750); err != nil && !os.IsExist(err) {
- return err
- }
- return nil
- }
- // syncPod is the transaction script for the sync of a single pod.
- //
- // Arguments:
- //
- // o - the SyncPodOptions for this invocation
- //
- // The workflow is:
- // * If the pod is being created, record pod worker start latency
- // * Call generateAPIPodStatus to prepare an api.PodStatus for the pod
- // * If the pod is being seen as running for the first time, record pod
- // start latency
- // * Update the status of the pod in the status manager
- // * Kill the pod if it should not be running
- // * Create a mirror pod if the pod is a static pod, and does not
- // already have a mirror pod
- // * Create the data directories for the pod if they do not exist
- // * Wait for volumes to attach/mount
- // * Fetch the pull secrets for the pod
- // * Call the container runtime's SyncPod callback
- // * Update the traffic shaping for the pod's ingress and egress limits
- //
- // If any step of this workflow errors, the error is returned, and the workflow is repeated
- // on the next syncPod call.
- func (kl *Kubelet) syncPod(o syncPodOptions) error {
- // pull out the required options
- pod := o.pod
- mirrorPod := o.mirrorPod
- podStatus := o.podStatus
- updateType := o.updateType
- // if we want to kill a pod, do it now!
- if updateType == kubetypes.SyncPodKill {
- killPodOptions := o.killPodOptions
- if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
- return fmt.Errorf("kill pod options are required if update type is kill")
- }
- apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
- kl.statusManager.SetPodStatus(pod, apiPodStatus)
- // we kill the pod with the specified grace period since this is a termination
- if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
- // there was an error killing the pod, so we return that error directly
- utilruntime.HandleError(err)
- return err
- }
- return nil
- }
- // Latency measurements for the main workflow are relative to the
- // first time the pod was seen by the API server.
- var firstSeenTime time.Time
- if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
- firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
- }
- // Record pod worker start latency if being created
- // TODO: make pod workers record their own latencies
- if updateType == kubetypes.SyncPodCreate {
- if !firstSeenTime.IsZero() {
- // This is the first time we are syncing the pod. Record the latency
- // since kubelet first saw the pod if firstSeenTime is set.
- metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
- } else {
- glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
- }
- }
- // Generate final API pod status with pod and status manager status
- apiPodStatus := kl.generateAPIPodStatus(pod, podStatus)
- // The pod IP may be changed in generateAPIPodStatus if the pod is using host network. (See #24576)
- // TODO(random-liu): After writing pod spec into container labels, check whether pod is using host network, and
- // set pod IP to hostIP directly in runtime.GetPodStatus
- podStatus.IP = apiPodStatus.PodIP
- // Record the time it takes for the pod to become running.
- existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
- if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
- !firstSeenTime.IsZero() {
- metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
- }
- // Update status in the status manager
- kl.statusManager.SetPodStatus(pod, apiPodStatus)
- // Kill pod if it should not be running
- if errOuter := canRunPod(pod); errOuter != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
- if errInner := kl.killPod(pod, nil, podStatus, nil); errInner != nil {
- errOuter = fmt.Errorf("error killing pod: %v", errInner)
- utilruntime.HandleError(errOuter)
- }
- // there was no error killing the pod, but the pod cannot be run, so we return that err (if any)
- return errOuter
- }
- // Create Mirror Pod for Static Pod if it doesn't already exist
- if kubepod.IsStaticPod(pod) {
- podFullName := kubecontainer.GetPodFullName(pod)
- deleted := false
- if mirrorPod != nil {
- if mirrorPod.DeletionTimestamp != nil || !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
- // The mirror pod is semantically different from the static pod. Remove
- // it. The mirror pod will get recreated later.
- glog.Warningf("Deleting mirror pod %q because it is outdated", format.Pod(mirrorPod))
- if err := kl.podManager.DeleteMirrorPod(podFullName); err != nil {
- glog.Errorf("Failed deleting mirror pod %q: %v", format.Pod(mirrorPod), err)
- } else {
- deleted = true
- }
- }
- }
- if mirrorPod == nil || deleted {
- glog.V(3).Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
- if err := kl.podManager.CreateMirrorPod(pod); err != nil {
- glog.Errorf("Failed creating a mirror pod for %q: %v", format.Pod(pod), err)
- }
- }
- }
- // Make data directories for the pod
- if err := kl.makePodDataDirs(pod); err != nil {
- glog.Errorf("Unable to make pod data directories for pod %q: %v", format.Pod(pod), err)
- return err
- }
- // Wait for volumes to attach/mount
- if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
- kl.recorder.Eventf(pod, api.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
- glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
- return err
- }
- // Fetch the pull secrets for the pod
- pullSecrets, err := kl.getPullSecretsForPod(pod)
- if err != nil {
- glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
- return err
- }
- // Call the container runtime's SyncPod callback
- result := kl.containerRuntime.SyncPod(pod, apiPodStatus, podStatus, pullSecrets, kl.backOff)
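- // The reason cache records per-container failure reasons from this sync result so they
- // can be surfaced in the API pod status generated on later syncs.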
- kl.reasonCache.Update(pod.UID, result)
- if err = result.Error(); err != nil {
- return err
- }
- // early successful exit if pod is not bandwidth-constrained
- if !kl.shapingEnabled() {
- return nil
- }
- // Update the traffic shaping for the pod's ingress and egress limits
- ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
- if err != nil {
- return err
- }
- if egress != nil || ingress != nil {
- if podUsesHostNetwork(pod) {
- kl.recorder.Event(pod, api.EventTypeWarning, events.HostNetworkNotSupported, "Bandwidth shaping is not currently supported on the host network")
- } else if kl.shaper != nil {
- if len(apiPodStatus.PodIP) > 0 {
- err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", apiPodStatus.PodIP), egress, ingress)
- }
- } else {
- kl.recorder.Event(pod, api.EventTypeWarning, events.UndefinedShaper, "Pod requests bandwidth shaping, but the shaper is undefined")
- }
- }
- return nil
- }
- // returns whether the pod uses the host network namespace.
- func podUsesHostNetwork(pod *api.Pod) bool {
- return pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork
- }
- // getPullSecretsForPod inspects the Pod and retrieves the referenced pull
- // secrets.
- // TODO: duplicate secrets are being retrieved multiple times and there
- // is no cache. Creating and using a secret manager interface will make this
- // easier to address.
- func (kl *Kubelet) getPullSecretsForPod(pod *api.Pod) ([]api.Secret, error) {
- pullSecrets := []api.Secret{}
- for _, secretRef := range pod.Spec.ImagePullSecrets {
- secret, err := kl.kubeClient.Core().Secrets(pod.Namespace).Get(secretRef.Name)
- if err != nil {
- glog.Warningf("Unable to retrieve pull secret %s/%s for %s/%s due to %v. The image pull may not succeed.", pod.Namespace, secretRef.Name, pod.Namespace, pod.Name, err)
- continue
- }
- pullSecrets = append(pullSecrets, *secret)
- }
- return pullSecrets, nil
- }
- // getPodsToSync returns pods which should be resynchronized. Currently, the following pods are resynchronized:
- // * pods whose work is ready.
- // * pods for which an internal module has requested a sync.
- func (kl *Kubelet) getPodsToSync() []*api.Pod {
- allPods := kl.podManager.GetPods()
- podUIDs := kl.workQueue.GetWork()
- podUIDSet := sets.NewString()
- for _, podUID := range podUIDs {
- podUIDSet.Insert(string(podUID))
- }
- var podsToSync []*api.Pod
- for _, pod := range allPods {
- if podUIDSet.Has(string(pod.UID)) {
- // The work of the pod is ready
- podsToSync = append(podsToSync, pod)
- continue
- }
- for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
- if podSyncLoopHandler.ShouldSync(pod) {
- podsToSync = append(podsToSync, pod)
- break
- }
- }
- }
- return podsToSync
- }
- // Returns true if pod is in the terminated state ("Failed" or "Succeeded").
- func (kl *Kubelet) podIsTerminated(pod *api.Pod) bool {
- var status api.PodStatus
- // Check the cached pod status which was set after the last sync.
- status, ok := kl.statusManager.GetPodStatus(pod.UID)
- if !ok {
- // If there is no cached status, use the status from the
- // apiserver. This is useful if kubelet has recently been
- // restarted.
- status = pod.Status
- }
- if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
- return true
- }
- return false
- }
- // filterOutTerminatedPods returns the given pods which the status manager
- // does not consider failed or succeeded.
- func (kl *Kubelet) filterOutTerminatedPods(pods []*api.Pod) []*api.Pod {
- var filteredPods []*api.Pod
- for _, p := range pods {
- if kl.podIsTerminated(p) {
- continue
- }
- filteredPods = append(filteredPods, p)
- }
- return filteredPods
- }
- // removeOrphanedPodStatuses removes obsolete entries in podStatus where
- // the pod is no longer considered bound to this node.
- func (kl *Kubelet) removeOrphanedPodStatuses(pods []*api.Pod, mirrorPods []*api.Pod) {
- podUIDs := make(map[types.UID]bool)
- for _, pod := range pods {
- podUIDs[pod.UID] = true
- }
- for _, pod := range mirrorPods {
- podUIDs[pod.UID] = true
- }
- kl.statusManager.RemoveOrphanedStatuses(podUIDs)
- }
- // deletePod deletes the pod from the internal state of the kubelet by:
- // 1. stopping the associated pod worker asynchronously
- // 2. signaling to kill the pod by sending on the podKillingCh channel
- //
- // deletePod returns an error if not all sources are ready or the pod is not
- // found in the runtime cache.
- func (kl *Kubelet) deletePod(pod *api.Pod) error {
- if pod == nil {
- return fmt.Errorf("deletePod does not allow nil pod")
- }
- if !kl.sourcesReady.AllReady() {
- // If the sources aren't ready, skip deletion, as we may accidentally delete pods
- // for sources that haven't reported yet.
- return fmt.Errorf("skipping delete because sources aren't ready yet")
- }
- kl.podWorkers.ForgetWorker(pod.UID)
- // The runtime cache may not have been updated with the pod yet, but that's okay
- // because the periodic cleanup routine will attempt to delete again later.
- runningPods, err := kl.runtimeCache.GetPods()
- if err != nil {
- return fmt.Errorf("error listing containers: %v", err)
- }
- runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
- if runningPod.IsEmpty() {
- return fmt.Errorf("pod not found")
- }
- podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}
- kl.podKillingCh <- &podPair
- // TODO: delete the mirror pod here?
- // We leave the volume/directory cleanup to the periodic cleanup routine.
- return nil
- }
- // HandlePodCleanups performs a series of cleanup work, including terminating
- // pod workers, killing unwanted pods, and removing orphaned volumes/pod
- // directories.
- // NOTE: This function is executed by the main sync loop, so it
- // should not contain any blocking calls.
- func (kl *Kubelet) HandlePodCleanups() error {
- allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
- // Pod phase progresses monotonically. Once a pod has reached a final state,
- // it should never leave regardless of the restart policy. The statuses
- // of such pods should not be changed, and there is no need to sync them.
- // TODO: the logic here does not handle two cases:
- // 1. If the containers were removed immediately after they died, kubelet
- // may fail to generate correct statuses, let alone filtering correctly.
- // 2. If kubelet restarted before writing the terminated status for a pod
- // to the apiserver, it could still restart the terminated pod (even
- // though the pod was not considered terminated by the apiserver).
- // These two conditions could be alleviated by checkpointing kubelet.
- activePods := kl.filterOutTerminatedPods(allPods)
- desiredPods := make(map[types.UID]empty)
- for _, pod := range activePods {
- desiredPods[pod.UID] = empty{}
- }
- // Stop the workers for no-longer existing pods.
- // TODO: is here the best place to forget pod workers?
- kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
- kl.probeManager.CleanupPods(activePods)
- runningPods, err := kl.runtimeCache.GetPods()
- if err != nil {
- glog.Errorf("Error listing containers: %#v", err)
- return err
- }
- for _, pod := range runningPods {
- if _, found := desiredPods[pod.ID]; !found {
- kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
- }
- }
- kl.removeOrphanedPodStatuses(allPods, mirrorPods)
- // Note that we just killed the unwanted pods. This may not yet be reflected
- // in the cache. We need to bypass the cache to get the latest set of
- // running pods to clean up the volumes.
- // TODO: Evaluate the performance impact of bypassing the runtime cache.
- runningPods, err = kl.containerRuntime.GetPods(false)
- if err != nil {
- glog.Errorf("Error listing containers: %#v", err)
- return err
- }
- // Remove any orphaned volumes.
- // Note that we pass all pods (including terminated pods) to the function,
- // so that we don't remove volumes associated with terminated but not yet
- // deleted pods.
- err = kl.cleanupOrphanedPodDirs(allPods, runningPods)
- if err != nil {
- // We want all cleanup tasks to be run even if one of them failed. So
- // we just log an error here and continue other cleanup tasks.
- // This also applies to the other clean up tasks.
- glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
- }
- // Remove any orphaned mirror pods.
- kl.podManager.DeleteOrphanedMirrorPods()
- // Clear out any old bandwidth rules
- err = kl.cleanupBandwidthLimits(allPods)
- if err != nil {
- glog.Errorf("Failed cleaning up bandwidth limits: %v", err)
- }
- kl.backOff.GC()
- return nil
- }
- // podKiller launches a goroutine to kill a pod received from the channel if
- // another goroutine isn't already in action.
- func (kl *Kubelet) podKiller() {
- killing := sets.NewString()
- resultCh := make(chan types.UID)
- defer close(resultCh)
- for {
- select {
- case podPair, ok := <-kl.podKillingCh:
- if !ok {
- return
- }
- runningPod := podPair.RunningPod
- apiPod := podPair.APIPod
- if killing.Has(string(runningPod.ID)) {
- // The pod is already being killed.
- break
- }
- killing.Insert(string(runningPod.ID))
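- // Kill the pod in its own goroutine and report its ID on resultCh when done, so it can be
- // removed from the killing set and a later kill request for the same pod can be processed.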
- go func(apiPod *api.Pod, runningPod *kubecontainer.Pod, ch chan types.UID) {
- defer func() {
- ch <- runningPod.ID
- }()
- glog.V(2).Infof("Killing unwanted pod %q", runningPod.Name)
- err := kl.killPod(apiPod, runningPod, nil, nil)
- if err != nil {
- glog.Errorf("Failed killing the pod %q: %v", runningPod.Name, err)
- }
- }(apiPod, runningPod, resultCh)
- case podID := <-resultCh:
- killing.Delete(string(podID))
- }
- }
- }
- // hasHostPortConflicts detects pods with conflicting host ports.
- func hasHostPortConflicts(pods []*api.Pod) bool {
- ports := sets.String{}
- for _, pod := range pods {
- if errs := validation.AccumulateUniqueHostPorts(pod.Spec.Containers, &ports, field.NewPath("spec", "containers")); len(errs) > 0 {
- glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
- return true
- }
- if errs := validation.AccumulateUniqueHostPorts(pod.Spec.InitContainers, &ports, field.NewPath("spec", "initContainers")); len(errs) > 0 {
- glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", format.Pod(pod), errs)
- return true
- }
- }
- return false
- }
- // isOutOfDisk detects if new pods can't fit because the node is out of disk space.
- func (kl *Kubelet) isOutOfDisk() bool {
- // Check disk space once globally and reject or accept all new pods.
- withinBounds, err := kl.diskSpaceManager.IsRuntimeDiskSpaceAvailable()
- // Assume enough space in case of errors.
- if err != nil {
- glog.Errorf("Failed to check if disk space is available for the runtime: %v", err)
- } else if !withinBounds {
- return true
- }
- withinBounds, err = kl.diskSpaceManager.IsRootDiskSpaceAvailable()
- // Assume enough space in case of errors.
- if err != nil {
- glog.Errorf("Failed to check if disk space is available on the root partition: %v", err)
- } else if !withinBounds {
- return true
- }
- return false
- }
- // rejectPod records an event about the pod with the given reason and message,
- // and updates the pod to the failed phase in the status manager.
- func (kl *Kubelet) rejectPod(pod *api.Pod, reason, message string) {
- kl.recorder.Eventf(pod, api.EventTypeWarning, reason, message)
- kl.statusManager.SetPodStatus(pod, api.PodStatus{
- Phase: api.PodFailed,
- Reason: reason,
- Message: "Pod " + message})
- }
- // canAdmitPod determines if a pod can be admitted, and gives a reason if it
- // cannot. "pod" is new pod, while "pods" are all admitted pods
- // The function returns a boolean value indicating whether the pod
- // can be admitted, a brief single-word reason and a message explaining why
- // the pod cannot be admitted.
- func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, string) {
- node, err := kl.getNodeAnyWay()
- if err != nil {
- glog.Errorf("Cannot get Node info: %v", err)
- return false, "InvalidNodeInfo", "Kubelet cannot get node info."
- }
- // the kubelet will invoke each pod admit handler in sequence
- // if any handler rejects, the pod is rejected.
- // TODO: move predicate check into a pod admitter
- // TODO: move out of disk check into a pod admitter
- // TODO: out of resource eviction should have a pod admitter call-out
- attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
- for _, podAdmitHandler := range kl.PodAdmitHandlers {
- if result := podAdmitHandler.Admit(attrs); !result.Admit {
- return false, result.Reason, result.Message
- }
- }
- nodeInfo := schedulercache.NewNodeInfo(pods...)
- nodeInfo.SetNode(node)
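- // Re-run the scheduler's general predicates (e.g. resource fit and host ports) against a
- // node view that treats the already-admitted pods as consuming capacity.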
- fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
- if err != nil {
- message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
- glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
- return fit, "UnexpectedError", message
- }
- if !fit {
- var reason string
- var message string
- if len(reasons) == 0 {
- message = fmt.Sprint("GeneralPredicates failed due to unknown reason, which is unexpected.")
- glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
- return fit, "UnknownReason", message
- }
- // If there are failed predicates, we only return the first one as a reason.
- r := reasons[0]
- switch re := r.(type) {
- case *predicates.PredicateFailureError:
- reason = re.PredicateName
- message = re.Error()
- glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
- case *predicates.InsufficientResourceError:
- reason = fmt.Sprintf("OutOf%s", re.ResourceName)
- message = re.Error()
- glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
- case *predicates.FailureReason:
- reason = re.GetReason()
- message = fmt.Sprintf("Failure: %s", re.GetReason())
- glog.V(2).Infof("Predicate failed on Pod: %v, for reason: %v", format.Pod(pod), message)
- default:
- reason = "UnexpectedPredicateFailureType"
- message = fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", r)
- glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
- }
- return fit, reason, message
- }
- // TODO: When disk space scheduling is implemented (#11976), remove the out-of-disk check here and
- // add the disk space predicate to predicates.GeneralPredicates.
- if kl.isOutOfDisk() {
- glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to OutOfDisk")
- return false, "OutOfDisk", "cannot be started due to lack of disk space."
- }
- return true, "", ""
- }
- // syncLoop is the main loop for processing changes. It watches for changes from
- // three channels (file, apiserver, and http) and creates a union of them. For
- // any new change seen, it runs a sync against the desired state and the running state. If
- // no changes are seen to the configuration, it synchronizes the last known desired
- // state every sync-frequency seconds. Never returns.
- func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
- glog.Info("Starting kubelet main sync loop.")
- // The syncTicker wakes up the kubelet to check if there are any pod workers
- // that need to be synced. A one-second period is sufficient because the
- // sync interval is defaulted to 10s.
- syncTicker := time.NewTicker(time.Second)
- defer syncTicker.Stop()
- housekeepingTicker := time.NewTicker(housekeepingPeriod)
- defer housekeepingTicker.Stop()
- plegCh := kl.pleg.Watch()
- for {
- if rs := kl.runtimeState.errors(); len(rs) != 0 {
- glog.Infof("skipping pod synchronization - %v", rs)
- time.Sleep(5 * time.Second)
- continue
- }
- if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
- break
- }
- }
- }
- // syncLoopIteration reads from various channels and dispatches pods to the
- // given handler.
- //
- // Arguments:
- // 1. configCh: a channel to read config events from
- // 2. handler: the SyncHandler to dispatch pods to
- // 3. syncCh: a channel to read periodic sync events from
- // 4. houseKeepingCh: a channel to read housekeeping events from
- // 5. plegCh: a channel to read PLEG updates from
- //
- // Events are also read from the kubelet liveness manager's update channel.
- //
- // The workflow is to read from one of the channels, handle that event, and
- // update the timestamp in the sync loop monitor.
- //
- // Here is an appropriate place to note that despite the syntactical
- // similarity to the switch statement, the case statements in a select are
- // evaluated in a pseudorandom order when multiple channels are ready to
- // read from, so you cannot assume that the cases are evaluated in the
- // order they are written.
- //
- // With that in mind, in truly no particular order, the different channels
- // are handled as follows:
- //
- // * configCh: dispatch the pods for the config change to the appropriate
- // handler callback for the event type
- // * plegCh: update the runtime cache; sync pod
- // * syncCh: sync all pods waiting for sync
- // * houseKeepingCh: trigger cleanup of pods
- // * liveness manager: sync pods that have failed or in which one or more
- // containers have failed liveness checks
- func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
- syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
- kl.syncLoopMonitor.Store(kl.clock.Now())
- select {
- case u, open := <-configCh:
- // Update from a config source; dispatch it to the right handler
- // callback.
- if !open {
- glog.Errorf("Update channel is closed. Exiting the sync loop.")
- return false
- }
- switch u.Op {
- case kubetypes.ADD:
- glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
- // After restarting, kubelet will get all existing pods through
- // ADD as if they are new pods. These pods will then go through the
- // admission process and *may* be rejected. This can be resolved
- // once we have checkpointing.
- handler.HandlePodAdditions(u.Pods)
- case kubetypes.UPDATE:
- glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletiontimestamps(u.Pods))
- handler.HandlePodUpdates(u.Pods)
- case kubetypes.REMOVE:
- glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
- handler.HandlePodRemoves(u.Pods)
- case kubetypes.RECONCILE:
- glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
- handler.HandlePodReconcile(u.Pods)
- case kubetypes.DELETE:
- glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
- // DELETE is treated as an UPDATE because of graceful deletion.
- handler.HandlePodUpdates(u.Pods)
- case kubetypes.SET:
- // TODO: Do we want to support this?
- glog.Errorf("Kubelet does not support snapshot update")
- }
- // Mark the source ready after receiving at least one update from the
- // source. Once all the sources are marked ready, various cleanup
- // routines will start reclaiming resources. It is important that this
- // takes place only after kubelet calls the update handler to process
- // the update to ensure the internal pod cache is up-to-date.
- kl.sourcesReady.AddSource(u.Source)
- case e := <-plegCh:
- if isSyncPodWorthy(e) {
- // PLEG event for a pod; sync it.
- if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
- glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
- handler.HandlePodSyncs([]*api.Pod{pod})
- } else {
- // If the pod no longer exists, ignore the event.
- glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
- }
- }
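- // Independent of whether the pod was synced above, a ContainerDied event also triggers
- // cleanup of the dead container when the event carries the container ID.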
- if e.Type == pleg.ContainerDied {
- if containerID, ok := e.Data.(string); ok {
- kl.cleanUpContainersInPod(e.ID, containerID)
- }
- }
- case <-syncCh:
- // Sync pods waiting for sync
- podsToSync := kl.getPodsToSync()
- if len(podsToSync) == 0 {
- break
- }
- glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
- kl.HandlePodSyncs(podsToSync)
- case update := <-kl.livenessManager.Updates():
- if update.Result == proberesults.Failure {
- // The liveness manager detected a failure; sync the pod.
- // We should not use the pod from livenessManager, because it is never updated after
- // initialization.
- pod, ok := kl.podManager.GetPodByUID(update.PodUID)
- if !ok {
- // If the pod no longer exists, ignore the update.
- glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
- break
- }
- glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
- handler.HandlePodSyncs([]*api.Pod{pod})
- }
- case <-housekeepingCh:
- if !kl.sourcesReady.AllReady() {
- // If the sources aren't ready, skip housekeeping, as we may
- // accidentally delete pods from unready sources.
- glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
- } else {
- glog.V(4).Infof("SyncLoop (housekeeping)")
- if err := handler.HandlePodCleanups(); err != nil {
- glog.Errorf("Failed cleaning pods: %v", err)
- }
- }
- }
- kl.syncLoopMonitor.Store(kl.clock.Now())
- return true
- }
- // dispatchWork starts the asynchronous sync of the pod in a pod worker.
- // If the pod is terminated, dispatchWork does not dispatch any work; it only forces a
- // status update to trigger deletion when the pod has a DeletionTimestamp.
- func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mirrorPod *api.Pod, start time.Time) {
- if kl.podIsTerminated(pod) {
- if pod.DeletionTimestamp != nil {
- // If the pod is in a terminated state, there is no pod worker to
- // handle the work item. Check if the DeletionTimestamp has been
- // set, and force a status update to trigger a pod deletion request
- // to the apiserver.
- kl.statusManager.TerminatePod(pod)
- }
- return
- }
- // Run the sync in an async worker.
- kl.podWorkers.UpdatePod(&UpdatePodOptions{
- Pod: pod,
- MirrorPod: mirrorPod,
- UpdateType: syncType,
- OnCompleteFunc: func(err error) {
- if err != nil {
- metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
- }
- },
- })
- // Note the number of containers for new pods.
- if syncType == kubetypes.SyncPodCreate {
- metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
- }
- }
- // TODO: handle mirror pods in a separate component (issue #17251)
- func (kl *Kubelet) handleMirrorPod(mirrorPod *api.Pod, start time.Time) {
- // Mirror pod ADD/UPDATE/DELETE operations are considered an UPDATE to the
- // corresponding static pod. Send update to the pod worker if the static
- // pod exists.
- if pod, ok := kl.podManager.GetPodByMirrorPod(mirrorPod); ok {
- kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
- }
- }
- // HandlePodAdditions is the callback in SyncHandler for pods being added from
- // a config source.
- func (kl *Kubelet) HandlePodAdditions(pods []*api.Pod) {
- start := kl.clock.Now()
- sort.Sort(sliceutils.PodsByCreationTime(pods))
- for _, pod := range pods {
- if kubepod.IsMirrorPod(pod) {
- kl.podManager.AddPod(pod)
- kl.handleMirrorPod(pod, start)
- continue
- }
- // Note that allPods excludes the new pod.
- allPods := kl.podManager.GetPods()
- // We failed pods that we rejected, so activePods includes all admitted
- // pods that are alive.
- activePods := kl.filterOutTerminatedPods(allPods)
- // Check if we can admit the pod; if not, reject it.
- if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
- kl.rejectPod(pod, reason, message)
- continue
- }
- kl.podManager.AddPod(pod)
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
- kl.probeManager.AddPod(pod)
- }
- }
- // HandlePodUpdates is the callback in the SyncHandler interface for pods
- // being updated from a config source.
- func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- kl.podManager.UpdatePod(pod)
- if kubepod.IsMirrorPod(pod) {
- kl.handleMirrorPod(pod, start)
- continue
- }
- // TODO: Evaluate if we need to validate and reject updates.
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)
- }
- }
- // HandlePodRemoves is the callback in the SyncHandler interface for pods
- // being removed from a config source.
- func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- kl.podManager.DeletePod(pod)
- if kubepod.IsMirrorPod(pod) {
- kl.handleMirrorPod(pod, start)
- continue
- }
- // Deletion is allowed to fail because the periodic cleanup routine
- // will trigger deletion again.
- if err := kl.deletePod(pod); err != nil {
- glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
- }
- kl.probeManager.RemovePod(pod)
- }
- }
- // HandlePodReconcile is the callback in the SyncHandler interface for pods
- // that should be reconciled.
- func (kl *Kubelet) HandlePodReconcile(pods []*api.Pod) {
- for _, pod := range pods {
- // Update the pod in the pod manager; the status manager will periodically reconcile
- // against the pod manager.
- kl.podManager.UpdatePod(pod)
- // After an evicted pod is synced, all dead containers in the pod can be removed.
- if eviction.PodIsEvicted(pod.Status) {
- if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
- kl.containerDeletor.deleteContainersInPod("", podStatus, true)
- }
- }
- }
- }
- // HandlePodSyncs is the callback in the syncHandler interface for pods
- // that should be dispatched to pod workers for sync.
- func (kl *Kubelet) HandlePodSyncs(pods []*api.Pod) {
- start := kl.clock.Now()
- for _, pod := range pods {
- mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
- kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
- }
- }
- // LatestLoopEntryTime returns the last time in the sync loop monitor.
- func (kl *Kubelet) LatestLoopEntryTime() time.Time {
- val := kl.syncLoopMonitor.Load()
- if val == nil {
- return time.Time{}
- }
- return val.(time.Time)
- }
- // PLEGHealthCheck returns whether the PLEG is healthy.
- func (kl *Kubelet) PLEGHealthCheck() (bool, error) {
- return kl.pleg.Healthy()
- }
- // validateContainerLogStatus returns the container ID for the desired container to retrieve logs for, based on the state
- // of the container. If the previous flag is set, only the logs for the last terminated container are returned; otherwise, the
- // currently running container is preferred over a previous termination. If info about the container is not available then a specific
- // error is returned to the end user.
- func (kl *Kubelet) validateContainerLogStatus(podName string, podStatus *api.PodStatus, containerName string, previous bool) (containerID kubecontainer.ContainerID, err error) {
- var cID string
- cStatus, found := api.GetContainerStatus(podStatus.ContainerStatuses, containerName)
- // if not found, check the init containers
- if !found {
- cStatus, found = api.GetContainerStatus(podStatus.InitContainerStatuses, containerName)
- }
- if !found {
- return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is not available", containerName, podName)
- }
- lastState := cStatus.LastTerminationState
- waiting, running, terminated := cStatus.State.Waiting, cStatus.State.Running, cStatus.State.Terminated
- switch {
- case previous:
- if lastState.Terminated == nil {
- return kubecontainer.ContainerID{}, fmt.Errorf("previous terminated container %q in pod %q not found", containerName, podName)
- }
- cID = lastState.Terminated.ContainerID
- case running != nil:
- cID = cStatus.ContainerID
- case terminated != nil:
- cID = terminated.ContainerID
- case lastState.Terminated != nil:
- cID = lastState.Terminated.ContainerID
- case waiting != nil:
- // output some info for the most common pending failures
- switch reason := waiting.Reason; reason {
- case images.ErrImagePull.Error():
- return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: image can't be pulled", containerName, podName)
- case images.ErrImagePullBackOff.Error():
- return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: trying and failing to pull image", containerName, podName)
- default:
- return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start: %v", containerName, podName, reason)
- }
- default:
- // unrecognized state
- return kubecontainer.ContainerID{}, fmt.Errorf("container %q in pod %q is waiting to start - no logs yet", containerName, podName)
- }
- return kubecontainer.ParseContainerID(cID), nil
- }
- // GetKubeletContainerLogs returns logs from the container
- // TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
- // or all of them.
- func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *api.PodLogOptions, stdout, stderr io.Writer) error {
- // Pod workers periodically write status to statusManager. If status is not
- // cached there, something is wrong (or kubelet just restarted and hasn't
- // caught up yet). Just assume the pod is not ready yet.
- name, namespace, err := kubecontainer.ParsePodFullName(podFullName)
- if err != nil {
- return fmt.Errorf("unable to parse pod full name %q: %v", podFullName, err)
- }
- pod, ok := kl.GetPodByName(namespace, name)
- if !ok {
- return fmt.Errorf("pod %q cannot be found - no logs available", name)
- }
- podUID := pod.UID
- if mirrorPod, ok := kl.podManager.GetMirrorPodByPod(pod); ok {
- podUID = mirrorPod.UID
- }
- podStatus, found := kl.statusManager.GetPodStatus(podUID)
- if !found {
- // If there is no cached status, use the status from the
- // apiserver. This is useful if kubelet has recently been
- // restarted.
- podStatus = pod.Status
- }
- containerID, err := kl.validateContainerLogStatus(pod.Name, &podStatus, containerName, logOptions.Previous)
- if err != nil {
- return err
- }
- // Do a zero-byte write to stdout before handing off to the container runtime.
- // This ensures at least one Write call is made to the writer when copying starts,
- // even if we then block waiting for log output from the container.
- if _, err := stdout.Write([]byte{}); err != nil {
- return err
- }
- return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
- }
- // updateRuntimeUp calls the container runtime status callback, initializing
- // the runtime dependent modules when the container runtime first comes up,
- // and returns an error if the status check fails. If the status check is OK,
- // update the container runtime uptime in the kubelet runtimeState.
- func (kl *Kubelet) updateRuntimeUp() {
- if err := kl.containerRuntime.Status(); err != nil {
- glog.Errorf("Container runtime sanity check failed: %v", err)
- return
- }
- kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
- kl.runtimeState.setRuntimeSync(kl.clock.Now())
- }
- func (kl *Kubelet) updateCloudProviderFromMachineInfo(node *api.Node, info *cadvisorapi.MachineInfo) {
- if info.CloudProvider != cadvisorapi.UnknownProvider &&
- info.CloudProvider != cadvisorapi.Baremetal {
- // The cloud providers from pkg/cloudprovider/providers/* that update ProviderID
- // will use the format of cloudprovider://project/availability_zone/instance_name
- // here we only have the cloudprovider and the instance name so we leave project
- // and availability zone empty for compatibility.
- node.Spec.ProviderID = strings.ToLower(string(info.CloudProvider)) +
- ":////" + string(info.InstanceID)
- }
- }
- // GetPhase returns the phase of a pod given its container info.
- // This func is exported to simplify integration with 3rd party kubelet
- // integrations like kubernetes-mesos.
- func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
- initialized := 0
- pendingInitialization := 0
- failedInitialization := 0
- for _, container := range spec.InitContainers {
- containerStatus, ok := api.GetContainerStatus(info, container.Name)
- if !ok {
- pendingInitialization++
- continue
- }
- switch {
- case containerStatus.State.Running != nil:
- pendingInitialization++
- case containerStatus.State.Terminated != nil:
- if containerStatus.State.Terminated.ExitCode == 0 {
- initialized++
- } else {
- failedInitialization++
- }
- case containerStatus.State.Waiting != nil:
- if containerStatus.LastTerminationState.Terminated != nil {
- if containerStatus.LastTerminationState.Terminated.ExitCode == 0 {
- initialized++
- } else {
- failedInitialization++
- }
- } else {
- pendingInitialization++
- }
- default:
- pendingInitialization++
- }
- }
- unknown := 0
- running := 0
- waiting := 0
- stopped := 0
- failed := 0
- succeeded := 0
- for _, container := range spec.Containers {
- containerStatus, ok := api.GetContainerStatus(info, container.Name)
- if !ok {
- unknown++
- continue
- }
- switch {
- case containerStatus.State.Running != nil:
- running++
- case containerStatus.State.Terminated != nil:
- stopped++
- if containerStatus.State.Terminated.ExitCode == 0 {
- succeeded++
- } else {
- failed++
- }
- case containerStatus.State.Waiting != nil:
- if containerStatus.LastTerminationState.Terminated != nil {
- stopped++
- } else {
- waiting++
- }
- default:
- unknown++
- }
- }
- if failedInitialization > 0 && spec.RestartPolicy == api.RestartPolicyNever {
- return api.PodFailed
- }
- switch {
- case pendingInitialization > 0:
- fallthrough
- case waiting > 0:
- glog.V(5).Infof("pod waiting > 0, pending")
- // One or more containers has not been started
- return api.PodPending
- case running > 0 && unknown == 0:
- // All containers have been started, and at least
- // one container is running
- return api.PodRunning
- case running == 0 && stopped > 0 && unknown == 0:
- // All containers are terminated
- if spec.RestartPolicy == api.RestartPolicyAlways {
- // All containers are in the process of restarting
- return api.PodRunning
- }
- if stopped == succeeded {
- // RestartPolicy is not Always, and all
- // containers are terminated in success
- return api.PodSucceeded
- }
- if spec.RestartPolicy == api.RestartPolicyNever {
- // RestartPolicy is Never, and all containers are
- // terminated, with at least one having failed
- return api.PodFailed
- }
- // RestartPolicy is OnFailure, and at least one container failed
- // and is in the process of restarting
- return api.PodRunning
- default:
- glog.V(5).Infof("pod default case, pending")
- return api.PodPending
- }
- }
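- // Illustrative sketch (hypothetical pod spec): with RestartPolicyNever and a
- // single container that terminated with exit code 0, stopped == succeeded
- // above, so the phase is PodSucceeded:
- //
- //   spec := &api.PodSpec{
- //       RestartPolicy: api.RestartPolicyNever,
- //       Containers:    []api.Container{{Name: "main"}},
- //   }
- //   statuses := []api.ContainerStatus{{
- //       Name:  "main",
- //       State: api.ContainerState{Terminated: &api.ContainerStateTerminated{ExitCode: 0}},
- //   }}
- //   phase := GetPhase(spec, statuses) // api.PodSucceeded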
- // generateAPIPodStatus creates the final API pod status for a pod, given the
- // internal pod status.
- func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
- glog.V(3).Infof("Generating status for %q", format.Pod(pod))
- // Check whether an internal module has requested that the pod be evicted.
- for _, podSyncHandler := range kl.PodSyncHandlers {
- if result := podSyncHandler.ShouldEvict(pod); result.Evict {
- return api.PodStatus{
- Phase: api.PodFailed,
- Reason: result.Reason,
- Message: result.Message,
- }
- }
- }
- s := kl.convertStatusToAPIStatus(pod, podStatus)
- // Assume info is ready to process
- spec := &pod.Spec
- allStatus := append(append([]api.ContainerStatus{}, s.ContainerStatuses...), s.InitContainerStatuses...)
- s.Phase = GetPhase(spec, allStatus)
- kl.probeManager.UpdatePodStatus(pod.UID, s)
- s.Conditions = append(s.Conditions, status.GeneratePodInitializedCondition(spec, s.InitContainerStatuses, s.Phase))
- s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
- // s (the PodStatus we are creating) will not have a PodScheduled condition yet, because convertStatusToAPIStatus()
- // does not create one. If the existing PodStatus has a PodScheduled condition, then copy it into s and make sure
- // it is set to true. If the existing PodStatus does not have a PodScheduled condition, then create one that is set to true.
- if _, oldPodScheduled := api.GetPodCondition(&pod.Status, api.PodScheduled); oldPodScheduled != nil {
- s.Conditions = append(s.Conditions, *oldPodScheduled)
- }
- api.UpdatePodCondition(&pod.Status, &api.PodCondition{
- Type: api.PodScheduled,
- Status: api.ConditionTrue,
- })
- if !kl.standaloneMode {
- hostIP, err := kl.getHostIPAnyWay()
- if err != nil {
- glog.V(4).Infof("Cannot get host IP: %v", err)
- } else {
- s.HostIP = hostIP.String()
- if podUsesHostNetwork(pod) && s.PodIP == "" {
- s.PodIP = hostIP.String()
- }
- }
- }
- return *s
- }
- // convertStatusToAPIStatus creates an api PodStatus for the given pod from
- // the given internal pod status. It is purely transformative and does not
- // alter the kubelet state at all.
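- // Phase and conditions are filled in later by generateAPIPodStatus.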
- func (kl *Kubelet) convertStatusToAPIStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) *api.PodStatus {
- var apiPodStatus api.PodStatus
- apiPodStatus.PodIP = podStatus.IP
- apiPodStatus.ContainerStatuses = kl.convertToAPIContainerStatuses(
- pod, podStatus,
- pod.Status.ContainerStatuses,
- pod.Spec.Containers,
- len(pod.Spec.InitContainers) > 0,
- false,
- )
- apiPodStatus.InitContainerStatuses = kl.convertToAPIContainerStatuses(
- pod, podStatus,
- pod.Status.InitContainerStatuses,
- pod.Spec.InitContainers,
- len(pod.Spec.InitContainers) > 0,
- true,
- )
- return &apiPodStatus
- }
- func (kl *Kubelet) convertToAPIContainerStatuses(pod *api.Pod, podStatus *kubecontainer.PodStatus, previousStatus []api.ContainerStatus, containers []api.Container, hasInitContainers, isInitContainer bool) []api.ContainerStatus {
- convertContainerStatus := func(cs *kubecontainer.ContainerStatus) *api.ContainerStatus {
- cid := cs.ID.String()
- status := &api.ContainerStatus{
- Name: cs.Name,
- RestartCount: int32(cs.RestartCount),
- Image: cs.Image,
- ImageID: cs.ImageID,
- ContainerID: cid,
- }
- switch cs.State {
- case kubecontainer.ContainerStateRunning:
- status.State.Running = &api.ContainerStateRunning{StartedAt: unversioned.NewTime(cs.StartedAt)}
- case kubecontainer.ContainerStateExited:
- status.State.Terminated = &api.ContainerStateTerminated{
- ExitCode: int32(cs.ExitCode),
- Reason: cs.Reason,
- Message: cs.Message,
- StartedAt: unversioned.NewTime(cs.StartedAt),
- FinishedAt: unversioned.NewTime(cs.FinishedAt),
- ContainerID: cid,
- }
- default:
- status.State.Waiting = &api.ContainerStateWaiting{}
- }
- return status
- }
- // Fetch old container statuses from the old pod status.
- oldStatuses := make(map[string]api.ContainerStatus, len(containers))
- for _, status := range previousStatus {
- oldStatuses[status.Name] = status
- }
- // Set all container statuses to default waiting state
- statuses := make(map[string]*api.ContainerStatus, len(containers))
- defaultWaitingState := api.ContainerState{Waiting: &api.ContainerStateWaiting{Reason: "ContainerCreating"}}
- if hasInitContainers {
- defaultWaitingState = api.ContainerState{Waiting: &api.ContainerStateWaiting{Reason: "PodInitializing"}}
- }
- for _, container := range containers {
- status := &api.ContainerStatus{
- Name: container.Name,
- Image: container.Image,
- State: defaultWaitingState,
- }
- // Apply some values from the old statuses as the default values.
- if oldStatus, found := oldStatuses[container.Name]; found {
- status.RestartCount = oldStatus.RestartCount
- status.LastTerminationState = oldStatus.LastTerminationState
- }
- statuses[container.Name] = status
- }
- // Make the latest container status come first.
- sort.Sort(sort.Reverse(kubecontainer.SortContainerStatusesByCreationTime(podStatus.ContainerStatuses)))
- // Set container statuses according to the statuses seen in pod status
- containerSeen := map[string]int{}
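- // Only the two most recent statuses per container are used: the newest becomes
- // the container's State, the next newest its LastTerminationState.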
- for _, cStatus := range podStatus.ContainerStatuses {
- cName := cStatus.Name
- if _, ok := statuses[cName]; !ok {
- // This would also ignore the infra container.
- continue
- }
- if containerSeen[cName] >= 2 {
- continue
- }
- status := convertContainerStatus(cStatus)
- if containerSeen[cName] == 0 {
- statuses[cName] = status
- } else {
- statuses[cName].LastTerminationState = status.State
- }
- containerSeen[cName] = containerSeen[cName] + 1
- }
- // Handle the containers that failed to be started; they should be in the Waiting state.
- for _, container := range containers {
- if isInitContainer {
- // If the init container is terminated with exit code 0, it won't be restarted.
- // TODO(random-liu): Handle this in a cleaner way.
- s := podStatus.FindContainerStatusByName(container.Name)
- if s != nil && s.State == kubecontainer.ContainerStateExited && s.ExitCode == 0 {
- continue
- }
- }
- // If a container should be restarted in the next syncpod, it is *Waiting*.
- if !kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
- continue
- }
- status := statuses[container.Name]
- reason, message, ok := kl.reasonCache.Get(pod.UID, container.Name)
- if !ok {
- // In fact, we could also apply Waiting state here, but it is less informative,
- // and the container will be restarted soon, so we prefer the original state here.
- // Note that with the current implementation of ShouldContainerBeRestarted the original state here
- // could be:
- // * Waiting: There is no associated historical container and start failure reason record.
- // * Terminated: The container is terminated.
- continue
- }
- if status.State.Terminated != nil {
- status.LastTerminationState = status.State
- }
- status.State = api.ContainerState{
- Waiting: &api.ContainerStateWaiting{
- Reason: reason.Error(),
- Message: message,
- },
- }
- statuses[container.Name] = status
- }
- var containerStatuses []api.ContainerStatus
- for _, status := range statuses {
- containerStatuses = append(containerStatuses, *status)
- }
- // Sort the container statuses since clients of this interface expect the list
- // of containers in a pod to have a deterministic order.
- if isInitContainer {
- kubetypes.SortInitContainerStatuses(pod, containerStatuses)
- } else {
- sort.Sort(kubetypes.SortedContainerStatuses(containerStatuses))
- }
- return containerStatuses
- }
- // ServeLogs returns logs of the current machine.
- func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
- // TODO: whitelist logs we are willing to serve
- kl.logServer.ServeHTTP(w, req)
- }
- // findContainer finds and returns the container with the given pod ID, full name, and container name.
- // It returns nil if not found.
- func (kl *Kubelet) findContainer(podFullName string, podUID types.UID, containerName string) (*kubecontainer.Container, error) {
- pods, err := kl.containerRuntime.GetPods(false)
- if err != nil {
- return nil, err
- }
- pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
- return pod.FindContainerByName(containerName), nil
- }
- // RunInContainer runs a command in a container and returns the combined stdout and stderr as a byte slice.
- func (kl *Kubelet) RunInContainer(podFullName string, podUID types.UID, containerName string, cmd []string) ([]byte, error) {
- podUID = kl.podManager.TranslatePodUID(podUID)
- container, err := kl.findContainer(podFullName, podUID, containerName)
- if err != nil {
- return nil, err
- }
- if container == nil {
- return nil, fmt.Errorf("container not found (%q)", containerName)
- }
- var buffer bytes.Buffer
- output := ioutils.WriteCloserWrapper(&buffer)
- err = kl.runner.ExecInContainer(container.ID, cmd, nil, output, output, false, nil)
- // Even if err is non-nil, there still may be output (e.g. the exec wrote to stdout or stderr but
- // the command returned a nonzero exit code). Therefore, always return the output along with the
- // error.
- return buffer.Bytes(), err
- }
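- // Illustrative usage (hypothetical pod full name, UID and container name):
- //
- //   out, err := kl.RunInContainer("nginx_default", "", "nginx", []string{"cat", "/etc/resolv.conf"})
- //   // out may hold combined stdout/stderr even when err is non-nil.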
- // ExecInContainer executes a command in a container, connecting the supplied
- // stdin/stdout/stderr to the command's IO streams.
- func (kl *Kubelet) ExecInContainer(podFullName string, podUID types.UID, containerName string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
- podUID = kl.podManager.TranslatePodUID(podUID)
- container, err := kl.findContainer(podFullName, podUID, containerName)
- if err != nil {
- return err
- }
- if container == nil {
- return fmt.Errorf("container not found (%q)", containerName)
- }
- return kl.runner.ExecInContainer(container.ID, cmd, stdin, stdout, stderr, tty, resize)
- }
- // AttachContainer uses the container runtime to attach the given streams to
- // the given container.
- func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, containerName string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) error {
- podUID = kl.podManager.TranslatePodUID(podUID)
- container, err := kl.findContainer(podFullName, podUID, containerName)
- if err != nil {
- return err
- }
- if container == nil {
- return fmt.Errorf("container not found (%q)", containerName)
- }
- return kl.containerRuntime.AttachContainer(container.ID, stdin, stdout, stderr, tty, resize)
- }
- // PortForward connects to the pod's port and copies data between the port
- // and the stream.
- func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
- podUID = kl.podManager.TranslatePodUID(podUID)
- pods, err := kl.containerRuntime.GetPods(false)
- if err != nil {
- return err
- }
- pod := kubecontainer.Pods(pods).FindPod(podFullName, podUID)
- if pod.IsEmpty() {
- return fmt.Errorf("pod not found (%q)", podFullName)
- }
- return kl.runner.PortForward(&pod, port, stream)
- }
- // GetConfiguration returns the KubeletConfiguration used to configure the kubelet.
- func (kl *Kubelet) GetConfiguration() componentconfig.KubeletConfiguration {
- return kl.kubeletConfiguration
- }
- // BirthCry sends an event that the kubelet has started up.
- func (kl *Kubelet) BirthCry() {
- // Make an event that the kubelet has started.
- kl.recorder.Eventf(kl.nodeRef, api.EventTypeNormal, events.StartingKubelet, "Starting kubelet.")
- }
- // StreamingConnectionIdleTimeout returns the timeout for streaming connections to the HTTP server.
- func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
- return kl.streamingConnectionIdleTimeout
- }
- // ResyncInterval returns the interval used for periodic syncs.
- func (kl *Kubelet) ResyncInterval() time.Duration {
- return kl.resyncInterval
- }
- // ListenAndServe runs the kubelet HTTP server.
- func (kl *Kubelet) ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableDebuggingHandlers bool) {
- server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, address, port, tlsOptions, auth, enableDebuggingHandlers, kl.containerRuntime)
- }
- // ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
- func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
- server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, kl.containerRuntime)
- }
- // cleanUpContainersInPod deletes the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around.
- func (kl *Kubelet) cleanUpContainersInPod(podId types.UID, exitedContainerID string) {
- if podStatus, err := kl.podCache.Get(podId); err == nil {
- removeAll := false
- if syncedPod, ok := kl.podManager.GetPodByUID(podId); ok {
- // When an evicted pod has already synced, all containers can be removed.
- removeAll = eviction.PodIsEvicted(syncedPod.Status)
- }
- kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
- }
- }
- // isSyncPodWorthy filters out events that are not worthy of pod syncing
- func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
- // ContainerRemoved doesn't affect pod state
- return event.Type != pleg.ContainerRemoved
- }
- func parseResourceList(m utilconfig.ConfigurationMap) (api.ResourceList, error) {
- rl := make(api.ResourceList)
- for k, v := range m {
- switch api.ResourceName(k) {
- // Only CPU and memory resources are supported.
- case api.ResourceCPU, api.ResourceMemory:
- q, err := resource.ParseQuantity(v)
- if err != nil {
- return nil, err
- }
- if q.Sign() == -1 {
- return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
- }
- rl[api.ResourceName(k)] = q
- default:
- return nil, fmt.Errorf("cannot reserve %q resource", k)
- }
- }
- return rl, nil
- }
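- // Illustrative sketch (hypothetical values): only cpu and memory may be
- // reserved, so the following parses cleanly, while any other key (e.g.
- // "storage") returns an error:
- //
- //   rl, err := parseResourceList(utilconfig.ConfigurationMap{"cpu": "100m", "memory": "256Mi"})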
- func ParseReservation(kubeReserved, systemReserved utilconfig.ConfigurationMap) (*kubetypes.Reservation, error) {
- reservation := new(kubetypes.Reservation)
- if rl, err := parseResourceList(kubeReserved); err != nil {
- return nil, err
- } else {
- reservation.Kubernetes = rl
- }
- if rl, err := parseResourceList(systemReserved); err != nil {
- return nil, err
- } else {
- reservation.System = rl
- }
- return reservation, nil
- }
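- // Illustrative usage (hypothetical values; these maps are typically populated
- // from the kubelet's --kube-reserved and --system-reserved flags):
- //
- //   reservation, err := ParseReservation(
- //       utilconfig.ConfigurationMap{"cpu": "100m", "memory": "256Mi"},
- //       utilconfig.ConfigurationMap{"cpu": "200m"},
- //   )
- //   // reservation.Kubernetes and reservation.System hold the parsed ResourceLists.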