sync.go

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package deployment

import (
	"fmt"
	"reflect"
	"sort"
	"strconv"

	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apis/extensions"
	"k8s.io/kubernetes/pkg/controller"
	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
	labelsutil "k8s.io/kubernetes/pkg/util/labels"
	podutil "k8s.io/kubernetes/pkg/util/pod"
	rsutil "k8s.io/kubernetes/pkg/util/replicaset"
)

// syncStatusOnly only updates the deployment's Status and doesn't take any mutating actions.
func (dc *DeploymentController) syncStatusOnly(deployment *extensions.Deployment) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
	if err != nil {
		return err
	}
	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}

// sync is responsible for reconciling deployments on scaling events or when they
// are paused.
func (dc *DeploymentController) sync(deployment *extensions.Deployment) error {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(deployment, false)
	if err != nil {
		return err
	}
	if err := dc.scale(deployment, newRS, oldRSs); err != nil {
		// If we get an error while trying to scale, the deployment will be requeued
		// so we can abort this resync.
		return err
	}
	dc.cleanupDeployment(oldRSs, deployment)
	allRSs := append(oldRSs, newRS)
	return dc.syncDeploymentStatus(allRSs, newRS, deployment)
}

// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided deployment (new and all old), with the new RS's and deployment's revision updated.
//
// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV).
// 2. Get the new RS this deployment targets (whose pod template matches the deployment's), and update the new RS's revision number to (maxOldV + 1),
//    only if its revision number is smaller than (maxOldV + 1). If this step fails, we'll update it in the next deployment sync loop.
// 3. Copy the new RS's revision number to the deployment (update the deployment's revision). If this step fails, we'll update it in the next deployment sync loop.
//
// Note that currently the deployment controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect deployment status.
func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(deployment *extensions.Deployment, createIfNotExisted bool) (*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
	// List the deployment's RSes & Pods and apply pod-template-hash info to the deployment's adopted RSes/Pods.
	rsList, podList, err := dc.rsAndPodsWithHashKeySynced(deployment)
	if err != nil {
		return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err)
	}
	_, allOldRSs, err := deploymentutil.FindOldReplicaSets(deployment, rsList, podList)
	if err != nil {
		return nil, nil, err
	}

	// Calculate the max revision number among all old RSes.
	maxOldV := deploymentutil.MaxRevision(allOldRSs)

	// Get the new replica set with the updated revision number.
	newRS, err := dc.getNewReplicaSet(deployment, rsList, maxOldV, allOldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}

	// Sync the deployment's revision number with the new replica set.
	if newRS != nil && newRS.Annotations != nil && len(newRS.Annotations[deploymentutil.RevisionAnnotation]) > 0 &&
		(deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != newRS.Annotations[deploymentutil.RevisionAnnotation]) {
		if err = dc.updateDeploymentRevision(deployment, newRS.Annotations[deploymentutil.RevisionAnnotation]); err != nil {
			glog.V(4).Infof("Error: %v. Unable to update deployment revision, will retry later.", err)
		}
	}

	return newRS, allOldRSs, nil
}

// rsAndPodsWithHashKeySynced returns the RSes and pods the given deployment targets, with pod-template-hash information synced.
func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extensions.Deployment) ([]extensions.ReplicaSet, *api.PodList, error) {
	rsList, err := deploymentutil.ListReplicaSets(deployment,
		func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
			return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
		})
	if err != nil {
		return nil, nil, fmt.Errorf("error listing ReplicaSets: %v", err)
	}
	syncedRSList := []extensions.ReplicaSet{}
	for _, rs := range rsList {
		// Add pod-template-hash information if it's not in the RS.
		// Otherwise, the new RS produced by the Deployment will overlap with pre-existing ones
		// that aren't constrained by the pod-template-hash.
		syncedRS, err := dc.addHashKeyToRSAndPods(rs)
		if err != nil {
			return nil, nil, err
		}
		syncedRSList = append(syncedRSList, *syncedRS)
	}
	syncedPodList, err := dc.listPods(deployment)
	if err != nil {
		return nil, nil, err
	}
	return syncedRSList, syncedPodList, nil
}

// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
// 1. Add the hash label to the rs's pod template, and make sure the replica set controller sees this update so that no orphaned pods will be created.
// 2. Add the hash label to all pods this rs owns, and wait until the replica set controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas.
// 3. Add the hash label to the rs's label and selector.
func (dc *DeploymentController) addHashKeyToRSAndPods(rs extensions.ReplicaSet) (updatedRS *extensions.ReplicaSet, err error) {
	updatedRS = &rs
	// If the rs already has the new hash label in its selector, it's done syncing.
	if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
		return
	}
	namespace := rs.Namespace
	hash := rsutil.GetPodTemplateSpecHash(rs)
	rsUpdated := false

	// 1. Add the hash template label to the rs. This ensures that any newly created pods will have the new label.
	updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
		func(updated *extensions.ReplicaSet) error {
			// Precondition: the RS doesn't contain the new hash in its pod template label.
			if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
				return utilerrors.ErrPreconditionViolated
			}
			updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
			return nil
		})
	if err != nil {
		return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	if !rsUpdated {
		// If the RS wasn't updated but step 1 didn't return an error, we've hit an "RS not found" error.
		// Return here and retry in the next sync loop.
		return &rs, nil
	}
	// Make sure the rs pod template is updated so that it won't create pods without the new label (orphaned pods).
	if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
		if err = deploymentutil.WaitForReplicaSetUpdated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
			return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
		}
	}
	glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)

	// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
	selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("error in converting selector to label selector for replica set %s: %s", updatedRS.Name, err)
	}
	options := api.ListOptions{LabelSelector: selector}
	pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
	if err != nil {
		return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
	}
	podList := api.PodList{Items: make([]api.Pod, 0, len(pods))}
	for i := range pods {
		podList.Items = append(podList.Items, *pods[i])
	}
	allPodsLabeled := false
	if allPodsLabeled, err = deploymentutil.LabelPodsWithHash(&podList, updatedRS, dc.client, namespace, hash); err != nil {
		return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
	}
	// If not all pods are labeled but step 2 didn't return an error, we've hit at least one "pod not found" error.
	// Return here and retry in the next sync loop.
	if !allPodsLabeled {
		return updatedRS, nil
	}
	// We need to wait for the replica set controller to observe the pods being
	// labeled with the pod template hash. Because we previously called
	// WaitForReplicaSetUpdated, the replica set controller should have dropped
	// FullyLabeledReplicas to 0 already; we only need to wait for it to increase
	// back to the number of replicas in the spec.
	if err = deploymentutil.WaitForPodsHashPopulated(dc.client, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
		return nil, fmt.Errorf("%s %s/%s: error waiting for replicaset controller to observe pods being labeled with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}

	// 3. Update the rs label and selector to include the new hash label.
	// Copy the old selector, so that we can scrub out any orphaned pods.
	if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(dc.client.Extensions().ReplicaSets(namespace), updatedRS,
		func(updated *extensions.ReplicaSet) error {
			// Precondition: the RS doesn't contain the new hash in its label or selector.
			if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
				return utilerrors.ErrPreconditionViolated
			}
			updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
			updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
			return nil
		}); err != nil {
		return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
	}
	if rsUpdated {
		glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
	}
	// If the RS isn't actually updated in step 3, that's okay; we'll retry in the next sync loop since its selector isn't updated yet.
	// TODO: look for orphaned pods and label them in the background somewhere else periodically.
	return updatedRS, nil
}

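// listPods returns the pods the given deployment targets, read from the controller's pod store cache.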
func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) {
	return deploymentutil.ListPods(deployment,
		func(namespace string, options api.ListOptions) (*api.PodList, error) {
			pods, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
			result := api.PodList{Items: make([]api.Pod, 0, len(pods))}
			for i := range pods {
				result.Items = append(result.Items, *pods[i])
			}
			return &result, err
		})
}

// getNewReplicaSet returns a replica set that matches the intent of the given deployment; it returns nil if the new replica set doesn't exist yet.
// 1. Get the existing new RS (the RS that the given deployment targets, whose pod template is the same as the deployment's).
// 2. If there's an existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with the appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, maxOldRevision int64, oldRSs []*extensions.ReplicaSet, createIfNotExisted bool) (*extensions.ReplicaSet, error) {
	// Calculate the revision number for this new replica set.
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)

	existingNewRS, err := deploymentutil.FindNewReplicaSet(deployment, rsList)
	if err != nil {
		return nil, err
	} else if existingNewRS != nil {
		// Set the existing new replica set's annotations.
		if deploymentutil.SetNewReplicaSetAnnotations(deployment, existingNewRS, newRevision, true) {
			return dc.client.Extensions().ReplicaSets(deployment.ObjectMeta.Namespace).Update(existingNewRS)
		}
		return existingNewRS, nil
	}

	if !createIfNotExisted {
		return nil, nil
	}

	// The new ReplicaSet does not exist, create one.
	namespace := deployment.ObjectMeta.Namespace
	podTemplateSpecHash := podutil.GetPodTemplateSpecHash(deployment.Spec.Template)
	newRSTemplate := deploymentutil.GetNewReplicaSetTemplate(deployment)
	// Add the podTemplateHash label to the selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(deployment.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, podTemplateSpecHash)

	// Create the new ReplicaSet.
	newRS := extensions.ReplicaSet{
		ObjectMeta: api.ObjectMeta{
			// Make the name deterministic, to ensure idempotence.
			Name:      deployment.Name + "-" + fmt.Sprintf("%d", podTemplateSpecHash),
			Namespace: namespace,
		},
		Spec: extensions.ReplicaSetSpec{
			Replicas: 0,
			Selector: newRSSelector,
			Template: newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, &newRS)
	if err != nil {
		return nil, err
	}

	newRS.Spec.Replicas = newReplicasCount
	// Set the new replica set's annotations.
	deploymentutil.SetNewReplicaSetAnnotations(deployment, &newRS, newRevision, false)
	createdRS, err := dc.client.Extensions().ReplicaSets(namespace).Create(&newRS)
	if err != nil {
		return nil, fmt.Errorf("error creating replica set %v: %v", deployment.Name, err)
	}
	if newReplicasCount > 0 {
		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", "up", createdRS.Name, newReplicasCount)
	}

	return createdRS, dc.updateDeploymentRevision(deployment, newRevision)
}

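// updateDeploymentRevision sets the deployment's revision annotation to the given revision and
// issues an update to the API server only if the annotation actually changed.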
func (dc *DeploymentController) updateDeploymentRevision(deployment *extensions.Deployment, revision string) error {
	if deployment.Annotations == nil {
		deployment.Annotations = make(map[string]string)
	}
	if deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
		deployment.Annotations[deploymentutil.RevisionAnnotation] = revision
		_, err := dc.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
		return err
	}
	return nil
}

// scale scales proportionally in order to mitigate risk. Otherwise, scaling up can increase the size
// of the new replica set and scaling down can decrease the sizes of the old ones, both of which would
// have the effect of hastening the rollout progress, which could produce a higher proportion of unavailable
// replicas in the event of a problem with the rolled-out template. Should run only on scaling events or
// when a deployment is paused and not during the normal rollout process.
func (dc *DeploymentController) scale(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) error {
	// If there is only one active replica set then we should scale that up to the full count of the
	// deployment. If there is no active replica set, then we should scale up the newest replica set.
	if activeOrLatest := deploymentutil.FindActiveOrLatest(newRS, oldRSs); activeOrLatest != nil {
		if activeOrLatest.Spec.Replicas == deployment.Spec.Replicas {
			return nil
		}
		_, _, err := dc.scaleReplicaSetAndRecordEvent(activeOrLatest, deployment.Spec.Replicas, deployment)
		return err
	}

	// If the new replica set is saturated, old replica sets should be fully scaled down.
	// This case handles replica set adoption during a saturated new replica set.
	if deploymentutil.IsSaturated(deployment, newRS) {
		for _, old := range controller.FilterActiveReplicaSets(oldRSs) {
			if _, _, err := dc.scaleReplicaSetAndRecordEvent(old, 0, deployment); err != nil {
				return err
			}
		}
		return nil
	}

	// There are old replica sets with pods and the new replica set is not saturated.
	// We need to proportionally scale all replica sets (new and old) in case of a
	// rolling deployment.
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		allowedSize := int32(0)
		if deployment.Spec.Replicas > 0 {
			allowedSize = deployment.Spec.Replicas + deploymentutil.MaxSurge(*deployment)
		}

		// Number of additional replicas that can be either added or removed from the total
		// replicas count. These replicas should be distributed proportionally to the active
		// replica sets.
		deploymentReplicasToAdd := allowedSize - allRSsReplicas

		// The additional replicas should be distributed proportionally amongst the active
		// replica sets from the larger to the smaller in size replica set. Scaling direction
		// drives what happens in case we are trying to scale replica sets of the same size.
		// In such a case when scaling up, we should scale up newer replica sets first, and
		// when scaling down, we should scale down older replica sets first.
		scalingOperation := "up"
		switch {
		case deploymentReplicasToAdd > 0:
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))

		case deploymentReplicasToAdd < 0:
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"

		default: /* deploymentReplicasToAdd == 0 */
			// Nothing to add.
			return nil
		}

		// Iterate over all active replica sets and estimate proportions for each of them.
		// The absolute value of deploymentReplicasAdded should never exceed the absolute
		// value of deploymentReplicasToAdd.
		deploymentReplicasAdded := int32(0)
		for i := range allRSs {
			rs := allRSs[i]

			proportion := deploymentutil.GetProportion(rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)

			rs.Spec.Replicas += proportion
			deploymentReplicasAdded += proportion
		}

		// Update all replica sets.
		for i := range allRSs {
			rs := allRSs[i]

			// Add/remove any leftovers to the largest replica set.
			if i == 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				rs.Spec.Replicas += leftover
				if rs.Spec.Replicas < 0 {
					rs.Spec.Replicas = 0
				}
			}

			if _, err := dc.scaleReplicaSet(rs, rs.Spec.Replicas, deployment, scalingOperation); err != nil {
				// Return as soon as we fail; the deployment is requeued.
				return err
			}
		}
	}
	return nil
}

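// scaleReplicaSetAndRecordEvent scales the given replica set to newScale if needed. It returns whether
// a scale was attempted, the (possibly updated) replica set, and any error from the update.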
func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment) (bool, *extensions.ReplicaSet, error) {
	// No need to scale.
	if rs.Spec.Replicas == newScale {
		return false, rs, nil
	}
	var scalingOperation string
	if rs.Spec.Replicas < newScale {
		scalingOperation = "up"
	} else {
		scalingOperation = "down"
	}
	newRS, err := dc.scaleReplicaSet(rs, newScale, deployment, scalingOperation)
	return true, newRS, err
}

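// scaleReplicaSet updates the replica set's replica count and replicas annotations, then records a
// scaling event on the deployment if the update succeeds.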
func (dc *DeploymentController) scaleReplicaSet(rs *extensions.ReplicaSet, newScale int32, deployment *extensions.Deployment, scalingOperation string) (*extensions.ReplicaSet, error) {
	// NOTE: This mutates the ReplicaSet passed in. Not sure if that's a good idea.
	rs.Spec.Replicas = newScale
	deploymentutil.SetReplicasAnnotations(rs, deployment.Spec.Replicas, deployment.Spec.Replicas+deploymentutil.MaxSurge(*deployment))
	rs, err := dc.client.Extensions().ReplicaSets(rs.ObjectMeta.Namespace).Update(rs)
	if err == nil {
		dc.eventRecorder.Eventf(deployment, api.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
	}
	return rs, err
}

// cleanupDeployment is responsible for cleaning up a deployment, i.e. deleting all but the latest N old replica sets,
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the pod template of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment) error {
	if deployment.Spec.RevisionHistoryLimit == nil {
		return nil
	}
	diff := int32(len(oldRSs)) - *deployment.Spec.RevisionHistoryLimit
	if diff <= 0 {
		return nil
	}

	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))

	var errList []error
	// TODO: This should be parallelized.
	for i := int32(0); i < diff; i++ {
		rs := oldRSs[i]
		// Avoid deleting a replica set with non-zero replica counts.
		if rs.Status.Replicas != 0 || rs.Spec.Replicas != 0 || rs.Generation > rs.Status.ObservedGeneration {
			continue
		}
		if err := dc.client.Extensions().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) {
			glog.V(2).Infof("Failed deleting old replica set %v for deployment %v: %v", rs.Name, deployment.Name, err)
			errList = append(errList, err)
		}
	}

	return utilerrors.NewAggregate(errList)
}

// syncDeploymentStatus checks if the status is up-to-date and syncs it if necessary.
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
	newStatus, err := dc.calculateStatus(allRSs, newRS, d)
	if err != nil {
		return err
	}
	if !reflect.DeepEqual(d.Status, newStatus) {
		return dc.updateDeploymentStatus(allRSs, newRS, d)
	}
	return nil
}

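// calculateStatus computes a new DeploymentStatus (observed generation and replica counts) from the
// given replica sets; if counting available pods fails, it returns the deployment's current status.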
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) {
	availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs)
	if err != nil {
		return deployment.Status, fmt.Errorf("failed to count available pods: %v", err)
	}
	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

	return extensions.DeploymentStatus{
		// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
		ObservedGeneration:  deployment.Generation,
		Replicas:            deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
		AvailableReplicas:   availableReplicas,
		UnavailableReplicas: totalReplicas - availableReplicas,
	}, nil
}

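// getAvailablePodsForReplicaSets counts the deployment's pods that belong to the given replica sets
// and are counted as available with respect to the deployment's MinReadySeconds.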
func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) {
	podList, err := dc.listPods(deployment)
	if err != nil {
		return 0, err
	}
	return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds)
}

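// updateDeploymentStatus recalculates the deployment's status from its replica sets and writes the
// result back to the API server via UpdateStatus.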
func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
	newStatus, err := dc.calculateStatus(allRSs, newRS, deployment)
	if err != nil {
		return err
	}
	newDeployment := deployment
	newDeployment.Status = newStatus
	_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment)
	return err
}

// isScalingEvent checks whether the provided deployment has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the deployment.
func (dc *DeploymentController) isScalingEvent(d *extensions.Deployment) (bool, error) {
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, false)
	if err != nil {
		return false, err
	}
	// If there is no new replica set matching this deployment and the deployment isn't paused,
	// then there is a new rollout waiting to happen.
	if newRS == nil && !d.Spec.Paused {
		// Update all active replica sets to the new deployment size. SetReplicasAnnotations makes
		// sure that we will update only replica sets that don't have the current size of the deployment.
		maxSurge := deploymentutil.MaxSurge(*d)
		for _, rs := range controller.FilterActiveReplicaSets(oldRSs) {
			if updated := deploymentutil.SetReplicasAnnotations(rs, d.Spec.Replicas, d.Spec.Replicas+maxSurge); updated {
				if _, err := dc.client.Extensions().ReplicaSets(rs.Namespace).Update(rs); err != nil {
					glog.Infof("Cannot update annotations for replica set %q: %v", rs.Name, err)
					return false, err
				}
			}
		}
		return false, nil
	}
	allRSs := append(oldRSs, newRS)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs)
		if !ok {
			continue
		}
		if desired != d.Spec.Replicas {
			return true, nil
		}
	}
	return false, nil
}