deployment_util.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package util
  14. import (
  15. "fmt"
  16. "sort"
  17. "strconv"
  18. "time"
  19. "github.com/golang/glog"
  20. "k8s.io/kubernetes/pkg/api"
  21. "k8s.io/kubernetes/pkg/api/annotations"
  22. "k8s.io/kubernetes/pkg/api/unversioned"
  23. "k8s.io/kubernetes/pkg/apis/extensions"
  24. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  25. "k8s.io/kubernetes/pkg/controller"
  26. "k8s.io/kubernetes/pkg/labels"
  27. "k8s.io/kubernetes/pkg/util/errors"
  28. "k8s.io/kubernetes/pkg/util/integer"
  29. intstrutil "k8s.io/kubernetes/pkg/util/intstr"
  30. labelsutil "k8s.io/kubernetes/pkg/util/labels"
  31. podutil "k8s.io/kubernetes/pkg/util/pod"
  32. rsutil "k8s.io/kubernetes/pkg/util/replicaset"
  33. "k8s.io/kubernetes/pkg/util/wait"
  34. )
  35. const (
  36. // RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence
  37. RevisionAnnotation = "deployment.kubernetes.io/revision"
  38. // DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation
  39. // in its replica sets. Helps in separating scaling events from the rollout process and for
  40. // determining if the new replica set for a deployment is really saturated.
  41. DesiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"
  42. // MaxReplicasAnnotation is the maximum replicas a deployment can have at a given point, which
  43. // is deployment.spec.replicas + maxSurge. Used by the underlying replica sets to estimate their
  44. // proportions in case the deployment has surge replicas.
  45. MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas"
  46. // RollbackRevisionNotFound is the "revision not found" rollback event reason
  47. RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound"
  48. // RollbackTemplateUnchanged is the "template unchanged" rollback event reason
  49. RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged"
  50. // RollbackDone is the "rollback done" event reason
  51. RollbackDone = "DeploymentRollback"
  52. // OverlapAnnotation marks deployments whose selector overlaps with the selector of other deployments
  53. // TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210
  54. OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with"
  55. // SelectorUpdateAnnotation marks the last time the deployment selector was updated
  56. // TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210
  57. SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at"
  58. )
  59. // MaxRevision finds the highest revision in the replica sets
  60. func MaxRevision(allRSs []*extensions.ReplicaSet) int64 {
  61. max := int64(0)
  62. for _, rs := range allRSs {
  63. if v, err := Revision(rs); err != nil {
  64. // Skip the replica sets when it failed to parse their revision information
  65. glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
  66. } else if v > max {
  67. max = v
  68. }
  69. }
  70. return max
  71. }
  72. // LastRevision finds the second max revision number in all replica sets (the last revision)
  73. func LastRevision(allRSs []*extensions.ReplicaSet) int64 {
  74. max, secMax := int64(0), int64(0)
  75. for _, rs := range allRSs {
  76. if v, err := Revision(rs); err != nil {
  77. // Skip the replica sets when it failed to parse their revision information
  78. glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
  79. } else if v >= max {
  80. secMax = max
  81. max = v
  82. } else if v > secMax {
  83. secMax = v
  84. }
  85. }
  86. return secMax
  87. }
  88. // SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
  89. // copying required deployment annotations to it; it returns true if replica set's annotation is changed.
  90. func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
  91. // First, copy deployment's annotations (except for apply and revision annotations)
  92. annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
  93. // Then, update replica set's revision annotation
  94. if newRS.Annotations == nil {
  95. newRS.Annotations = make(map[string]string)
  96. }
  97. // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
  98. // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and
  99. // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision.
  100. if newRS.Annotations[RevisionAnnotation] < newRevision {
  101. newRS.Annotations[RevisionAnnotation] = newRevision
  102. annotationChanged = true
  103. glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
  104. }
  105. if !exists && SetReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+MaxSurge(*deployment)) {
  106. annotationChanged = true
  107. }
  108. return annotationChanged
  109. }
  110. var annotationsToSkip = map[string]bool{
  111. annotations.LastAppliedConfigAnnotation: true,
  112. RevisionAnnotation: true,
  113. DesiredReplicasAnnotation: true,
  114. MaxReplicasAnnotation: true,
  115. OverlapAnnotation: true,
  116. SelectorUpdateAnnotation: true,
  117. }
  118. // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
  119. // TODO: How to decide which annotations should / should not be copied?
  120. // See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
  121. func skipCopyAnnotation(key string) bool {
  122. return annotationsToSkip[key]
  123. }
  124. // copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations,
  125. // and returns true if replica set's annotation is changed.
  126. // Note that apply and revision annotations are not copied.
  127. func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
  128. rsAnnotationsChanged := false
  129. if rs.Annotations == nil {
  130. rs.Annotations = make(map[string]string)
  131. }
  132. for k, v := range deployment.Annotations {
  133. // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated
  134. // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of
  135. // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable.
  136. if skipCopyAnnotation(k) || rs.Annotations[k] == v {
  137. continue
  138. }
  139. rs.Annotations[k] = v
  140. rsAnnotationsChanged = true
  141. }
  142. return rsAnnotationsChanged
  143. }
  144. // SetDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations.
  145. // This action should be done if and only if the deployment is rolling back to this rs.
  146. // Note that apply and revision annotations are not changed.
  147. func SetDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) {
  148. deployment.Annotations = getSkippedAnnotations(deployment.Annotations)
  149. for k, v := range rollbackToRS.Annotations {
  150. if !skipCopyAnnotation(k) {
  151. deployment.Annotations[k] = v
  152. }
  153. }
  154. }
  155. func getSkippedAnnotations(annotations map[string]string) map[string]string {
  156. skippedAnnotations := make(map[string]string)
  157. for k, v := range annotations {
  158. if skipCopyAnnotation(k) {
  159. skippedAnnotations[k] = v
  160. }
  161. }
  162. return skippedAnnotations
  163. }
  164. // FindActiveOrLatest returns the only active or the latest replica set in case there is at most one active
  165. // replica set. If there are more active replica sets, then we should proportionally scale them.
  166. func FindActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet {
  167. if newRS == nil && len(oldRSs) == 0 {
  168. return nil
  169. }
  170. sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs)))
  171. allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
  172. switch len(allRSs) {
  173. case 0:
  174. // If there is no active replica set then we should return the newest.
  175. if newRS != nil {
  176. return newRS
  177. }
  178. return oldRSs[0]
  179. case 1:
  180. return allRSs[0]
  181. default:
  182. return nil
  183. }
  184. }
  185. // GetDesiredReplicasAnnotation returns the number of desired replicas
  186. func GetDesiredReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) {
  187. return getIntFromAnnotation(rs, DesiredReplicasAnnotation)
  188. }
  189. func getMaxReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) {
  190. return getIntFromAnnotation(rs, MaxReplicasAnnotation)
  191. }
  192. func getIntFromAnnotation(rs *extensions.ReplicaSet, annotationKey string) (int32, bool) {
  193. annotationValue, ok := rs.Annotations[annotationKey]
  194. if !ok {
  195. return int32(0), false
  196. }
  197. intValue, err := strconv.Atoi(annotationValue)
  198. if err != nil {
  199. glog.Warningf("Cannot convert the value %q with annotation key %q for the replica set %q",
  200. annotationValue, annotationKey, rs.Name)
  201. return int32(0), false
  202. }
  203. return int32(intValue), true
  204. }
  205. // SetReplicasAnnotations sets the desiredReplicas and maxReplicas into the annotations
  206. func SetReplicasAnnotations(rs *extensions.ReplicaSet, desiredReplicas, maxReplicas int32) bool {
  207. updated := false
  208. if rs.Annotations == nil {
  209. rs.Annotations = make(map[string]string)
  210. }
  211. desiredString := fmt.Sprintf("%d", desiredReplicas)
  212. if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString {
  213. rs.Annotations[DesiredReplicasAnnotation] = desiredString
  214. updated = true
  215. }
  216. maxString := fmt.Sprintf("%d", maxReplicas)
  217. if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString {
  218. rs.Annotations[MaxReplicasAnnotation] = maxString
  219. updated = true
  220. }
  221. return updated
  222. }
  223. // MaxUnavailable returns the maximum unavailable pods a rolling deployment can take.
  224. func MaxUnavailable(deployment extensions.Deployment) int32 {
  225. if !IsRollingUpdate(&deployment) {
  226. return int32(0)
  227. }
  228. // Error caught by validation
  229. _, maxUnavailable, _ := ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
  230. return maxUnavailable
  231. }
  232. // MinAvailable returns the minimum vailable pods of a given deployment
  233. func MinAvailable(deployment *extensions.Deployment) int32 {
  234. if !IsRollingUpdate(deployment) {
  235. return int32(0)
  236. }
  237. return deployment.Spec.Replicas - MaxUnavailable(*deployment)
  238. }
  239. // MaxSurge returns the maximum surge pods a rolling deployment can take.
  240. func MaxSurge(deployment extensions.Deployment) int32 {
  241. if !IsRollingUpdate(&deployment) {
  242. return int32(0)
  243. }
  244. // Error caught by validation
  245. maxSurge, _, _ := ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
  246. return maxSurge
  247. }
  248. // GetProportion will estimate the proportion for the provided replica set using 1. the current size
  249. // of the parent deployment, 2. the replica count that needs be added on the replica sets of the
  250. // deployment, and 3. the total replicas added in the replica sets of the deployment so far.
  251. func GetProportion(rs *extensions.ReplicaSet, d extensions.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 {
  252. if rs == nil || rs.Spec.Replicas == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded {
  253. return int32(0)
  254. }
  255. rsFraction := getReplicaSetFraction(*rs, d)
  256. allowed := deploymentReplicasToAdd - deploymentReplicasAdded
  257. if deploymentReplicasToAdd > 0 {
  258. // Use the minimum between the replica set fraction and the maximum allowed replicas
  259. // when scaling up. This way we ensure we will not scale up more than the allowed
  260. // replicas we can add.
  261. return integer.Int32Min(rsFraction, allowed)
  262. }
  263. // Use the maximum between the replica set fraction and the maximum allowed replicas
  264. // when scaling down. This way we ensure we will not scale down more than the allowed
  265. // replicas we can remove.
  266. return integer.Int32Max(rsFraction, allowed)
  267. }
  268. // getReplicaSetFraction estimates the fraction of replicas a replica set can have in
  269. // 1. a scaling event during a rollout or 2. when scaling a paused deployment.
  270. func getReplicaSetFraction(rs extensions.ReplicaSet, d extensions.Deployment) int32 {
  271. // If we are scaling down to zero then the fraction of this replica set is its whole size (negative)
  272. if d.Spec.Replicas == int32(0) {
  273. return -rs.Spec.Replicas
  274. }
  275. deploymentReplicas := d.Spec.Replicas + MaxSurge(d)
  276. annotatedReplicas, ok := getMaxReplicasAnnotation(&rs)
  277. if !ok {
  278. // If we cannot find the annotation then fallback to the current deployment size. Note that this
  279. // will not be an accurate proportion estimation in case other replica sets have different values
  280. // which means that the deployment was scaled at some point but we at least will stay in limits
  281. // due to the min-max comparisons in getProportion.
  282. annotatedReplicas = d.Status.Replicas
  283. }
  284. // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas
  285. // will never be zero here.
  286. newRSsize := (float64(rs.Spec.Replicas * deploymentReplicas)) / float64(annotatedReplicas)
  287. return integer.RoundToInt32(newRSsize) - rs.Spec.Replicas
  288. }
  289. // GetAllReplicaSets returns the old and new replica sets targeted by the given Deployment. It gets PodList and ReplicaSetList from client interface.
  290. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
  291. // The third returned value is the new replica set, and it may be nil if it doesn't exist yet.
  292. func GetAllReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, *extensions.ReplicaSet, error) {
  293. rsList, err := listReplicaSets(deployment, c)
  294. if err != nil {
  295. return nil, nil, nil, err
  296. }
  297. podList, err := listPods(deployment, c)
  298. if err != nil {
  299. return nil, nil, nil, err
  300. }
  301. oldRSes, allOldRSes, err := FindOldReplicaSets(deployment, rsList, podList)
  302. if err != nil {
  303. return nil, nil, nil, err
  304. }
  305. newRS, err := FindNewReplicaSet(deployment, rsList)
  306. if err != nil {
  307. return nil, nil, nil, err
  308. }
  309. return oldRSes, allOldRSes, newRS, nil
  310. }
  311. // GetOldReplicaSets returns the old replica sets targeted by the given Deployment; get PodList and ReplicaSetList from client interface.
  312. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
  313. func GetOldReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
  314. rsList, err := listReplicaSets(deployment, c)
  315. if err != nil {
  316. return nil, nil, err
  317. }
  318. podList, err := listPods(deployment, c)
  319. if err != nil {
  320. return nil, nil, err
  321. }
  322. return FindOldReplicaSets(deployment, rsList, podList)
  323. }
  324. // GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface.
  325. // Returns nil if the new replica set doesn't exist yet.
  326. func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
  327. rsList, err := listReplicaSets(deployment, c)
  328. if err != nil {
  329. return nil, err
  330. }
  331. return FindNewReplicaSet(deployment, rsList)
  332. }
  333. // listReplicaSets lists all RSes the given deployment targets with the given client interface.
  334. func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]extensions.ReplicaSet, error) {
  335. return ListReplicaSets(deployment,
  336. func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
  337. rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
  338. return rsList.Items, err
  339. })
  340. }
  341. // listReplicaSets lists all Pods the given deployment targets with the given client interface.
  342. func listPods(deployment *extensions.Deployment, c clientset.Interface) (*api.PodList, error) {
  343. return ListPods(deployment,
  344. func(namespace string, options api.ListOptions) (*api.PodList, error) {
  345. return c.Core().Pods(namespace).List(options)
  346. })
  347. }
  348. // TODO: switch this to full namespacers
  // rsListFunc lists replica sets; ListReplicaSets calls it with the deployment's
  // namespace and list options carrying the deployment's label selector.
  349. type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error)
  // podListFunc lists pods; ListPods calls it with the deployment's namespace and
  // list options carrying the deployment's label selector.
  350. type podListFunc func(string, api.ListOptions) (*api.PodList, error)
  351. // ListReplicaSets returns a slice of RSes the given deployment targets.
  352. func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]extensions.ReplicaSet, error) {
  353. // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
  354. // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830;
  355. // or use controllerRef, see https://github.com/kubernetes/kubernetes/issues/2210
  356. namespace := deployment.Namespace
  357. selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
  358. if err != nil {
  359. return nil, err
  360. }
  361. options := api.ListOptions{LabelSelector: selector}
  362. return getRSList(namespace, options)
  363. }
  364. // ListPods returns a list of pods the given deployment targets.
  365. func ListPods(deployment *extensions.Deployment, getPodList podListFunc) (*api.PodList, error) {
  366. namespace := deployment.Namespace
  367. selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
  368. if err != nil {
  369. return nil, err
  370. }
  371. options := api.ListOptions{LabelSelector: selector}
  372. return getPodList(namespace, options)
  373. }
  374. // equalIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
  375. // We ignore pod-template-hash because the hash result would be different upon podTemplateSpec API changes
  376. // (e.g. the addition of a new field will cause the hash code to change)
  377. // Note that we assume input podTemplateSpecs contain non-empty labels
  378. func equalIgnoreHash(template1, template2 api.PodTemplateSpec) (bool, error) {
  379. // First, compare template.Labels (ignoring hash)
  380. labels1, labels2 := template1.Labels, template2.Labels
  381. // The podTemplateSpec must have a non-empty label so that label selectors can find them.
  382. // This is checked by validation (of resources contain a podTemplateSpec).
  383. if len(labels1) == 0 || len(labels2) == 0 {
  384. return false, fmt.Errorf("Unexpected empty labels found in given template")
  385. }
  386. if len(labels1) > len(labels2) {
  387. labels1, labels2 = labels2, labels1
  388. }
  389. // We make sure len(labels2) >= len(labels1)
  390. for k, v := range labels2 {
  391. if labels1[k] != v && k != extensions.DefaultDeploymentUniqueLabelKey {
  392. return false, nil
  393. }
  394. }
  395. // Then, compare the templates without comparing their labels
  396. template1.Labels, template2.Labels = nil, nil
  397. result := api.Semantic.DeepEqual(template1, template2)
  398. return result, nil
  399. }
  400. // FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
  401. func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
  402. newRSTemplate := GetNewReplicaSetTemplate(deployment)
  403. for i := range rsList {
  404. equal, err := equalIgnoreHash(rsList[i].Spec.Template, newRSTemplate)
  405. if err != nil {
  406. return nil, err
  407. }
  408. if equal {
  409. // This is the new ReplicaSet.
  410. return &rsList[i], nil
  411. }
  412. }
  413. // new ReplicaSet does not exist.
  414. return nil, nil
  415. }
  416. // FindOldReplicaSets returns the old replica sets targeted by the given Deployment, with the given PodList and slice of RSes.
  417. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
  418. func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
  419. // Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList.
  420. // All pods and replica sets are labeled with pod-template-hash to prevent overlapping
  421. oldRSs := map[string]extensions.ReplicaSet{}
  422. allOldRSs := map[string]extensions.ReplicaSet{}
  423. newRSTemplate := GetNewReplicaSetTemplate(deployment)
  424. for _, pod := range podList.Items {
  425. podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
  426. for _, rs := range rsList {
  427. rsLabelsSelector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
  428. if err != nil {
  429. return nil, nil, fmt.Errorf("invalid label selector: %v", err)
  430. }
  431. // Filter out replica set that has the same pod template spec as the deployment - that is the new replica set.
  432. equal, err := equalIgnoreHash(rs.Spec.Template, newRSTemplate)
  433. if err != nil {
  434. return nil, nil, err
  435. }
  436. if equal {
  437. continue
  438. }
  439. allOldRSs[rs.ObjectMeta.Name] = rs
  440. if rsLabelsSelector.Matches(podLabelsSelector) {
  441. oldRSs[rs.ObjectMeta.Name] = rs
  442. }
  443. }
  444. }
  445. requiredRSs := []*extensions.ReplicaSet{}
  446. for key := range oldRSs {
  447. value := oldRSs[key]
  448. requiredRSs = append(requiredRSs, &value)
  449. }
  450. allRSs := []*extensions.ReplicaSet{}
  451. for key := range allOldRSs {
  452. value := allOldRSs[key]
  453. allRSs = append(allRSs, &value)
  454. }
  455. return requiredRSs, allRSs, nil
  456. }
  457. // WaitForReplicaSetUpdated polls the replica set until it is updated.
  458. func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error {
  459. return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
  460. rs, err := c.Extensions().ReplicaSets(namespace).Get(name)
  461. if err != nil {
  462. return false, err
  463. }
  464. return rs.Status.ObservedGeneration >= desiredGeneration, nil
  465. })
  466. }
  467. // WaitForPodsHashPopulated polls the replica set until updated and fully labeled.
  468. func WaitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, namespace, name string) error {
  469. return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
  470. rs, err := c.Extensions().ReplicaSets(namespace).Get(name)
  471. if err != nil {
  472. return false, err
  473. }
  474. return rs.Status.ObservedGeneration >= desiredGeneration &&
  475. rs.Status.FullyLabeledReplicas == rs.Spec.Replicas, nil
  476. })
  477. }
  478. // LabelPodsWithHash labels all pods in the given podList with the new hash label.
  479. // The returned bool value can be used to tell if all pods are actually labeled.
  480. func LabelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) {
  481. allPodsLabeled := true
  482. for _, pod := range podList.Items {
  483. // Only label the pod that doesn't already have the new hash
  484. if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
  485. if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod,
  486. func(podToUpdate *api.Pod) error {
  487. // Precondition: the pod doesn't contain the new hash in its label.
  488. if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
  489. return errors.ErrPreconditionViolated
  490. }
  491. podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
  492. return nil
  493. }); err != nil {
  494. return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
  495. } else if podUpdated {
  496. glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash)
  497. } else {
  498. // If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error.
  499. // Then we can't say all pods are labeled
  500. allPodsLabeled = false
  501. }
  502. }
  503. }
  504. return allPodsLabeled, nil
  505. }
  506. // GetNewReplicaSetTemplate returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
  507. func GetNewReplicaSetTemplate(deployment *extensions.Deployment) api.PodTemplateSpec {
  508. // newRS will have the same template as in deployment spec, plus a unique label in some cases.
  509. newRSTemplate := api.PodTemplateSpec{
  510. ObjectMeta: deployment.Spec.Template.ObjectMeta,
  511. Spec: deployment.Spec.Template.Spec,
  512. }
  513. newRSTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel(
  514. deployment.Spec.Template.ObjectMeta.Labels,
  515. extensions.DefaultDeploymentUniqueLabelKey,
  516. podutil.GetPodTemplateSpecHash(newRSTemplate))
  517. return newRSTemplate
  518. }
  519. // SetFromReplicaSetTemplate sets the desired PodTemplateSpec from a replica set template to the given deployment.
  520. func SetFromReplicaSetTemplate(deployment *extensions.Deployment, template api.PodTemplateSpec) *extensions.Deployment {
  521. deployment.Spec.Template.ObjectMeta = template.ObjectMeta
  522. deployment.Spec.Template.Spec = template.Spec
  523. deployment.Spec.Template.ObjectMeta.Labels = labelsutil.CloneAndRemoveLabel(
  524. deployment.Spec.Template.ObjectMeta.Labels,
  525. extensions.DefaultDeploymentUniqueLabelKey)
  526. return deployment
  527. }
  528. // GetReplicaCountForReplicaSets returns the sum of Replicas of the given replica sets.
  529. func GetReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
  530. totalReplicaCount := int32(0)
  531. for _, rs := range replicaSets {
  532. if rs != nil {
  533. totalReplicaCount += rs.Spec.Replicas
  534. }
  535. }
  536. return totalReplicaCount
  537. }
  538. // GetActualReplicaCountForReplicaSets returns the sum of actual replicas of the given replica sets.
  539. func GetActualReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
  540. totalReplicaCount := int32(0)
  541. for _, rs := range replicaSets {
  542. if rs != nil {
  543. totalReplicaCount += rs.Status.Replicas
  544. }
  545. }
  546. return totalReplicaCount
  547. }
  548. // GetAvailablePodsForReplicaSets returns the number of available pods (listed from clientset) corresponding to the given replica sets.
  549. func GetAvailablePodsForReplicaSets(c clientset.Interface, deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
  550. podList, err := listPods(deployment, c)
  551. if err != nil {
  552. return 0, err
  553. }
  554. return CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
  555. }
  556. // CountAvailablePodsForReplicaSets returns the number of available pods corresponding to the given pod list and replica sets.
  557. // Note that the input pod list should be the pods targeted by the deployment of input replica sets.
  558. func CountAvailablePodsForReplicaSets(podList *api.PodList, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
  559. rsPods, err := filterPodsMatchingReplicaSets(rss, podList, minReadySeconds)
  560. if err != nil {
  561. return 0, err
  562. }
  563. return countAvailablePods(rsPods, minReadySeconds), nil
  564. }
  565. // GetAvailablePodsForDeployment returns the number of available pods (listed from clientset) corresponding to the given deployment.
  566. func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions.Deployment) (int32, error) {
  567. podList, err := listPods(deployment, c)
  568. if err != nil {
  569. return 0, err
  570. }
  571. return countAvailablePods(podList.Items, deployment.Spec.MinReadySeconds), nil
  572. }
  573. func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
  574. availablePodCount := int32(0)
  575. for _, pod := range pods {
  576. // TODO: Make the time.Now() as argument to allow unit test this.
  577. // FIXME: avoid using time.Now
  578. if IsPodAvailable(&pod, minReadySeconds, time.Now()) {
  579. glog.V(4).Infof("Pod %s/%s is available.", pod.Namespace, pod.Name)
  580. availablePodCount++
  581. }
  582. }
  583. return availablePodCount
  584. }
  585. // IsPodAvailable return true if the pod is available.
  586. func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool {
  587. if !controller.IsPodActive(pod) {
  588. return false
  589. }
  590. // Check if we've passed minReadySeconds since LastTransitionTime
  591. // If so, this pod is ready
  592. for _, c := range pod.Status.Conditions {
  593. // we only care about pod ready conditions
  594. if c.Type == api.PodReady && c.Status == api.ConditionTrue {
  595. glog.V(4).Infof("Comparing pod %s/%s ready condition last transition time %s + minReadySeconds %d with now %s.", pod.Namespace, pod.Name, c.LastTransitionTime.String(), minReadySeconds, now.String())
  596. // 2 cases that this ready condition is valid (passed minReadySeconds, i.e. the pod is available):
  597. // 1. minReadySeconds == 0, or
  598. // 2. LastTransitionTime (is set) + minReadySeconds (>0) < current time
  599. minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
  600. if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now) {
  601. return true
  602. }
  603. }
  604. }
  605. return false
  606. }
  607. // filterPodsMatchingReplicaSets filters the given pod list and only return the ones targeted by the input replicasets
  608. func filterPodsMatchingReplicaSets(replicaSets []*extensions.ReplicaSet, podList *api.PodList, minReadySeconds int32) ([]api.Pod, error) {
  609. allRSPods := []api.Pod{}
  610. for _, rs := range replicaSets {
  611. matchingFunc, err := rsutil.MatchingPodsFunc(rs)
  612. if err != nil {
  613. return nil, err
  614. }
  615. if matchingFunc == nil {
  616. continue
  617. }
  618. rsPods := podutil.Filter(podList, matchingFunc)
  619. avaPodsCount := countAvailablePods(rsPods, minReadySeconds)
  620. if avaPodsCount > rs.Spec.Replicas {
  621. msg := fmt.Sprintf("Found %s/%s with %d available pods, more than its spec replicas %d", rs.Namespace, rs.Name, avaPodsCount, rs.Spec.Replicas)
  622. glog.Errorf("ERROR: %s", msg)
  623. return nil, fmt.Errorf(msg)
  624. }
  625. allRSPods = append(allRSPods, podutil.Filter(podList, matchingFunc)...)
  626. }
  627. return allRSPods, nil
  628. }
  629. // Revision returns the revision number of the input replica set
  630. func Revision(rs *extensions.ReplicaSet) (int64, error) {
  631. v, ok := rs.Annotations[RevisionAnnotation]
  632. if !ok {
  633. return 0, nil
  634. }
  635. return strconv.ParseInt(v, 10, 64)
  636. }
  637. // IsRollingUpdate returns true if the strategy type is a rolling update.
  638. func IsRollingUpdate(deployment *extensions.Deployment) bool {
  639. return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType
  640. }
  641. // NewRSNewReplicas calculates the number of replicas a deployment's new RS should have.
  642. // When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it.
  643. // 1) The new RS is saturated: newRS's replicas == deployment's replicas
  644. // 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas
  645. func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) (int32, error) {
  646. switch deployment.Spec.Strategy.Type {
  647. case extensions.RollingUpdateDeploymentStrategyType:
  648. // Check if we can scale up.
  649. maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(deployment.Spec.Replicas), true)
  650. if err != nil {
  651. return 0, err
  652. }
  653. // Find the total number of pods
  654. currentPodCount := GetReplicaCountForReplicaSets(allRSs)
  655. maxTotalPods := deployment.Spec.Replicas + int32(maxSurge)
  656. if currentPodCount >= maxTotalPods {
  657. // Cannot scale up.
  658. return newRS.Spec.Replicas, nil
  659. }
  660. // Scale up.
  661. scaleUpCount := maxTotalPods - currentPodCount
  662. // Do not exceed the number of desired replicas.
  663. scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(deployment.Spec.Replicas-newRS.Spec.Replicas)))
  664. return newRS.Spec.Replicas + scaleUpCount, nil
  665. case extensions.RecreateDeploymentStrategyType:
  666. return deployment.Spec.Replicas, nil
  667. default:
  668. return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
  669. }
  670. }
  671. // IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size.
  672. // Both the deployment and the replica set have to believe this replica set can own all of the desired
  673. // replicas in the deployment and the annotation helps in achieving that.
  674. func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
  675. if rs == nil {
  676. return false
  677. }
  678. desiredString := rs.Annotations[DesiredReplicasAnnotation]
  679. desired, err := strconv.Atoi(desiredString)
  680. if err != nil {
  681. return false
  682. }
  683. return rs.Spec.Replicas == deployment.Spec.Replicas && int32(desired) == deployment.Spec.Replicas
  684. }
  685. // WaitForObservedDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
  686. // Returns error if polling timesout.
  687. func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
  688. // TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface.
  689. return wait.Poll(interval, timeout, func() (bool, error) {
  690. deployment, err := getDeploymentFunc()
  691. if err != nil {
  692. return false, err
  693. }
  694. return deployment.Status.ObservedGeneration >= desiredGeneration, nil
  695. })
  696. }
  697. // ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one
  698. // step. For example:
  699. //
  700. // 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1)
  701. // 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1)
  702. // 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
  703. // 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1)
  704. // 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
  705. // 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1)
  706. func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) {
  707. surge, err := intstrutil.GetValueFromIntOrPercent(maxSurge, int(desired), true)
  708. if err != nil {
  709. return 0, 0, err
  710. }
  711. unavailable, err := intstrutil.GetValueFromIntOrPercent(maxUnavailable, int(desired), false)
  712. if err != nil {
  713. return 0, 0, err
  714. }
  715. if surge == 0 && unavailable == 0 {
  716. // Validation should never allow the user to explicitly use zero values for both maxSurge
  717. // maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero.
  718. // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the
  719. // theory that surge might not work due to quota.
  720. unavailable = 1
  721. }
  722. return int32(surge), int32(unavailable), nil
  723. }
  724. func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployment, error) {
  725. objCopy, err := api.Scheme.DeepCopy(deployment)
  726. if err != nil {
  727. return nil, err
  728. }
  729. copied, ok := objCopy.(*extensions.Deployment)
  730. if !ok {
  731. return nil, fmt.Errorf("expected Deployment, got %#v", objCopy)
  732. }
  733. return copied, nil
  734. }
  735. // SelectorUpdatedBefore returns true if the former deployment's selector
  736. // is updated before the latter, false otherwise
  737. func SelectorUpdatedBefore(d1, d2 *extensions.Deployment) bool {
  738. t1, t2 := LastSelectorUpdate(d1), LastSelectorUpdate(d2)
  739. return t1.Before(t2)
  740. }
  741. // LastSelectorUpdate returns the last time given deployment's selector is updated
  742. func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time {
  743. t := d.Annotations[SelectorUpdateAnnotation]
  744. if len(t) > 0 {
  745. parsedTime, err := time.Parse(t, time.RFC3339)
  746. // If failed to parse the time, use creation timestamp instead
  747. if err != nil {
  748. return d.CreationTimestamp
  749. }
  750. return unversioned.Time{Time: parsedTime}
  751. }
  752. // If it's never updated, use creation timestamp instead
  753. return d.CreationTimestamp
  754. }
  755. // BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector,
  756. // first using their creation timestamp and then their names as a tie breaker.
  757. type BySelectorLastUpdateTime []extensions.Deployment
  758. func (o BySelectorLastUpdateTime) Len() int { return len(o) }
  759. func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
  760. func (o BySelectorLastUpdateTime) Less(i, j int) bool {
  761. ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j])
  762. if ti.Equal(tj) {
  763. if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
  764. return o[i].Name < o[j].Name
  765. }
  766. return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
  767. }
  768. return ti.Before(tj)
  769. }