disruption.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package disruption

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/apis/policy"
	"k8s.io/kubernetes/pkg/client/cache"
	"k8s.io/kubernetes/pkg/client/record"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/types"
	"k8s.io/kubernetes/pkg/util/intstr"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"

	"github.com/golang/glog"
)

const statusUpdateRetries = 2

type updater func(*policy.PodDisruptionBudget) error

type DisruptionController struct {
	kubeClient *client.Client

	pdbStore      cache.Store
	pdbController *framework.Controller
	pdbLister     cache.StoreToPodDisruptionBudgetLister

	podController framework.ControllerInterface
	podLister     cache.StoreToPodLister

	rcIndexer    cache.Indexer
	rcController *framework.Controller
	rcLister     cache.StoreToReplicationControllerLister

	rsStore      cache.Store
	rsController *framework.Controller
	rsLister     cache.StoreToReplicaSetLister

	dIndexer    cache.Indexer
	dController *framework.Controller
	dLister     cache.StoreToDeploymentLister

	queue *workqueue.Type

	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder

	getUpdater func() updater
}

// controllerAndScale is used to return (controller, scale) pairs from the
// controller finder functions.
type controllerAndScale struct {
	types.UID
	scale int32
}

// podControllerFinder is a function type that maps a pod to a list of
// controllers and their scale.
type podControllerFinder func(*api.Pod) ([]controllerAndScale, error)

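// NewDisruptionController builds a DisruptionController that watches pods,
// PodDisruptionBudgets, replication controllers, replica sets, and deployments,
// and keeps PDB status up to date.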
func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClient *client.Client) *DisruptionController {
	dc := &DisruptionController{
		kubeClient:    kubeClient,
		podController: podInformer.GetController(),
		queue:         workqueue.NewNamed("disruption"),
		broadcaster:   record.NewBroadcaster(),
	}
	dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})

	dc.getUpdater = func() updater { return dc.writePdbStatus }

	dc.podLister.Indexer = podInformer.GetIndexer()

	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
		AddFunc:    dc.addPod,
		UpdateFunc: dc.updatePod,
		DeleteFunc: dc.deletePod,
	})

	dc.pdbStore, dc.pdbController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.kubeClient.Policy().PodDisruptionBudgets(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.kubeClient.Policy().PodDisruptionBudgets(api.NamespaceAll).Watch(options)
			},
		},
		&policy.PodDisruptionBudget{},
		30*time.Second,
		framework.ResourceEventHandlerFuncs{
			AddFunc:    dc.addDb,
			UpdateFunc: dc.updateDb,
			DeleteFunc: dc.removeDb,
		},
	)
	dc.pdbLister.Store = dc.pdbStore

	dc.rcIndexer, dc.rcController = framework.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.kubeClient.ReplicationControllers(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(options)
			},
		},
		&api.ReplicationController{},
		30*time.Second,
		framework.ResourceEventHandlerFuncs{},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	dc.rcLister.Indexer = dc.rcIndexer

	dc.rsStore, dc.rsController = framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.kubeClient.Extensions().ReplicaSets(api.NamespaceAll).Watch(options)
			},
		},
		&extensions.ReplicaSet{},
		30*time.Second,
		framework.ResourceEventHandlerFuncs{},
	)
	dc.rsLister.Store = dc.rsStore

	dc.dIndexer, dc.dController = framework.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).Watch(options)
			},
		},
		&extensions.Deployment{},
		30*time.Second,
		framework.ResourceEventHandlerFuncs{},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	dc.dLister.Indexer = dc.dIndexer

	return dc
}

// TODO(mml): When controllerRef is implemented (#2210), we *could* simply
// return controllers without their scales, and access scale type-generically
// via the scale subresource. That may not be as much of a win as it sounds,
// however. We are accessing everything through the pkg/client/cache API that
// we have to set up and tune to the types we know we'll be accessing anyway,
// and we may well need further tweaks just to be able to access scale
// subresources.
func (dc *DisruptionController) finders() []podControllerFinder {
	return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets}
}

// getPodReplicaSets finds replicasets which have no matching deployments.
func (dc *DisruptionController) getPodReplicaSets(pod *api.Pod) ([]controllerAndScale, error) {
	cas := []controllerAndScale{}
	rss, err := dc.rsLister.GetPodReplicaSets(pod)
	// GetPodReplicaSets returns an error only if no ReplicaSets are found. We
	// don't return that as an error to the caller.
	if err != nil {
		return cas, nil
	}
	controllerScale := map[types.UID]int32{}
	for _, rs := range rss {
		// GetDeploymentsForReplicaSet returns an error only if no matching
		// deployments are found.
		_, err := dc.dLister.GetDeploymentsForReplicaSet(&rs)
		if err == nil { // A deployment was found, so this finder will not count this RS.
			continue
		}
		controllerScale[rs.UID] = rs.Spec.Replicas
	}

	for uid, scale := range controllerScale {
		cas = append(cas, controllerAndScale{UID: uid, scale: scale})
	}

	return cas, nil
}

// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
func (dc *DisruptionController) getPodDeployments(pod *api.Pod) ([]controllerAndScale, error) {
	cas := []controllerAndScale{}
	rss, err := dc.rsLister.GetPodReplicaSets(pod)
	// GetPodReplicaSets returns an error only if no ReplicaSets are found. We
	// don't return that as an error to the caller.
	if err != nil {
		return cas, nil
	}
	controllerScale := map[types.UID]int32{}
	for _, rs := range rss {
		ds, err := dc.dLister.GetDeploymentsForReplicaSet(&rs)
		// GetDeploymentsForReplicaSet returns an error only if no matching
		// deployments are found. In that case we skip this ReplicaSet.
		if err != nil {
			continue
		}
		for _, d := range ds {
			controllerScale[d.UID] = d.Spec.Replicas
		}
	}

	for uid, scale := range controllerScale {
		cas = append(cas, controllerAndScale{UID: uid, scale: scale})
	}

	return cas, nil
}

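// getPodReplicationControllers finds the replication controllers managing the
// given pod, along with their current scale.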
func (dc *DisruptionController) getPodReplicationControllers(pod *api.Pod) ([]controllerAndScale, error) {
	cas := []controllerAndScale{}
	rcs, err := dc.rcLister.GetPodControllers(pod)
	if err == nil {
		for _, rc := range rcs {
			cas = append(cas, controllerAndScale{UID: rc.UID, scale: rc.Spec.Replicas})
		}
	}
	return cas, nil
}

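// Run starts the informers and the worker loop, and blocks until stopCh is
// closed.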
func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
	glog.V(0).Infof("Starting disruption controller")
	if dc.kubeClient != nil {
		glog.V(0).Infof("Sending events to api server.")
		dc.broadcaster.StartRecordingToSink(dc.kubeClient.Events(""))
	} else {
		glog.V(0).Infof("No api server defined - no events will be sent to API server.")
	}
	go dc.pdbController.Run(stopCh)
	go dc.podController.Run(stopCh)
	go dc.rcController.Run(stopCh)
	go dc.rsController.Run(stopCh)
	go dc.dController.Run(stopCh)
	go wait.Until(dc.worker, time.Second, stopCh)
	<-stopCh
	glog.V(0).Infof("Shutting down disruption controller")
}

func (dc *DisruptionController) addDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	glog.V(4).Infof("add DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updateDb(old, cur interface{}) {
	// TODO(mml) ignore updates where 'old' is equivalent to 'cur'.
	pdb := cur.(*policy.PodDisruptionBudget)
	glog.V(4).Infof("update DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) removeDb(obj interface{}) {
	pdb := obj.(*policy.PodDisruptionBudget)
	glog.V(4).Infof("remove DB %q", pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) addPod(obj interface{}) {
	pod := obj.(*api.Pod)
	glog.V(4).Infof("addPod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		glog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	glog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) updatePod(old, cur interface{}) {
	pod := cur.(*api.Pod)
	glog.V(4).Infof("updatePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		glog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	glog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

func (dc *DisruptionController) deletePod(obj interface{}) {
	pod, ok := obj.(*api.Pod)
	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels, the new PDB will not be woken up until the periodic
	// resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %+v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
			return
		}
	}
	glog.V(4).Infof("deletePod called on pod %q", pod.Name)
	pdb := dc.getPdbForPod(pod)
	if pdb == nil {
		glog.V(4).Infof("No matching pdb for pod %q", pod.Name)
		return
	}
	glog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name)
	dc.enqueuePdb(pdb)
}

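// enqueuePdb adds the key for the given PDB to the work queue.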
func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
	key, err := controller.KeyFunc(pdb)
	if err != nil {
		glog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
		return
	}
	dc.queue.Add(key)
}

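// getPdbForPod returns the PDB whose selector matches the given pod, or nil if
// none match. If more than one PDB matches, one is chosen arbitrarily and a
// warning event is recorded.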
func (dc *DisruptionController) getPdbForPod(pod *api.Pod) *policy.PodDisruptionBudget {
	// GetPodPodDisruptionBudgets returns an error only if no
	// PodDisruptionBudgets are found. We don't return that as an error to the
	// caller.
	pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod)
	if err != nil {
		glog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name)
		return nil
	}

	if len(pdbs) > 1 {
		msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name)
		glog.Warning(msg)
		dc.recorder.Event(pod, api.EventTypeWarning, "MultiplePodDisruptionBudgets", msg)
	}
	return &pdbs[0]
}

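// getPodsForPdb lists the pods in the PDB's namespace that match its selector.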
func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ([]*api.Pod, error) {
	sel, err := unversioned.LabelSelectorAsSelector(pdb.Spec.Selector)
	if err != nil {
		return []*api.Pod{}, err
	}
	if sel.Empty() {
		return []*api.Pod{}, nil
	}
	pods, err := dc.podLister.Pods(pdb.Namespace).List(sel)
	if err != nil {
		return []*api.Pod{}, err
	}
	// TODO: Do we need to copy here?
	result := make([]*api.Pod, 0, len(pods))
	for i := range pods {
		result = append(result, &(*pods[i]))
	}
	return result, nil
}

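// worker processes keys from the queue until the queue is shut down, requeuing
// any PDB whose sync fails.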
func (dc *DisruptionController) worker() {
	work := func() bool {
		key, quit := dc.queue.Get()
		if quit {
			return quit
		}
		defer dc.queue.Done(key)
		glog.V(4).Infof("Syncing PodDisruptionBudget %q", key.(string))
		if err := dc.sync(key.(string)); err != nil {
			glog.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", key.(string), err)
			// TODO(mml): In order to be safe in the face of a total inability to write state
			// changes, we should write an expiration timestamp here and consumers
			// of the PDB state (the /evict subresource handler) should check that
			// any 'true' state is relatively fresh.
			// TODO(mml): file an issue to that effect
			// TODO(mml): If we used a workqueue.RateLimitingInterface, we could
			// improve our behavior (be a better citizen) when we need to retry.
			dc.queue.Add(key)
		}
		return false
	}
	for {
		if quit := work(); quit {
			return
		}
	}
}

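// sync fetches the PDB for the given key and reconciles its status, falling
// back to failSafe if reconciliation fails.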
func (dc *DisruptionController) sync(key string) error {
	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Now().Sub(startTime))
	}()

	obj, exists, err := dc.pdbLister.Store.GetByKey(key)
	if !exists {
		return err
	}
	if err != nil {
		glog.Errorf("unable to retrieve PodDisruptionBudget %v from store: %v", key, err)
		return err
	}
	pdb := obj.(*policy.PodDisruptionBudget)

	if err := dc.trySync(pdb); err != nil {
		return dc.failSafe(pdb)
	}

	return nil
}

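// trySync recomputes the PDB's expected, desired-healthy, and current-healthy
// pod counts and writes them to its status.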
func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
	pods, err := dc.getPodsForPdb(pdb)
	if err != nil {
		return err
	}

	expectedCount, desiredHealthy, err := dc.getExpectedPodCount(pdb, pods)
	if err != nil {
		return err
	}

	currentHealthy := countHealthyPods(pods)
	err = dc.updatePdbSpec(pdb, currentHealthy, desiredHealthy, expectedCount)

	return err
}

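// getExpectedPodCount determines how many pods the PDB expects to exist and
// how many of them must stay healthy. For a percentage minAvailable, the
// denominator is the summed scale of the pods' controllers.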
func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*api.Pod) (expectedCount, desiredHealthy int32, err error) {
	err = nil
	// TODO(davidopp): consider whether expectedCount and the rules about
	// permitted controller configurations (specifically, considering it an error
	// if a pod covered by a PDB has 0 controllers or > 1 controller) should be
	// handled the same way for integer and percentage minAvailable
	if pdb.Spec.MinAvailable.Type == intstr.Int {
		desiredHealthy = pdb.Spec.MinAvailable.IntVal
		expectedCount = int32(len(pods))
	} else if pdb.Spec.MinAvailable.Type == intstr.String {
		// When the user specifies a fraction of pods that must be available, we
		// use as the fraction's denominator
		//   SUM_{all c in C} scale(c)
		// where C is the union of C_p1, C_p2, ..., C_pN
		// and each C_pi is the set of controllers controlling the pod pi.
		// k8s only defines what happens when 0 or 1 controllers control a
		// given pod. We explicitly exclude the 0 controllers case here, and we
		// report an error if we find a pod with more than 1 controller. Thus in
		// practice each C_pi is a set of exactly 1 controller.

		// A mapping from controllers to their scale.
		controllerScale := map[types.UID]int32{}

		// 1. Find the controller(s) for each pod. If any pod has 0 controllers,
		// that's an error. If any pod has more than 1 controller, that's also an
		// error.
		for _, pod := range pods {
			controllerCount := 0
			for _, finder := range dc.finders() {
				var controllers []controllerAndScale
				controllers, err = finder(pod)
				if err != nil {
					return
				}
				for _, controller := range controllers {
					controllerScale[controller.UID] = controller.scale
					controllerCount++
				}
			}
			if controllerCount == 0 {
				err = fmt.Errorf("asked for percentage, but found no controllers for pod %q", pod.Name)
				dc.recorder.Event(pdb, api.EventTypeWarning, "NoControllers", err.Error())
				return
			} else if controllerCount > 1 {
				err = fmt.Errorf("pod %q has %v>1 controllers", pod.Name, controllerCount)
				dc.recorder.Event(pdb, api.EventTypeWarning, "TooManyControllers", err.Error())
				return
			}
		}

		// 2. Add up all the controllers.
		expectedCount = 0
		for _, count := range controllerScale {
			expectedCount += count
		}

		// 3. Do the math.
		var dh int
		dh, err = intstr.GetValueFromIntOrPercent(&pdb.Spec.MinAvailable, int(expectedCount), true)
		if err != nil {
			return
		}
		desiredHealthy = int32(dh)
	}

	return
}

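// countHealthyPods returns the number of pods whose PodReady condition is true.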
func countHealthyPods(pods []*api.Pod) (currentHealthy int32) {
Pod:
	for _, pod := range pods {
		for _, c := range pod.Status.Conditions {
			if c.Type == api.PodReady && c.Status == api.ConditionTrue {
				currentHealthy++
				continue Pod
			}
		}
	}

	return
}

// failSafe is an attempt to at least update the PodDisruptionAllowed field to
// false if everything else has failed. This is one place we implement the
// "fail open" part of the design: if we manage to update this field correctly,
// we will prevent the /evict handler from approving an eviction when it may be
// unsafe to do so.
func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error {
	obj, err := api.Scheme.DeepCopy(*pdb)
	if err != nil {
		return err
	}
	newPdb := obj.(policy.PodDisruptionBudget)
	newPdb.Status.PodDisruptionAllowed = false

	return dc.getUpdater()(&newPdb)
}

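// updatePdbSpec writes the computed counts to the PDB's status, allowing
// disruption only when enough healthy pods exist. It skips the write if
// nothing has changed.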
func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32) error {
	// We require expectedCount to be > 0 so that PDBs which currently match no
	// pods are in a safe state when their first pods appear but this controller
	// has not updated their status yet. This isn't the only race, but it's a
	// common one that's easy to detect.
	disruptionAllowed := currentHealthy >= desiredHealthy && expectedCount > 0

	if pdb.Status.CurrentHealthy == currentHealthy && pdb.Status.DesiredHealthy == desiredHealthy && pdb.Status.ExpectedPods == expectedCount && pdb.Status.PodDisruptionAllowed == disruptionAllowed {
		return nil
	}

	obj, err := api.Scheme.DeepCopy(*pdb)
	if err != nil {
		return err
	}
	newPdb := obj.(policy.PodDisruptionBudget)
	newPdb.Status = policy.PodDisruptionBudgetStatus{
		CurrentHealthy:       currentHealthy,
		DesiredHealthy:       desiredHealthy,
		ExpectedPods:         expectedCount,
		PodDisruptionAllowed: disruptionAllowed,
	}

	return dc.getUpdater()(&newPdb)
}

// refresh tries to re-GET the given PDB. If there are any errors, it just
// returns the old PDB. Intended to be used in a retry loop where it runs a
// bounded number of times.
func refresh(pdbClient client.PodDisruptionBudgetInterface, pdb *policy.PodDisruptionBudget) *policy.PodDisruptionBudget {
	newPdb, err := pdbClient.Get(pdb.Name)
	if err == nil {
		return newPdb
	}
	return pdb
}

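// writePdbStatus pushes the PDB's status to the API server, re-fetching the
// object and retrying a bounded number of times on failure.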
func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
	pdbClient := dc.kubeClient.Policy().PodDisruptionBudgets(pdb.Namespace)
	st := pdb.Status

	var err error
	for i, pdb := 0, pdb; i < statusUpdateRetries; i, pdb = i+1, refresh(pdbClient, pdb) {
		pdb.Status = st
		if _, err = pdbClient.UpdateStatus(pdb); err == nil {
			break
		}
	}

	return err
}