rolling_updater.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubectl

import (
	goerrors "errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/unversioned"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/integer"
	"k8s.io/kubernetes/pkg/util/intstr"
	"k8s.io/kubernetes/pkg/util/wait"
)

const (
	sourceIdAnnotation         = kubectlAnnotationPrefix + "update-source-id"
	desiredReplicasAnnotation  = kubectlAnnotationPrefix + "desired-replicas"
	originalReplicasAnnotation = kubectlAnnotationPrefix + "original-replicas"
	nextControllerAnnotation   = kubectlAnnotationPrefix + "next-controller-id"
)

// RollingUpdaterConfig is the configuration for a rolling deployment process.
type RollingUpdaterConfig struct {
	// Out is a writer for progress output.
	Out io.Writer
	// OldRc is an existing controller to be replaced.
	OldRc *api.ReplicationController
	// NewRc is a controller that will take ownership of updated pods (will be
	// created if needed).
	NewRc *api.ReplicationController
	// UpdatePeriod is the time to wait between individual pod updates.
	UpdatePeriod time.Duration
	// Interval is the time to wait between polling controller status after
	// update.
	Interval time.Duration
	// Timeout is the time to wait for controller updates before giving up.
	Timeout time.Duration
	// MinReadySeconds is the number of seconds a pod must be ready before it
	// is counted as available.
	MinReadySeconds int32
	// CleanupPolicy defines the cleanup action to take after the deployment is
	// complete.
	CleanupPolicy RollingUpdaterCleanupPolicy
	// MaxUnavailable is the maximum number of pods that can be unavailable during the update.
	// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
	// Absolute number is calculated from percentage by rounding up.
	// This cannot be 0 if MaxSurge is 0.
	// By default, a fixed value of 1 is used.
	// Example: when this is set to 30%, the old RC can be scaled down to 70% of desired pods
	// immediately when the rolling update starts. Once new pods are ready, the old RC
	// can be scaled down further, followed by scaling up the new RC, ensuring
	// that the total number of pods available at all times during the update is at
	// least 70% of desired pods.
	MaxUnavailable intstr.IntOrString
	// MaxSurge is the maximum number of pods that can be scheduled above the desired number of pods.
	// Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%).
	// This cannot be 0 if MaxUnavailable is 0.
	// Absolute number is calculated from percentage by rounding up.
	// By default, a value of 1 is used.
	// Example: when this is set to 30%, the new RC can be scaled up immediately
	// when the rolling update starts, such that the total number of old and new pods does not
	// exceed 130% of desired pods. Once old pods have been killed, the new RC can be scaled up
	// further, ensuring that the total number of pods running at any time during
	// the update is at most 130% of desired pods.
	MaxSurge intstr.IntOrString
	// OnProgress, if set, is invoked during each scale cycle to allow the caller to perform
	// additional logic or abort the scale. If an error is returned the cleanup method will not
	// be invoked. The percentage value is a synthetic "progress" calculation that represents
	// the approximate percentage completion.
	OnProgress func(oldRc, newRc *api.ReplicationController, percentage int) error
}

// RollingUpdaterCleanupPolicy is a cleanup action to take after the
// deployment is complete.
type RollingUpdaterCleanupPolicy string

const (
	// DeleteRollingUpdateCleanupPolicy means delete the old controller.
	DeleteRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Delete"
	// PreserveRollingUpdateCleanupPolicy means keep the old controller.
	PreserveRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Preserve"
	// RenameRollingUpdateCleanupPolicy means delete the old controller, and rename
	// the new controller to the name of the old controller.
	RenameRollingUpdateCleanupPolicy RollingUpdaterCleanupPolicy = "Rename"
)

// RollingUpdater provides methods for updating replicated pods in a predictable,
// fault-tolerant way.
type RollingUpdater struct {
	// Client interface for creating and updating controllers.
	c client.Interface
	// Namespace for resources.
	ns string
	// scaleAndWait scales a controller and returns its updated state.
	scaleAndWait func(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error)
	// getOrCreateTargetController gets and validates an existing controller or
	// makes a new one.
	getOrCreateTargetController func(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error)
	// cleanup performs post-deployment cleanup tasks for newRc and oldRc.
	cleanup func(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error
	// getReadyPods returns the number of old and new ready pods.
	getReadyPods func(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error)
	// nowFn returns the current time used to calculate minReadySeconds.
	nowFn func() unversioned.Time
}

// NewRollingUpdater creates a RollingUpdater from a client.
func NewRollingUpdater(namespace string, client client.Interface) *RollingUpdater {
	updater := &RollingUpdater{
		c:  client,
		ns: namespace,
	}
	// Inject real implementations.
	updater.scaleAndWait = updater.scaleAndWaitWithScaler
	updater.getOrCreateTargetController = updater.getOrCreateTargetControllerWithClient
	updater.getReadyPods = updater.readyPods
	updater.cleanup = updater.cleanupWithClients
	updater.nowFn = func() unversioned.Time { return unversioned.Now() }
	return updater
}

// Update all pods for a ReplicationController (oldRc) by creating a new
// controller (newRc) with 0 replicas, and synchronously scaling oldRc and
// newRc until oldRc has 0 replicas and newRc has the original # of desired
// replicas. Cleanup occurs based on a RollingUpdaterCleanupPolicy.
//
// Each interval, the updater will attempt to make progress however it can
// without violating any availability constraints defined by the config. This
// means the amount scaled up or down each interval will vary based on the
// timeliness of readiness, and the updater will always try to make progress,
// even slowly.
//
// If an update from oldRc to newRc is already in progress, we attempt to
// drive it to completion. If an error occurs at any step of the update, the
// error will be returned.
//
// A scaling event (either up or down) is considered progress; if no progress
// is made within the config.Timeout, an error is returned.
//
// TODO: make this handle performing a rollback of a partially completed
// rollout.
func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
	out := config.Out
	oldRc := config.OldRc
	scaleRetryParams := NewRetryParams(config.Interval, config.Timeout)

	// Find an existing controller (for continuing an interrupted update) or
	// create a new one if necessary.
	sourceId := fmt.Sprintf("%s:%s", oldRc.Name, oldRc.UID)
	newRc, existed, err := r.getOrCreateTargetController(config.NewRc, sourceId)
	if err != nil {
		return err
	}
	if existed {
		fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newRc.Name)
	} else {
		fmt.Fprintf(out, "Created %s\n", newRc.Name)
	}
	// Extract the desired replica count from the controller.
	desiredAnnotation, err := strconv.Atoi(newRc.Annotations[desiredReplicasAnnotation])
	if err != nil {
		return fmt.Errorf("Unable to parse annotation for %s: %s=%s",
			newRc.Name, desiredReplicasAnnotation, newRc.Annotations[desiredReplicasAnnotation])
	}
	desired := int32(desiredAnnotation)
	// Extract the original replica count from the old controller, adding the
	// annotation if it doesn't yet exist.
	_, hasOriginalAnnotation := oldRc.Annotations[originalReplicasAnnotation]
	if !hasOriginalAnnotation {
		existing, err := r.c.ReplicationControllers(oldRc.Namespace).Get(oldRc.Name)
		if err != nil {
			return err
		}
		originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
		applyUpdate := func(rc *api.ReplicationController) {
			if rc.Annotations == nil {
				rc.Annotations = map[string]string{}
			}
			rc.Annotations[originalReplicasAnnotation] = originReplicas
		}
		if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil {
			return err
		}
	}
	// maxSurge is the maximum scaling increment and maxUnavailable is the maximum
	// number of pods that can be unavailable during a rollout.
	maxSurge, maxUnavailable, err := deploymentutil.ResolveFenceposts(&config.MaxSurge, &config.MaxUnavailable, desired)
	if err != nil {
		return err
	}
	// Validate maximums.
	if desired > 0 && maxUnavailable == 0 && maxSurge == 0 {
		return fmt.Errorf("one of maxSurge or maxUnavailable must be specified")
	}
	// The minimum number of pods which must remain available throughout the
	// update, calculated for internal convenience.
	minAvailable := int32(integer.IntMax(0, int(desired-maxUnavailable)))
	// If the desired new scale is 0, then the max unavailable is necessarily
	// the effective scale of the old RC regardless of the configuration
	// (equivalent to 100% maxUnavailable).
	if desired == 0 {
		maxUnavailable = oldRc.Spec.Replicas
		minAvailable = 0
	}
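	// For example, with desired=10, maxSurge=3 and maxUnavailable=3, the update
	// keeps at least minAvailable=7 pods available at all times and never runs
	// more than desired+maxSurge=13 pods in total across the old and new RCs.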
	fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n",
		newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, minAvailable, desired+maxSurge)

	// give a caller incremental notification and allow them to exit early
	goal := desired - newRc.Spec.Replicas
	if goal < 0 {
		goal = -goal
	}
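	// progress reports a synthetic completion percentage to OnProgress: goal is
	// the initial distance between newRc's replica count and desired, so with
	// goal=10 and 4 replicas still to go, the callback receives (10-4)*100/10 = 60%.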
	progress := func(complete bool) error {
		if config.OnProgress == nil {
			return nil
		}
		progress := desired - newRc.Spec.Replicas
		if progress < 0 {
			progress = -progress
		}
		percentage := 100
		if !complete && goal > 0 {
			percentage = int((goal - progress) * 100 / goal)
		}
		return config.OnProgress(oldRc, newRc, percentage)
	}

	// Scale newRc and oldRc until newRc has the desired number of replicas and
	// oldRc has 0 replicas.
	progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds()
	for newRc.Spec.Replicas != desired || oldRc.Spec.Replicas != 0 {
		// Store the existing replica counts for progress timeout tracking.
		newReplicas := newRc.Spec.Replicas
		oldReplicas := oldRc.Spec.Replicas

		// Scale up as much as possible.
		scaledRc, err := r.scaleUp(newRc, oldRc, desired, maxSurge, maxUnavailable, scaleRetryParams, config)
		if err != nil {
			return err
		}
		newRc = scaledRc

		// notify the caller if necessary
		if err := progress(false); err != nil {
			return err
		}

		// Wait between scaling operations for things to settle.
		time.Sleep(config.UpdatePeriod)

		// Scale down as much as possible.
		scaledRc, err = r.scaleDown(newRc, oldRc, desired, minAvailable, maxUnavailable, maxSurge, config)
		if err != nil {
			return err
		}
		oldRc = scaledRc

		// notify the caller if necessary
		if err := progress(false); err != nil {
			return err
		}

		// If we are making progress, continue to advance the progress deadline.
		// Otherwise, time out with an error.
		progressMade := (newRc.Spec.Replicas != newReplicas) || (oldRc.Spec.Replicas != oldReplicas)
		if progressMade {
			progressDeadline = time.Now().UnixNano() + config.Timeout.Nanoseconds()
		} else if time.Now().UnixNano() > progressDeadline {
			return fmt.Errorf("timed out waiting for any update progress to be made")
		}
	}

	// notify the caller if necessary
	if err := progress(true); err != nil {
		return err
	}

	// Housekeeping and cleanup policy execution.
	return r.cleanup(oldRc, newRc, config)
}

// scaleUp scales up newRc to desired by whatever increment is possible given
// the configured surge threshold. scaleUp will safely no-op as necessary when
// it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleUp(newRc, oldRc *api.ReplicationController, desired, maxSurge, maxUnavailable int32, scaleRetryParams *RetryParams, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
	// If we're already at the desired, do nothing.
	if newRc.Spec.Replicas == desired {
		return newRc, nil
	}
	// Scale up as far as we can based on the surge limit.
	increment := (desired + maxSurge) - (oldRc.Spec.Replicas + newRc.Spec.Replicas)
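	// For example, at the start of an update with desired=10, maxSurge=3,
	// oldRc at 10 and newRc at 0, the surge headroom is (10+3)-(10+0) = 3,
	// so newRc can grow by 3 replicas this cycle.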
	// If the old is already scaled down, go ahead and scale all the way up.
	if oldRc.Spec.Replicas == 0 {
		increment = desired - newRc.Spec.Replicas
	}
	// We can't scale up without violating the surge limit, so do nothing.
	if increment <= 0 {
		return newRc, nil
	}
	// Increase the replica count, and deal with fenceposts.
	newRc.Spec.Replicas += increment
	if newRc.Spec.Replicas > desired {
		newRc.Spec.Replicas = desired
	}
	// Perform the scale-up.
	fmt.Fprintf(config.Out, "Scaling %s up to %d\n", newRc.Name, newRc.Spec.Replicas)
	scaledRc, err := r.scaleAndWait(newRc, scaleRetryParams, scaleRetryParams)
	if err != nil {
		return nil, err
	}
	return scaledRc, nil
}

// scaleDown scales down oldRc to 0 at whatever decrement is possible given the
// thresholds defined on the config. scaleDown will safely no-op as necessary
// when it detects redundancy or other relevant conditions.
func (r *RollingUpdater) scaleDown(newRc, oldRc *api.ReplicationController, desired, minAvailable, maxUnavailable, maxSurge int32, config *RollingUpdaterConfig) (*api.ReplicationController, error) {
	// Already scaled down; do nothing.
	if oldRc.Spec.Replicas == 0 {
		return oldRc, nil
	}
	// Get ready pods. We shouldn't block, otherwise in case both old and new
	// pods are unavailable then the rolling update process blocks.
	// Timeout-wise we are already covered by the progress check.
	_, newAvailable, err := r.getReadyPods(oldRc, newRc, config.MinReadySeconds)
	if err != nil {
		return nil, err
	}
	// The old controller is considered as part of the total because we want to
	// maintain minimum availability even with a volatile old controller.
	// Scale down as much as possible while maintaining minimum availability.
	allPods := oldRc.Spec.Replicas + newRc.Spec.Replicas
	newUnavailable := newRc.Spec.Replicas - newAvailable
	decrement := allPods - minAvailable - newUnavailable
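	// For example, with oldRc at 7, newRc at 6, minAvailable=7 and only 4 new
	// pods available, newUnavailable=2 and the decrement is 7+6-7-2 = 4, so
	// oldRc can safely drop from 7 to 3 replicas this cycle.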
	// The decrement normally shouldn't drop below 0 because the available count
	// always starts below the old replica count, but the old replica count can
	// decrement due to externalities like pod deaths in the replica set. This
	// will be considered a transient condition; do nothing and try again later
	// with new readiness values.
	//
	// If the most we can scale is 0, it means we can't scale down without
	// violating the minimum. Do nothing and try again later when conditions may
	// have changed.
	if decrement <= 0 {
		return oldRc, nil
	}
	// Reduce the replica count, and deal with fenceposts.
	oldRc.Spec.Replicas -= decrement
	if oldRc.Spec.Replicas < 0 {
		oldRc.Spec.Replicas = 0
	}
	// If the new is already fully scaled and available up to the desired size, go
	// ahead and scale old all the way down.
	if newRc.Spec.Replicas == desired && newAvailable == desired {
		oldRc.Spec.Replicas = 0
	}
	// Perform the scale-down.
	fmt.Fprintf(config.Out, "Scaling %s down to %d\n", oldRc.Name, oldRc.Spec.Replicas)
	retryWait := &RetryParams{config.Interval, config.Timeout}
	scaledRc, err := r.scaleAndWait(oldRc, retryWait, retryWait)
	if err != nil {
		return nil, err
	}
	return scaledRc, nil
}

// scaleAndWaitWithScaler scales a controller using a Scaler and a real client.
func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, retry *RetryParams, wait *RetryParams) (*api.ReplicationController, error) {
	scaler, err := ScalerFor(api.Kind("ReplicationController"), r.c)
	if err != nil {
		return nil, fmt.Errorf("Couldn't make scaler: %s", err)
	}
	if err := scaler.Scale(rc.Namespace, rc.Name, uint(rc.Spec.Replicas), &ScalePrecondition{-1, ""}, retry, wait); err != nil {
		return nil, err
	}
	return r.c.ReplicationControllers(rc.Namespace).Get(rc.Name)
}

// readyPods returns the old and new ready counts for their pods.
// If a pod is observed as being ready, it's considered ready even
// if it later becomes notReady.
func (r *RollingUpdater) readyPods(oldRc, newRc *api.ReplicationController, minReadySeconds int32) (int32, int32, error) {
	controllers := []*api.ReplicationController{oldRc, newRc}
	oldReady := int32(0)
	newReady := int32(0)
	if r.nowFn == nil {
		r.nowFn = func() unversioned.Time { return unversioned.Now() }
	}

	for i := range controllers {
		controller := controllers[i]
		selector := labels.Set(controller.Spec.Selector).AsSelector()
		options := api.ListOptions{LabelSelector: selector}
		pods, err := r.c.Pods(controller.Namespace).List(options)
		if err != nil {
			return 0, 0, err
		}
		for _, pod := range pods.Items {
			if !deploymentutil.IsPodAvailable(&pod, minReadySeconds, r.nowFn().Time) {
				continue
			}
			switch controller.Name {
			case oldRc.Name:
				oldReady++
			case newRc.Name:
				newReady++
			}
		}
	}
	return oldReady, newReady, nil
}

// getOrCreateTargetControllerWithClient looks for an existing controller with
// sourceId. If found, the existing controller is returned with true
// indicating that the controller already exists. If the controller isn't
// found, a new one is created and returned along with false indicating the
// controller was created.
//
// Existing controllers are validated to ensure their sourceIdAnnotation
// matches sourceId; if there's a mismatch, an error is returned.
func (r *RollingUpdater) getOrCreateTargetControllerWithClient(controller *api.ReplicationController, sourceId string) (*api.ReplicationController, bool, error) {
	existingRc, err := r.existingController(controller)
	if err != nil {
		if !errors.IsNotFound(err) {
			// There was an error trying to find the controller; don't assume we
			// should create it.
			return nil, false, err
		}
		if controller.Spec.Replicas <= 0 {
			return nil, false, fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %d\n", controller.Name, controller.Spec.Replicas)
		}
		// The controller wasn't found, so create it.
		if controller.Annotations == nil {
			controller.Annotations = map[string]string{}
		}
		controller.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", controller.Spec.Replicas)
		controller.Annotations[sourceIdAnnotation] = sourceId
		controller.Spec.Replicas = 0
		newRc, err := r.c.ReplicationControllers(r.ns).Create(controller)
		return newRc, false, err
	}
	// Validate and use the existing controller.
	annotations := existingRc.Annotations
	source := annotations[sourceIdAnnotation]
	_, ok := annotations[desiredReplicasAnnotation]
	if source != sourceId || !ok {
		return nil, false, fmt.Errorf("Missing/unexpected annotations for controller %s, expected %s : %s", controller.Name, sourceId, annotations)
	}
	return existingRc, true, nil
}

// existingController verifies whether the controller already exists.
func (r *RollingUpdater) existingController(controller *api.ReplicationController) (*api.ReplicationController, error) {
	// Without an rc name (only a generate name), there can be no existing rc.
	if len(controller.Name) == 0 && len(controller.GenerateName) > 0 {
		return nil, errors.NewNotFound(api.Resource("replicationcontrollers"), controller.Name)
	}
	// The controller name is required to get the rc back.
	return r.c.ReplicationControllers(controller.Namespace).Get(controller.Name)
}

// cleanupWithClients performs cleanup tasks after the rolling update. Update
// process related annotations are removed from oldRc and newRc. The
// CleanupPolicy on config is executed.
func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationController, config *RollingUpdaterConfig) error {
	// Clean up annotations
	var err error
	newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name)
	if err != nil {
		return err
	}
	applyUpdate := func(rc *api.ReplicationController) {
		delete(rc.Annotations, sourceIdAnnotation)
		delete(rc.Annotations, desiredReplicasAnnotation)
	}
	if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil {
		return err
	}

	if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil {
		return err
	}
	newRc, err = r.c.ReplicationControllers(r.ns).Get(newRc.Name)
	if err != nil {
		return err
	}

	switch config.CleanupPolicy {
	case DeleteRollingUpdateCleanupPolicy:
		// delete old rc
		fmt.Fprintf(config.Out, "Update succeeded. Deleting %s\n", oldRc.Name)
		return r.c.ReplicationControllers(r.ns).Delete(oldRc.Name, nil)
	case RenameRollingUpdateCleanupPolicy:
		// delete old rc
		fmt.Fprintf(config.Out, "Update succeeded. Deleting old controller: %s\n", oldRc.Name)
		if err := r.c.ReplicationControllers(r.ns).Delete(oldRc.Name, nil); err != nil {
			return err
		}
		fmt.Fprintf(config.Out, "Renaming %s to %s\n", newRc.Name, oldRc.Name)
		return Rename(r.c, newRc, oldRc.Name)
	case PreserveRollingUpdateCleanupPolicy:
		return nil
	default:
		return nil
	}
}
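
// Rename renames rc to newName by deleting the controller under its old name
// (orphaning its pods), waiting for the deletion to be observed, and then
// recreating the controller under newName so that it adopts the orphaned pods.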
func Rename(c client.ReplicationControllersNamespacer, rc *api.ReplicationController, newName string) error {
	oldName := rc.Name
	rc.Name = newName
	rc.ResourceVersion = ""

	// First delete the oldName RC and orphan its pods.
	trueVar := true
	err := c.ReplicationControllers(rc.Namespace).Delete(oldName, &api.DeleteOptions{OrphanDependents: &trueVar})
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	err = wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
		_, err := c.ReplicationControllers(rc.Namespace).Get(oldName)
		if err == nil {
			return false, nil
		} else if errors.IsNotFound(err) {
			return true, nil
		} else {
			return false, err
		}
	})
	if err != nil {
		return err
	}
	// Then create the same RC with the new name.
	_, err = c.ReplicationControllers(rc.Namespace).Create(rc)
	if err != nil {
		return err
	}
	return nil
}
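
// LoadExistingNextReplicationController returns the controller named newName
// in the given namespace, or nil (with no error) if newName is empty or no
// such controller exists yet.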
func LoadExistingNextReplicationController(c client.ReplicationControllersNamespacer, namespace, newName string) (*api.ReplicationController, error) {
	if len(newName) == 0 {
		return nil, nil
	}
	newRc, err := c.ReplicationControllers(namespace).Get(newName)
	if err != nil && errors.IsNotFound(err) {
		return nil, nil
	}
	return newRc, err
}
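
// NewControllerConfig holds the parameters used by
// CreateNewControllerFromCurrentController to derive a new controller from an
// existing one.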
type NewControllerConfig struct {
	Namespace        string
	OldName, NewName string
	Image            string
	Container        string
	DeploymentKey    string
	PullPolicy       api.PullPolicy
}
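
// CreateNewControllerFromCurrentController builds (but does not create on the
// server) a new controller based on the controller named cfg.OldName: it
// updates the image (and optionally the pull policy) of the target container
// and labels both the selector and pod template with a hash of the new spec
// under cfg.DeploymentKey, so the new controller selects only its own pods.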
func CreateNewControllerFromCurrentController(c client.Interface, codec runtime.Codec, cfg *NewControllerConfig) (*api.ReplicationController, error) {
	containerIndex := 0
	// load the old RC into the "new" RC
	newRc, err := c.ReplicationControllers(cfg.Namespace).Get(cfg.OldName)
	if err != nil {
		return nil, err
	}

	if len(cfg.Container) != 0 {
		containerFound := false
		for i, c := range newRc.Spec.Template.Spec.Containers {
			if c.Name == cfg.Container {
				containerIndex = i
				containerFound = true
				break
			}
		}
		if !containerFound {
			return nil, fmt.Errorf("container %s not found in pod", cfg.Container)
		}
	}

	if len(newRc.Spec.Template.Spec.Containers) > 1 && len(cfg.Container) == 0 {
		return nil, goerrors.New("Must specify container to update when updating a multi-container pod")
	}

	if len(newRc.Spec.Template.Spec.Containers) == 0 {
		return nil, goerrors.New(fmt.Sprintf("Pod has no containers! (%v)", newRc))
	}
	newRc.Spec.Template.Spec.Containers[containerIndex].Image = cfg.Image
	if len(cfg.PullPolicy) != 0 {
		newRc.Spec.Template.Spec.Containers[containerIndex].ImagePullPolicy = cfg.PullPolicy
	}

	newHash, err := api.HashObject(newRc, codec)
	if err != nil {
		return nil, err
	}

	if len(cfg.NewName) == 0 {
		cfg.NewName = fmt.Sprintf("%s-%s", newRc.Name, newHash)
	}
	newRc.Name = cfg.NewName

	newRc.Spec.Selector[cfg.DeploymentKey] = newHash
	newRc.Spec.Template.Labels[cfg.DeploymentKey] = newHash
	// Clear resource version after hashing so that identical updates get different hashes.
	newRc.ResourceVersion = ""
	return newRc, nil
}
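
// AbortRollingUpdate reverses the direction of an in-progress rolling update:
// it swaps OldRc and NewRc in the config so the update drives back toward the
// original controller, restores that controller's replica count from its
// original-replicas annotation, and forces the Delete cleanup policy.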
func AbortRollingUpdate(c *RollingUpdaterConfig) error {
	// Swap the controllers
	tmp := c.OldRc
	c.OldRc = c.NewRc
	c.NewRc = tmp

	if c.NewRc.Annotations == nil {
		c.NewRc.Annotations = map[string]string{}
	}
	c.NewRc.Annotations[sourceIdAnnotation] = fmt.Sprintf("%s:%s", c.OldRc.Name, c.OldRc.UID)

	// Use the original value since the replica count change from old to new
	// could be asymmetric. If we don't know the original count, we can't safely
	// roll back to a known good size.
	originalSize, foundOriginal := tmp.Annotations[originalReplicasAnnotation]
	if !foundOriginal {
		return fmt.Errorf("couldn't find original replica count of %q", tmp.Name)
	}
	fmt.Fprintf(c.Out, "Setting %q replicas to %s\n", c.NewRc.Name, originalSize)
	c.NewRc.Annotations[desiredReplicasAnnotation] = originalSize
	c.CleanupPolicy = DeleteRollingUpdateCleanupPolicy
	return nil
}
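
// GetNextControllerAnnotation returns the name recorded in rc's
// next-controller annotation, along with whether the annotation was present.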
func GetNextControllerAnnotation(rc *api.ReplicationController) (string, bool) {
	res, found := rc.Annotations[nextControllerAnnotation]
	return res, found
}
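
// SetNextControllerAnnotation records name as rc's next controller,
// initializing the annotation map if necessary.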
func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
	if rc.Annotations == nil {
		rc.Annotations = map[string]string{}
	}
	rc.Annotations[nextControllerAnnotation] = name
}
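
// UpdateExistingReplicationController ensures oldRc carries deploymentKey in
// its selector. If the key is missing, the controller and its pods are
// relabeled via AddDeploymentKeyToReplicationController; in either case the
// name of the next controller is recorded in oldRc's annotations.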
func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
	if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
		SetNextControllerAnnotation(oldRc, newName)
		return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
	} else {
		// If we didn't need to update the controller for the deployment key, we still need to write
		// the "next" controller.
		applyUpdate := func(rc *api.ReplicationController) {
			SetNextControllerAnnotation(rc, newName)
		}
		return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
	}
}
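
// AddDeploymentKeyToReplicationController adds the deploymentKey/deploymentValue
// label to oldRc's pod template, to every pod the controller currently manages,
// and finally to the controller's selector, then deletes any pods still carrying
// only the old labels so the relabeled controller manages exactly the pods that
// were updated.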
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
	var err error
	// First, update the template label. This ensures that any newly created pods will have the new label.
	applyUpdate := func(rc *api.ReplicationController) {
		if rc.Spec.Template.Labels == nil {
			rc.Spec.Template.Labels = map[string]string{}
		}
		rc.Spec.Template.Labels[deploymentKey] = deploymentValue
	}
	if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
		return nil, err
	}

	// Update all pods managed by the rc to have the new hash label, so they are correctly adopted.
	// TODO: extract the code from the label command and re-use it here.
	selector := labels.SelectorFromSet(oldRc.Spec.Selector)
	options := api.ListOptions{LabelSelector: selector}
	podList, err := client.Pods(namespace).List(options)
	if err != nil {
		return nil, err
	}
	for ix := range podList.Items {
		pod := &podList.Items[ix]
		applyUpdate := func(p *api.Pod) {
			if p.Labels == nil {
				p.Labels = map[string]string{
					deploymentKey: deploymentValue,
				}
			} else {
				p.Labels[deploymentKey] = deploymentValue
			}
		}
		if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil {
			return nil, err
		}
	}

	if oldRc.Spec.Selector == nil {
		oldRc.Spec.Selector = map[string]string{}
	}
	// Copy the old selector, so that we can scrub out any orphaned pods.
	selectorCopy := map[string]string{}
	for k, v := range oldRc.Spec.Selector {
		selectorCopy[k] = v
	}
	applyUpdate = func(rc *api.ReplicationController) {
		rc.Spec.Selector[deploymentKey] = deploymentValue
	}
	// Update the selector of the rc so it manages all the pods we updated above.
	if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
		return nil, err
	}

	// Clean up any orphaned pods that don't have the new label; this can happen if the rc manager
	// doesn't see the update to its pod template and creates a new pod with the old labels after
	// we've finished re-adopting existing pods to the rc.
	selector = labels.SelectorFromSet(selectorCopy)
	options = api.ListOptions{LabelSelector: selector}
	podList, err = client.Pods(namespace).List(options)
	if err != nil {
		return nil, err
	}
	for ix := range podList.Items {
		pod := &podList.Items[ix]
		if value, found := pod.Labels[deploymentKey]; !found || value != deploymentValue {
			if err := client.Pods(namespace).Delete(pod.Name, nil); err != nil {
				return nil, err
			}
		}
	}

	return oldRc, nil
}

type updateRcFunc func(controller *api.ReplicationController)

// updateRcWithRetries retries updating the given rc on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
	// Deep copy the rc in case we failed on Get during the retry loop.
	obj, err := api.Scheme.Copy(rc)
	if err != nil {
		return nil, fmt.Errorf("failed to deep copy rc before updating it: %v", err)
	}
	oldRc := obj.(*api.ReplicationController)
	err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(rc)
		if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil {
			// rc contains the latest controller post update
			return
		}
		updateErr := e
		// Update the controller with the latest resource version; if the update
		// failed, we can't trust rc, so use oldRc.Name.
		if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil {
			// The Get failed: the value in rc cannot be trusted.
			rc = oldRc
		}
		// Only return the error from Update.
		return updateErr
	})
	// If the error is non-nil the returned controller cannot be trusted; if it is nil, the
	// returned controller contains the applied update.
	return rc, err
}

type updatePodFunc func(pod *api.Pod)

// updatePodWithRetries retries updating the given pod on conflict with the following steps:
// 1. Get latest resource
// 2. applyUpdate
// 3. Update the resource
func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
	// Deep copy the pod in case we failed on Get during the retry loop.
	obj, err := api.Scheme.Copy(pod)
	if err != nil {
		return nil, fmt.Errorf("failed to deep copy pod before updating it: %v", err)
	}
	oldPod := obj.(*api.Pod)
	err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
		// Apply the update, then attempt to push it to the apiserver.
		applyUpdate(pod)
		if pod, e = c.Pods(namespace).Update(pod); e == nil {
			return
		}
		updateErr := e
		if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil {
			pod = oldPod
		}
		// Only return the error from Update.
		return updateErr
	})
	// If the error is non-nil the returned pod cannot be trusted; if it is nil, the returned
	// pod contains the applied update.
	return pod, err
}
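
// FindSourceController returns the replication controller in namespace whose
// update-source-id annotation points at the controller named name, i.e. the
// in-progress target of a rolling update started from that controller.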
func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
	list, err := r.ReplicationControllers(namespace).List(api.ListOptions{})
	if err != nil {
		return nil, err
	}
	for ix := range list.Items {
		rc := &list.Items[ix]
		if rc.Annotations != nil && strings.HasPrefix(rc.Annotations[sourceIdAnnotation], name) {
			return rc, nil
		}
	}
	return nil, fmt.Errorf("couldn't find a replication controller with source id == %s/%s", namespace, name)
}