nodecontroller.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/golang/glog"
	"github.com/prometheus/client_golang/prometheus"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/cloudprovider"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/controller/framework/informers"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/flowcontrol"
	"k8s.io/kubernetes/pkg/util/metrics"
	utilnode "k8s.io/kubernetes/pkg/util/node"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/system"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/version"
	"k8s.io/kubernetes/pkg/watch"
)

func init() {
	// Register prometheus metrics.
	Register()
}

var (
	ErrCloudInstance        = errors.New("cloud provider doesn't support instances.")
	gracefulDeletionVersion = version.MustParse("v1.1.0")

	// The minimum kubelet version for which the nodecontroller
	// can safely flip pod.Status to NotReady.
	podStatusReconciliationVersion = version.MustParse("v1.2.0")
)

const (
	// nodeStatusUpdateRetry controls the number of retries of writing a NodeStatus update.
	nodeStatusUpdateRetry = 5
	// nodeEvictionPeriod controls how often NodeController will try to evict Pods from non-responsive Nodes.
	nodeEvictionPeriod = 100 * time.Millisecond
	// evictionRateLimiterBurst is the burst value for all eviction rate limiters.
	evictionRateLimiterBurst = 1
	// apiserverStartupGracePeriod is how long the nodecontroller keeps polling the list-nodes
	// endpoint while waiting for the apiserver to come up.
	apiserverStartupGracePeriod = 10 * time.Minute
)
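
// zoneState describes the aggregated health of the Nodes in a single zone, as
// classified by ComputeZoneState and consumed when adjusting the eviction rate limiters.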
type zoneState string

const (
	stateInitial           = zoneState("Initial")
	stateNormal            = zoneState("Normal")
	stateFullDisruption    = zoneState("FullDisruption")
	statePartialDisruption = zoneState("PartialDisruption")
)

type nodeStatusData struct {
	probeTimestamp           unversioned.Time
	readyTransitionTimestamp unversioned.Time
	status                   api.NodeStatus
}

type NodeController struct {
	allocateNodeCIDRs bool
	cloud             cloudprovider.Interface
	clusterCIDR       *net.IPNet
	serviceCIDR       *net.IPNet
	knownNodeSet      map[string]*api.Node
	kubeClient        clientset.Interface
	// Method for easy mocking in unittest.
	lookupIP func(host string) ([]net.IP, error)
	// Value used if sync_nodes_status=False. NodeController will not proactively
	// sync node status in this case, but will monitor node status updates from the kubelet. If
	// it doesn't receive an update for this amount of time, it will start posting "NodeReady==
	// ConditionUnknown". The amount of time before which NodeController starts evicting pods
	// is controlled via the flag 'pod-eviction-timeout'.
	// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
	// in kubelet. There are several constraints:
	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
	//    N means the number of retries allowed for the kubelet to post node status. It is pointless
	//    to make nodeMonitorGracePeriod less than nodeStatusUpdateFrequency, since there
	//    will only be fresh values from the kubelet at an interval of nodeStatusUpdateFrequency.
	//    The constant must be less than podEvictionTimeout.
	// 2. nodeMonitorGracePeriod can't be too large for user experience - a larger value takes
	//    longer for the user to see an up-to-date node status.
	nodeMonitorGracePeriod time.Duration
	// Value controlling NodeController monitoring period, i.e. how often NodeController
	// checks node status posted from the kubelet. This value should be lower than nodeMonitorGracePeriod.
	// TODO: Change node status monitor to watch based.
	nodeMonitorPeriod time.Duration
	// Value used if sync_nodes_status=False, only for node startup. When a node
	// has just been created, e.g. during cluster bootstrap or node creation, we give it a longer grace period.
	nodeStartupGracePeriod time.Duration
	// per Node map storing last observed Status together with a local time when it was observed.
	// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
	// to avoid the problem with time skew across the cluster.
	nodeStatusMap map[string]nodeStatusData
	now           func() unversioned.Time
	// Lock to access evictor workers
	evictorLock sync.Mutex
	// workers that evict pods from unresponsive nodes.
	zonePodEvictor         map[string]*RateLimitedTimedQueue
	zoneTerminationEvictor map[string]*RateLimitedTimedQueue
	podEvictionTimeout     time.Duration
	// The maximum duration before a pod evicted from a node can be forcefully terminated.
	maximumGracePeriod time.Duration
	recorder           record.EventRecorder
	// Pod framework and store
	podController framework.ControllerInterface
	podStore      cache.StoreToPodLister
	// Node framework and store
	nodeController *framework.Controller
	nodeStore      cache.StoreToNodeLister
	// DaemonSet framework and store
	daemonSetController *framework.Controller
	daemonSetStore      cache.StoreToDaemonSetLister
	// allocate/recycle CIDRs for node if allocateNodeCIDRs == true
	cidrAllocator CIDRAllocator

	forcefullyDeletePod        func(*api.Pod) error
	nodeExistsInCloudProvider  func(string) (bool, error)
	computeZoneStateFunc       func(nodeConditions []*api.NodeCondition) (int, zoneState)
	enterPartialDisruptionFunc func(nodeNum int) float32
	enterFullDisruptionFunc    func(nodeNum int) float32

	zoneStates                  map[string]zoneState
	evictionLimiterQPS          float32
	secondaryEvictionLimiterQPS float32
	largeClusterThreshold       int32
	unhealthyZoneThreshold      float32

	// internalPodInformer is used to hold a personal informer. If we're using
	// a normal shared informer, then the informer will be started for us. If
	// we have a personal informer, we must start it ourselves. If you start
	// the controller using NewNodeController (passing a SharedInformer), this
	// will be nil.
	internalPodInformer framework.SharedIndexInformer

	evictions10Minutes *evictionData
	evictions1Hour     *evictionData
}

// NewNodeController returns a new node controller to sync instances from cloudprovider.
// This method returns an error if it is unable to initialize the CIDR bitmap with
// podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
// currently, this should be handled as a fatal error.
func NewNodeController(
	podInformer framework.SharedIndexInformer,
	cloud cloudprovider.Interface,
	kubeClient clientset.Interface,
	podEvictionTimeout time.Duration,
	evictionLimiterQPS float32,
	secondaryEvictionLimiterQPS float32,
	largeClusterThreshold int32,
	unhealthyZoneThreshold float32,
	nodeMonitorGracePeriod time.Duration,
	nodeStartupGracePeriod time.Duration,
	nodeMonitorPeriod time.Duration,
	clusterCIDR *net.IPNet,
	serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int,
	allocateNodeCIDRs bool) (*NodeController, error) {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
	eventBroadcaster.StartLogging(glog.Infof)
	if kubeClient != nil {
		glog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	} else {
		glog.V(0).Infof("No api server defined - no events will be sent to API server.")
	}

	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("node_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
	}

	if allocateNodeCIDRs {
		if clusterCIDR == nil {
			glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
		}
		mask := clusterCIDR.Mask
		if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSize {
			glog.Fatal("NodeController: Invalid clusterCIDR, mask size of clusterCIDR must be less than nodeCIDRMaskSize.")
		}
	}

	nc := &NodeController{
		cloud:                       cloud,
		knownNodeSet:                make(map[string]*api.Node),
		kubeClient:                  kubeClient,
		recorder:                    recorder,
		podEvictionTimeout:          podEvictionTimeout,
		maximumGracePeriod:          5 * time.Minute,
		zonePodEvictor:              make(map[string]*RateLimitedTimedQueue),
		zoneTerminationEvictor:      make(map[string]*RateLimitedTimedQueue),
		nodeStatusMap:               make(map[string]nodeStatusData),
		nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
		nodeMonitorPeriod:           nodeMonitorPeriod,
		nodeStartupGracePeriod:      nodeStartupGracePeriod,
		lookupIP:                    net.LookupIP,
		now:                         unversioned.Now,
		clusterCIDR:                 clusterCIDR,
		serviceCIDR:                 serviceCIDR,
		allocateNodeCIDRs:           allocateNodeCIDRs,
		forcefullyDeletePod:         func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
		nodeExistsInCloudProvider:   func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
		evictionLimiterQPS:          evictionLimiterQPS,
		secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
		largeClusterThreshold:       largeClusterThreshold,
		unhealthyZoneThreshold:      unhealthyZoneThreshold,
		zoneStates:                  make(map[string]zoneState),
		evictions10Minutes:          newEvictionData(10 * time.Minute),
		evictions1Hour:              newEvictionData(time.Hour),
	}

	nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
	nc.computeZoneStateFunc = nc.ComputeZoneState

	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
		AddFunc:    nc.maybeDeleteTerminatingPod,
		UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
	})
	nc.podStore.Indexer = podInformer.GetIndexer()
	nc.podController = podInformer.GetController()

	nodeEventHandlerFuncs := framework.ResourceEventHandlerFuncs{}
	if nc.allocateNodeCIDRs {
		nodeEventHandlerFuncs = framework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				node := obj.(*api.Node)
				err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
				if err != nil {
					glog.Errorf("Error allocating CIDR: %v", err)
				}
			},
			UpdateFunc: func(_, obj interface{}) {
				node := obj.(*api.Node)
				// If the PodCIDR is not empty we either:
				// - already processed a Node that already had a CIDR after NC restarted
				//   (cidr is marked as used),
				// - already processed a Node successfully and allocated a CIDR for it
				//   (cidr is marked as used),
				// - already processed a Node, saw a "timeout" response, and the request
				//   eventually got through - in this case we haven't released the
				//   allocated CIDR (cidr is still marked as used).
				// There's a possible error here:
				// - NC sees a new Node and assigns CIDR X to it,
				// - the Update Node call fails with a timeout,
				// - the Node is updated by some other component, NC sees the update and
				//   assigns CIDR Y to the Node,
				// - both CIDR X and CIDR Y are marked as used in the local cache,
				//   even though the Node only sees CIDR Y.
				// The problem here is that in the in-memory cache we see CIDR X as marked,
				// which prevents it from being assigned to any new node. The cluster
				// state is correct.
				// A restart of NC fixes the issue.
				if node.Spec.PodCIDR == "" {
					err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
					if err != nil {
						glog.Errorf("Error allocating CIDR: %v", err)
					}
				}
			},
			DeleteFunc: func(obj interface{}) {
				node := obj.(*api.Node)
				err := nc.cidrAllocator.ReleaseCIDR(node)
				if err != nil {
					glog.Errorf("Error releasing CIDR: %v", err)
				}
			},
		}
	}

	nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return nc.kubeClient.Core().Nodes().List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return nc.kubeClient.Core().Nodes().Watch(options)
			},
		},
		&api.Node{},
		controller.NoResyncPeriodFunc(),
		nodeEventHandlerFuncs,
	)

	nc.daemonSetStore.Store, nc.daemonSetController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return nc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return nc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
			},
		},
		&extensions.DaemonSet{},
		controller.NoResyncPeriodFunc(),
		framework.ResourceEventHandlerFuncs{},
	)

	if allocateNodeCIDRs {
		var nodeList *api.NodeList
		var err error
		// We must poll because apiserver might not be up. This error causes
		// controller manager to restart.
		if pollErr := wait.Poll(10*time.Second, apiserverStartupGracePeriod, func() (bool, error) {
			nodeList, err = kubeClient.Core().Nodes().List(api.ListOptions{
				FieldSelector: fields.Everything(),
				LabelSelector: labels.Everything(),
			})
			if err != nil {
				glog.Errorf("Failed to list all nodes: %v", err)
				return false, nil
			}
			return true, nil
		}); pollErr != nil {
			return nil, fmt.Errorf("Failed to list all nodes in %v, cannot proceed without updating CIDR map", apiserverStartupGracePeriod)
		}
		nc.cidrAllocator, err = NewCIDRRangeAllocator(kubeClient, clusterCIDR, serviceCIDR, nodeCIDRMaskSize, nodeList)
		if err != nil {
			return nil, err
		}
	}

	return nc, nil
}
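
// NewNodeControllerFromClient wraps NewNodeController for callers that only have a
// clientset: it builds a dedicated pod informer from the client and stores it in
// internalPodInformer so that Run can start it alongside the other controllers.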
func NewNodeControllerFromClient(
	cloud cloudprovider.Interface,
	kubeClient clientset.Interface,
	podEvictionTimeout time.Duration,
	evictionLimiterQPS float32,
	secondaryEvictionLimiterQPS float32,
	largeClusterThreshold int32,
	unhealthyZoneThreshold float32,
	nodeMonitorGracePeriod time.Duration,
	nodeStartupGracePeriod time.Duration,
	nodeMonitorPeriod time.Duration,
	clusterCIDR *net.IPNet,
	serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int,
	allocateNodeCIDRs bool) (*NodeController, error) {
	podInformer := informers.NewPodInformer(kubeClient, controller.NoResyncPeriodFunc())
	nc, err := NewNodeController(podInformer, cloud, kubeClient, podEvictionTimeout, evictionLimiterQPS, secondaryEvictionLimiterQPS,
		largeClusterThreshold, unhealthyZoneThreshold, nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod, clusterCIDR,
		serviceCIDR, nodeCIDRMaskSize, allocateNodeCIDRs)
	if err != nil {
		return nil, err
	}
	nc.internalPodInformer = podInformer
	return nc, nil
}
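
// A minimal wiring sketch (the concrete values below are illustrative assumptions,
// not defaults taken from this file; "client" stands for an existing
// clientset.Interface supplied by the caller):
//
//	nc, err := NewNodeControllerFromClient(
//		nil,            // cloud provider; may be nil when running without one
//		client,         // clientset.Interface
//		5*time.Minute,  // podEvictionTimeout
//		0.1, 0.01,      // evictionLimiterQPS, secondaryEvictionLimiterQPS
//		50,             // largeClusterThreshold
//		0.55,           // unhealthyZoneThreshold
//		40*time.Second, // nodeMonitorGracePeriod
//		60*time.Second, // nodeStartupGracePeriod
//		5*time.Second,  // nodeMonitorPeriod
//		clusterCIDR, serviceCIDR, 24, true)
//	if err != nil {
//		glog.Fatalf("Failed to initialize NodeController: %v", err)
//	}
//	nc.Run()
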
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
	go nc.nodeController.Run(wait.NeverStop)
	go nc.podController.Run(wait.NeverStop)
	go nc.daemonSetController.Run(wait.NeverStop)
	if nc.internalPodInformer != nil {
		go nc.internalPodInformer.Run(wait.NeverStop)
	}

	// Incorporate the results of node status pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeStatus(); err != nil {
			glog.Errorf("Error monitoring node status: %v", err)
		}
	}, nc.nodeMonitorPeriod, wait.NeverStop)

	// Managing eviction of nodes:
	// 1. when we delete pods off a node, if the node was not empty at the time, we queue
	//    a termination watcher
	//    a. If we hit an error, retry deletion
	// 2. The terminator loop ensures that pods are eventually cleaned and we never
	//    terminate a pod in a time period less than nc.maximumGracePeriod. AddedAt
	//    is the time from which we measure "has this pod been terminating too long",
	//    after which we will delete the pod with grace period 0 (force delete).
	//    a. If we hit errors, retry instantly
	//    b. If there are no pods left terminating, exit
	//    c. If there are pods still terminating, wait for their estimated completion
	//       before retrying
	go wait.Until(func() {
		nc.evictorLock.Lock()
		defer nc.evictorLock.Unlock()
		for k := range nc.zonePodEvictor {
			nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
				obj, exists, err := nc.nodeStore.Get(value.Value)
				if err != nil {
					glog.Warningf("Failed to get Node %v from the nodeStore: %v", value.Value, err)
				} else if !exists {
					glog.Warningf("Node %v no longer present in nodeStore!", value.Value)
				} else {
					node, _ := obj.(*api.Node)
					zone := utilnode.GetZoneKey(node)
					nc.evictions10Minutes.registerEviction(zone, value.Value)
					nc.evictions1Hour.registerEviction(zone, value.Value)
				}

				nodeUid, _ := value.UID.(string)
				remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
					return false, 0
				}
				if remaining {
					nc.zoneTerminationEvictor[k].Add(value.Value, value.UID)
				}
				return true, 0
			})
		}
	}, nodeEvictionPeriod, wait.NeverStop)

	// TODO: replace with a controller that ensures pods that are terminating complete
	// in a particular time period
	go wait.Until(func() {
		nc.evictorLock.Lock()
		defer nc.evictorLock.Unlock()
		for k := range nc.zoneTerminationEvictor {
			nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
				nodeUid, _ := value.UID.(string)
				completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, value.AddedAt, nc.maximumGracePeriod)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
					return false, 0
				}
				if completed {
					glog.V(2).Infof("All pods terminated on %s", value.Value)
					recordNodeEvent(nc.recorder, value.Value, nodeUid, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
					return true, 0
				}

				glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
				// clamp very short intervals
				if remaining < nodeEvictionPeriod {
					remaining = nodeEvictionPeriod
				}
				return false, remaining
			})
		}
	}, nodeEvictionPeriod, wait.NeverStop)

	go wait.Until(func() {
		pods, err := nc.podStore.List(labels.Everything())
		if err != nil {
			utilruntime.HandleError(err)
			return
		}
		cleanupOrphanedPods(pods, nc.nodeStore.Store, nc.forcefullyDeletePod)
	}, 30*time.Second, wait.NeverStop)
}

// monitorNodeStatus verifies that node statuses are constantly updated by the kubelet
// and, if not, posts "NodeReady==ConditionUnknown". It also evicts all pods from a node
// that has been not ready or unreachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {
	nodes, err := nc.kubeClient.Core().Nodes().List(api.ListOptions{})
	if err != nil {
		return err
	}
	added, deleted := nc.checkForNodeAddedDeleted(nodes)
	for i := range added {
		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
		recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
		nc.knownNodeSet[added[i].Name] = added[i]
		// When adding new Nodes we need to check whether a new zone appeared, and if so add a new evictor.
		zone := utilnode.GetZoneKey(added[i])
		if _, found := nc.zonePodEvictor[zone]; !found {
			nc.zonePodEvictor[zone] =
				NewRateLimitedTimedQueue(
					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
			nc.evictions10Minutes.initZone(zone)
			nc.evictions1Hour.initZone(zone)
		}
		if _, found := nc.zoneTerminationEvictor[zone]; !found {
			nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
				flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
		}
		nc.cancelPodEviction(added[i])
	}

	for i := range deleted {
		glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name)
		recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name))
		nc.evictPods(deleted[i])
		delete(nc.knownNodeSet, deleted[i].Name)
	}

	zoneToNodeConditions := map[string][]*api.NodeCondition{}
	for i := range nodes.Items {
		var gracePeriod time.Duration
		var observedReadyCondition api.NodeCondition
		var currentReadyCondition *api.NodeCondition
		node := &nodes.Items[i]
		for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
			if err == nil {
				break
			}
			name := node.Name
			node, err = nc.kubeClient.Core().Nodes().Get(name)
			if err != nil {
				glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
				break
			}
		}
		if err != nil {
			glog.Errorf("Update status of Node %v from NodeController exceeds retry count. "+
				"Skipping - no pods will be evicted.", node.Name)
			continue
		}
		// We do not treat a master node as a part of the cluster for network disruption checking.
		if !system.IsMasterNode(node) {
			zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
		}

		decisionTimestamp := nc.now()
		if currentReadyCondition != nil {
			// Check eviction timeout against decisionTimestamp
			if observedReadyCondition.Status == api.ConditionFalse &&
				decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
				if nc.evictPods(node) {
					glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
				}
			}
			if observedReadyCondition.Status == api.ConditionUnknown &&
				decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
				if nc.evictPods(node) {
					glog.V(4).Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
				}
			}
			if observedReadyCondition.Status == api.ConditionTrue {
				if nc.cancelPodEviction(node) {
					glog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
				}
			}

			// Report node event.
			if currentReadyCondition.Status != api.ConditionTrue && observedReadyCondition.Status == api.ConditionTrue {
				recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
				if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
				}
			}

			// Check with the cloud provider to see if the node still exists. If it
			// doesn't, delete the node immediately.
			if currentReadyCondition.Status != api.ConditionTrue && nc.cloud != nil {
				exists, err := nc.nodeExistsInCloudProvider(node.Name)
				if err != nil {
					glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
					continue
				}
				if !exists {
					glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
					recordNodeEvent(nc.recorder, node.Name, string(node.UID), api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
					go func(nodeName string) {
						defer utilruntime.HandleCrash()
						// Kubelet is not reporting and Cloud Provider says node
						// is gone. Delete it without worrying about grace
						// periods.
						if err := forcefullyDeleteNode(nc.kubeClient, nodeName, nc.forcefullyDeletePod); err != nil {
							glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
						}
					}(node.Name)
					continue
				}
			}
		}
	}
	nc.handleDisruption(zoneToNodeConditions, nodes)
	nc.updateEvictionMetric(Evictions10Minutes, nc.evictions10Minutes)
	nc.updateEvictionMetric(Evictions1Hour, nc.evictions1Hour)

	return nil
}
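
// updateEvictionMetric advances the sliding window of the given eviction data and
// publishes the per-zone eviction counts to the provided Prometheus gauge vector.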
func (nc *NodeController) updateEvictionMetric(metric *prometheus.GaugeVec, data *evictionData) {
	data.slideWindow()
	zones := data.getZones()
	for _, z := range zones {
		metric.WithLabelValues(z).Set(float64(data.countEvictions(z)))
	}
}
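
// handleDisruption classifies every zone from the observed Ready conditions, records the
// zone health metrics, and adjusts the per-zone eviction rate limiters: evictions are
// suspended entirely while all zones are fully disrupted and restored once any zone recovers.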
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*api.NodeCondition, nodes *api.NodeList) {
	newZoneStates := map[string]zoneState{}
	allAreFullyDisrupted := true
	for k, v := range zoneToNodeConditions {
		ZoneSize.WithLabelValues(k).Set(float64(len(v)))
		unhealthy, newState := nc.computeZoneStateFunc(v)
		ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		if newState != stateFullDisruption {
			allAreFullyDisrupted = false
		}
		newZoneStates[k] = newState
		if _, had := nc.zoneStates[k]; !had {
			nc.zoneStates[k] = stateInitial
		}
	}

	allWasFullyDisrupted := true
	for k, v := range nc.zoneStates {
		if _, have := zoneToNodeConditions[k]; !have {
			ZoneSize.WithLabelValues(k).Set(0)
			ZoneHealth.WithLabelValues(k).Set(100)
			UnhealthyNodes.WithLabelValues(k).Set(0)
			delete(nc.zoneStates, k)
			continue
		}
		if v != stateFullDisruption {
			allWasFullyDisrupted = false
			break
		}
	}

	// At least one node was responding in the previous pass or in the current pass. The semantics are as follows:
	// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
	// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
	// - if the new state is "fullDisruption" we restore the normal eviction rate,
	//   unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
	if !allAreFullyDisrupted || !allWasFullyDisrupted {
		// We're switching to full disruption mode
		if allAreFullyDisrupted {
			glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
			for i := range nodes.Items {
				nc.cancelPodEviction(&nodes.Items[i])
			}
			// We stop all evictions.
			for k := range nc.zonePodEvictor {
				nc.zonePodEvictor[k].SwapLimiter(0)
				nc.zoneTerminationEvictor[k].SwapLimiter(0)
			}
			for k := range nc.zoneStates {
				nc.zoneStates[k] = stateFullDisruption
			}
			// All rate limiters are updated, so we can return early here.
			return
		}
		// We're exiting full disruption mode
		if allWasFullyDisrupted {
			glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
			// When exiting disruption mode update probe timestamps on all Nodes.
			now := nc.now()
			for i := range nodes.Items {
				v := nc.nodeStatusMap[nodes.Items[i].Name]
				v.probeTimestamp = now
				v.readyTransitionTimestamp = now
				nc.nodeStatusMap[nodes.Items[i].Name] = v
			}
			// We reset all rate limiters to settings appropriate for the given state.
			for k := range nc.zonePodEvictor {
				nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
				nc.zoneStates[k] = newZoneStates[k]
			}
			return
		}
		// We know that there's at least one zone that is not fully disrupted,
		// so we can use the default behavior for rate limiters.
		for k, v := range nc.zoneStates {
			newState := newZoneStates[k]
			if v == newState {
				continue
			}
			glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
			nc.zoneStates[k] = newState
		}
	}
}

func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
	switch state {
	case stateNormal:
		nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
		nc.zoneTerminationEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
	case statePartialDisruption:
		nc.zonePodEvictor[zone].SwapLimiter(
			nc.enterPartialDisruptionFunc(zoneSize))
		nc.zoneTerminationEvictor[zone].SwapLimiter(
			nc.enterPartialDisruptionFunc(zoneSize))
	case stateFullDisruption:
		nc.zonePodEvictor[zone].SwapLimiter(
			nc.enterFullDisruptionFunc(zoneSize))
		nc.zoneTerminationEvictor[zone].SwapLimiter(
			nc.enterFullDisruptionFunc(zoneSize))
	}
}

// tryUpdateNodeStatus checks a given node's conditions and tries to update it. It returns
// the grace period to which the node is entitled, the last observed and the current Ready
// Condition, and an error if one occurred.
func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
	var err error
	var gracePeriod time.Duration
	var observedReadyCondition api.NodeCondition
	_, currentReadyCondition := api.GetNodeCondition(&node.Status, api.NodeReady)
	if currentReadyCondition == nil {
		// If the ready condition is nil, then the kubelet (or nodecontroller) never posted node status.
		// A fake ready condition is created, where LastHeartbeatTime and LastTransitionTime are set
		// to node.CreationTimestamp, to avoid handling that corner case elsewhere.
		observedReadyCondition = api.NodeCondition{
			Type:               api.NodeReady,
			Status:             api.ConditionUnknown,
			LastHeartbeatTime:  node.CreationTimestamp,
			LastTransitionTime: node.CreationTimestamp,
		}
		gracePeriod = nc.nodeStartupGracePeriod
		nc.nodeStatusMap[node.Name] = nodeStatusData{
			status:                   node.Status,
			probeTimestamp:           node.CreationTimestamp,
			readyTransitionTimestamp: node.CreationTimestamp,
		}
	} else {
		// If the ready condition is not nil, make a copy of it, since we may modify it in place later.
		observedReadyCondition = *currentReadyCondition
		gracePeriod = nc.nodeMonitorGracePeriod
	}

	savedNodeStatus, found := nc.nodeStatusMap[node.Name]
	// There are the following cases to check:
	// - both the saved and the new status have no Ready Condition set - we leave everything as it is,
	// - the saved status has no Ready Condition, but the current one does - NodeController was restarted with Node data already present in etcd,
	// - the saved status has some Ready Condition, but the current one does not - it's an error, but we fill it up because that's probably a good thing to do,
	// - both the saved and the current statuses have Ready Conditions and they have the same LastProbeTime - nothing happened on that Node, it may be
	//   unresponsive, so we leave it as it is,
	// - both the saved and the current statuses have Ready Conditions, they have different LastProbeTimes, but the same Ready Condition State -
	//   everything is in order, no transition occurred, we update only probeTimestamp,
	// - both the saved and the current statuses have Ready Conditions, different LastProbeTimes and different Ready Condition States -
	//   the Ready Condition changed its state since we last saw it, so we update both probeTimestamp and readyTransitionTimestamp.
	// TODO: things to consider:
	// - if 'LastProbeTime' has gone back in time it's probably an error, currently we ignore it,
	// - currently the only correct Ready State transition outside of the Node Controller is the kubelet marking it Ready; we don't check
	//   if that's the case, but it does not seem necessary.
	var savedCondition *api.NodeCondition
	if found {
		_, savedCondition = api.GetNodeCondition(&savedNodeStatus.status, api.NodeReady)
	}
	_, observedCondition := api.GetNodeCondition(&node.Status, api.NodeReady)
	if !found {
		glog.Warningf("Missing timestamp for Node %s. Assuming now as a timestamp.", node.Name)
		savedNodeStatus = nodeStatusData{
			status:                   node.Status,
			probeTimestamp:           nc.now(),
			readyTransitionTimestamp: nc.now(),
		}
	} else if savedCondition == nil && observedCondition != nil {
		glog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
		savedNodeStatus = nodeStatusData{
			status:                   node.Status,
			probeTimestamp:           nc.now(),
			readyTransitionTimestamp: nc.now(),
		}
	} else if savedCondition != nil && observedCondition == nil {
		glog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
		// TODO: figure out what to do in this case. For now we do the same thing as above.
		savedNodeStatus = nodeStatusData{
			status:                   node.Status,
			probeTimestamp:           nc.now(),
			readyTransitionTimestamp: nc.now(),
		}
	} else if savedCondition != nil && observedCondition != nil && savedCondition.LastHeartbeatTime != observedCondition.LastHeartbeatTime {
		var transitionTime unversioned.Time
		// If ReadyCondition changed since the last time we checked, we update the transition timestamp to "now",
		// otherwise we leave it as it is.
		if savedCondition.LastTransitionTime != observedCondition.LastTransitionTime {
			glog.V(3).Infof("ReadyCondition for Node %s transitioned from %v to %v", node.Name, savedCondition.Status, observedCondition)
			transitionTime = nc.now()
		} else {
			transitionTime = savedNodeStatus.readyTransitionTimestamp
		}
		if glog.V(5) {
			glog.V(5).Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeStatus.status, node.Status)
		} else {
			glog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
		}
		savedNodeStatus = nodeStatusData{
			status:                   node.Status,
			probeTimestamp:           nc.now(),
			readyTransitionTimestamp: transitionTime,
		}
	}
	nc.nodeStatusMap[node.Name] = savedNodeStatus

	if nc.now().After(savedNodeStatus.probeTimestamp.Add(gracePeriod)) {
		// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
		// (regardless of its current value) in the master.
		if currentReadyCondition == nil {
			glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
			node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
				Type:               api.NodeReady,
				Status:             api.ConditionUnknown,
				Reason:             "NodeStatusNeverUpdated",
				Message:            fmt.Sprintf("Kubelet never posted node status."),
				LastHeartbeatTime:  node.CreationTimestamp,
				LastTransitionTime: nc.now(),
			})
		} else {
			glog.V(4).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
				node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), observedReadyCondition)
			if observedReadyCondition.Status != api.ConditionUnknown {
				currentReadyCondition.Status = api.ConditionUnknown
				currentReadyCondition.Reason = "NodeStatusUnknown"
				currentReadyCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
				// LastProbeTime is the last time we heard from kubelet.
				currentReadyCondition.LastHeartbeatTime = observedReadyCondition.LastHeartbeatTime
				currentReadyCondition.LastTransitionTime = nc.now()
			}
		}

		// Like the NodeReady condition, NodeOutOfDisk was last set longer ago than gracePeriod, so update
		// it to Unknown (regardless of its current value) in the master.
		// TODO(madhusudancs): Refactor this with readyCondition to remove duplicated code.
		_, oodCondition := api.GetNodeCondition(&node.Status, api.NodeOutOfDisk)
		if oodCondition == nil {
			glog.V(2).Infof("Out of disk condition of node %v is never updated by kubelet", node.Name)
			node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
				Type:               api.NodeOutOfDisk,
				Status:             api.ConditionUnknown,
				Reason:             "NodeStatusNeverUpdated",
				Message:            fmt.Sprintf("Kubelet never posted node status."),
				LastHeartbeatTime:  node.CreationTimestamp,
				LastTransitionTime: nc.now(),
			})
		} else {
			glog.V(4).Infof("node %v hasn't been updated for %+v. Last out of disk condition is: %+v",
				node.Name, nc.now().Time.Sub(savedNodeStatus.probeTimestamp.Time), oodCondition)
			if oodCondition.Status != api.ConditionUnknown {
				oodCondition.Status = api.ConditionUnknown
				oodCondition.Reason = "NodeStatusUnknown"
				oodCondition.Message = fmt.Sprintf("Kubelet stopped posting node status.")
				oodCondition.LastTransitionTime = nc.now()
			}
		}

		_, currentCondition := api.GetNodeCondition(&node.Status, api.NodeReady)
		if !api.Semantic.DeepEqual(currentCondition, &observedReadyCondition) {
			if _, err = nc.kubeClient.Core().Nodes().UpdateStatus(node); err != nil {
				glog.Errorf("Error updating node %s: %v", node.Name, err)
				return gracePeriod, observedReadyCondition, currentReadyCondition, err
			} else {
				nc.nodeStatusMap[node.Name] = nodeStatusData{
					status:                   node.Status,
					probeTimestamp:           nc.nodeStatusMap[node.Name].probeTimestamp,
					readyTransitionTimestamp: nc.now(),
				}
				return gracePeriod, observedReadyCondition, currentReadyCondition, nil
			}
		}
	}

	return gracePeriod, observedReadyCondition, currentReadyCondition, err
}
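
// checkForNodeAddedDeleted compares the freshly listed nodes against knownNodeSet and
// returns the nodes that appeared and the nodes that disappeared since the last pass.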
func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added, deleted []*api.Node) {
	for i := range nodes.Items {
		if _, has := nc.knownNodeSet[nodes.Items[i].Name]; !has {
			added = append(added, &nodes.Items[i])
		}
	}
	// If there's a difference between the lengths of known Nodes and observed nodes
	// we must have removed some Node.
	if len(nc.knownNodeSet)+len(added) != len(nodes.Items) {
		knowSetCopy := map[string]*api.Node{}
		for k, v := range nc.knownNodeSet {
			knowSetCopy[k] = v
		}
		for i := range nodes.Items {
			delete(knowSetCopy, nodes.Items[i].Name)
		}
		for i := range knowSetCopy {
			deleted = append(deleted, knowSetCopy[i])
		}
	}
	return
}

// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
	zone := utilnode.GetZoneKey(node)
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
	wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
	if wasDeleting || wasTerminating {
		glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
		nc.evictions10Minutes.removeEviction(zone, node.Name)
		nc.evictions1Hour.removeEviction(zone, node.Name)
		return true
	}
	return false
}

// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(node *api.Node) bool {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
}

// HealthyQPSFunc returns the default value for the cluster eviction rate - we take
// nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
	return nc.evictionLimiterQPS
}

// ReducedQPSFunc makes evictions slower if the cluster is large, and stops evictions
// altogether if it is small.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
	if int32(nodeNum) > nc.largeClusterThreshold {
		return nc.secondaryEvictionLimiterQPS
	}
	return 0
}

// ComputeZoneState is expected to get a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there are no Ready Nodes,
// - partiallyDisrupted if at least nc.unhealthyZoneThreshold percent of Nodes are not Ready,
// - normal otherwise.
func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*api.NodeCondition) (int, zoneState) {
	readyNodes := 0
	notReadyNodes := 0
	for i := range nodeReadyConditions {
		if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == api.ConditionTrue {
			readyNodes++
		} else {
			notReadyNodes++
		}
	}
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return notReadyNodes, stateFullDisruption
	case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
		return notReadyNodes, statePartialDisruption
	default:
		return notReadyNodes, stateNormal
	}
}
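
// As a worked example (the threshold value is an illustrative assumption): with
// unhealthyZoneThreshold = 0.55, a zone with 3 NotReady Nodes out of 5 is 60% unhealthy
// and is reported as PartialDisruption; a zone with 0 Ready Nodes and at least one
// NotReady Node is FullDisruption; and a zone with only 2 NotReady Nodes stays Normal,
// because the notReadyNodes > 2 guard is not met.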