123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package util
- import (
- "fmt"
- "sort"
- "strconv"
- "time"
- "github.com/golang/glog"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/annotations"
- "k8s.io/kubernetes/pkg/api/unversioned"
- "k8s.io/kubernetes/pkg/apis/extensions"
- clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/labels"
- "k8s.io/kubernetes/pkg/util/errors"
- "k8s.io/kubernetes/pkg/util/integer"
- intstrutil "k8s.io/kubernetes/pkg/util/intstr"
- labelsutil "k8s.io/kubernetes/pkg/util/labels"
- podutil "k8s.io/kubernetes/pkg/util/pod"
- rsutil "k8s.io/kubernetes/pkg/util/replicaset"
- "k8s.io/kubernetes/pkg/util/wait"
- )
- const (
- // RevisionAnnotation is the revision annotation of a deployment's replica sets which records its rollout sequence
- RevisionAnnotation = "deployment.kubernetes.io/revision"
- // DesiredReplicasAnnotation is the desired replicas for a deployment recorded as an annotation
- // in its replica sets. Helps in separating scaling events from the rollout process and for
- // determining if the new replica set for a deployment is really saturated.
- DesiredReplicasAnnotation = "deployment.kubernetes.io/desired-replicas"
- // MaxReplicasAnnotation is the maximum replicas a deployment can have at a given point, which
- // is deployment.spec.replicas + maxSurge. Used by the underlying replica sets to estimate their
- // proportions in case the deployment has surge replicas.
- MaxReplicasAnnotation = "deployment.kubernetes.io/max-replicas"
- // RollbackRevisionNotFound is not found rollback event reason
- RollbackRevisionNotFound = "DeploymentRollbackRevisionNotFound"
- // RollbackTemplateUnchanged is the template unchanged rollback event reason
- RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged"
- // RollbackDone is the done rollback event reason
- RollbackDone = "DeploymentRollback"
- // OverlapAnnotation marks deployments with overlapping selector with other deployments
- // TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210
- OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with"
- // SelectorUpdateAnnotation marks the last time deployment selector update
- // TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210
- SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at"
- )
- // MaxRevision finds the highest revision in the replica sets
- func MaxRevision(allRSs []*extensions.ReplicaSet) int64 {
- max := int64(0)
- for _, rs := range allRSs {
- if v, err := Revision(rs); err != nil {
- // Skip the replica sets when it failed to parse their revision information
- glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
- } else if v > max {
- max = v
- }
- }
- return max
- }
- // LastRevision finds the second max revision number in all replica sets (the last revision)
- func LastRevision(allRSs []*extensions.ReplicaSet) int64 {
- max, secMax := int64(0), int64(0)
- for _, rs := range allRSs {
- if v, err := Revision(rs); err != nil {
- // Skip the replica sets when it failed to parse their revision information
- glog.V(4).Infof("Error: %v. Couldn't parse revision for replica set %#v, deployment controller will skip it when reconciling revisions.", err, rs)
- } else if v >= max {
- secMax = max
- max = v
- } else if v > secMax {
- secMax = v
- }
- }
- return secMax
- }
- // SetNewReplicaSetAnnotations sets new replica set's annotations appropriately by updating its revision and
- // copying required deployment annotations to it; it returns true if replica set's annotation is changed.
- func SetNewReplicaSetAnnotations(deployment *extensions.Deployment, newRS *extensions.ReplicaSet, newRevision string, exists bool) bool {
- // First, copy deployment's annotations (except for apply and revision annotations)
- annotationChanged := copyDeploymentAnnotationsToReplicaSet(deployment, newRS)
- // Then, update replica set's revision annotation
- if newRS.Annotations == nil {
- newRS.Annotations = make(map[string]string)
- }
- // The newRS's revision should be the greatest among all RSes. Usually, its revision number is newRevision (the max revision number
- // of all old RSes + 1). However, it's possible that some of the old RSes are deleted after the newRS revision being updated, and
- // newRevision becomes smaller than newRS's revision. We should only update newRS revision when it's smaller than newRevision.
- if newRS.Annotations[RevisionAnnotation] < newRevision {
- newRS.Annotations[RevisionAnnotation] = newRevision
- annotationChanged = true
- glog.V(4).Infof("Updating replica set %q revision to %s", newRS.Name, newRevision)
- }
- if !exists && SetReplicasAnnotations(newRS, deployment.Spec.Replicas, deployment.Spec.Replicas+MaxSurge(*deployment)) {
- annotationChanged = true
- }
- return annotationChanged
- }
- var annotationsToSkip = map[string]bool{
- annotations.LastAppliedConfigAnnotation: true,
- RevisionAnnotation: true,
- DesiredReplicasAnnotation: true,
- MaxReplicasAnnotation: true,
- OverlapAnnotation: true,
- SelectorUpdateAnnotation: true,
- }
- // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key
- // TODO: How to decide which annotations should / should not be copied?
- // See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615
- func skipCopyAnnotation(key string) bool {
- return annotationsToSkip[key]
- }
- // copyDeploymentAnnotationsToReplicaSet copies deployment's annotations to replica set's annotations,
- // and returns true if replica set's annotation is changed.
- // Note that apply and revision annotations are not copied.
- func copyDeploymentAnnotationsToReplicaSet(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
- rsAnnotationsChanged := false
- if rs.Annotations == nil {
- rs.Annotations = make(map[string]string)
- }
- for k, v := range deployment.Annotations {
- // newRS revision is updated automatically in getNewReplicaSet, and the deployment's revision number is then updated
- // by copying its newRS revision number. We should not copy deployment's revision to its newRS, since the update of
- // deployment revision number may fail (revision becomes stale) and the revision number in newRS is more reliable.
- if skipCopyAnnotation(k) || rs.Annotations[k] == v {
- continue
- }
- rs.Annotations[k] = v
- rsAnnotationsChanged = true
- }
- return rsAnnotationsChanged
- }
- // SetDeploymentAnnotationsTo sets deployment's annotations as given RS's annotations.
- // This action should be done if and only if the deployment is rolling back to this rs.
- // Note that apply and revision annotations are not changed.
- func SetDeploymentAnnotationsTo(deployment *extensions.Deployment, rollbackToRS *extensions.ReplicaSet) {
- deployment.Annotations = getSkippedAnnotations(deployment.Annotations)
- for k, v := range rollbackToRS.Annotations {
- if !skipCopyAnnotation(k) {
- deployment.Annotations[k] = v
- }
- }
- }
- func getSkippedAnnotations(annotations map[string]string) map[string]string {
- skippedAnnotations := make(map[string]string)
- for k, v := range annotations {
- if skipCopyAnnotation(k) {
- skippedAnnotations[k] = v
- }
- }
- return skippedAnnotations
- }
- // FindActiveOrLatest returns the only active or the latest replica set in case there is at most one active
- // replica set. If there are more active replica sets, then we should proportionally scale them.
- func FindActiveOrLatest(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet) *extensions.ReplicaSet {
- if newRS == nil && len(oldRSs) == 0 {
- return nil
- }
- sort.Sort(sort.Reverse(controller.ReplicaSetsByCreationTimestamp(oldRSs)))
- allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
- switch len(allRSs) {
- case 0:
- // If there is no active replica set then we should return the newest.
- if newRS != nil {
- return newRS
- }
- return oldRSs[0]
- case 1:
- return allRSs[0]
- default:
- return nil
- }
- }
- // GetDesiredReplicasAnnotation returns the number of desired replicas
- func GetDesiredReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) {
- return getIntFromAnnotation(rs, DesiredReplicasAnnotation)
- }
- func getMaxReplicasAnnotation(rs *extensions.ReplicaSet) (int32, bool) {
- return getIntFromAnnotation(rs, MaxReplicasAnnotation)
- }
- func getIntFromAnnotation(rs *extensions.ReplicaSet, annotationKey string) (int32, bool) {
- annotationValue, ok := rs.Annotations[annotationKey]
- if !ok {
- return int32(0), false
- }
- intValue, err := strconv.Atoi(annotationValue)
- if err != nil {
- glog.Warningf("Cannot convert the value %q with annotation key %q for the replica set %q",
- annotationValue, annotationKey, rs.Name)
- return int32(0), false
- }
- return int32(intValue), true
- }
- // SetReplicasAnnotations sets the desiredReplicas and maxReplicas into the annotations
- func SetReplicasAnnotations(rs *extensions.ReplicaSet, desiredReplicas, maxReplicas int32) bool {
- updated := false
- if rs.Annotations == nil {
- rs.Annotations = make(map[string]string)
- }
- desiredString := fmt.Sprintf("%d", desiredReplicas)
- if hasString := rs.Annotations[DesiredReplicasAnnotation]; hasString != desiredString {
- rs.Annotations[DesiredReplicasAnnotation] = desiredString
- updated = true
- }
- maxString := fmt.Sprintf("%d", maxReplicas)
- if hasString := rs.Annotations[MaxReplicasAnnotation]; hasString != maxString {
- rs.Annotations[MaxReplicasAnnotation] = maxString
- updated = true
- }
- return updated
- }
- // MaxUnavailable returns the maximum unavailable pods a rolling deployment can take.
- func MaxUnavailable(deployment extensions.Deployment) int32 {
- if !IsRollingUpdate(&deployment) {
- return int32(0)
- }
- // Error caught by validation
- _, maxUnavailable, _ := ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
- return maxUnavailable
- }
- // MinAvailable returns the minimum vailable pods of a given deployment
- func MinAvailable(deployment *extensions.Deployment) int32 {
- if !IsRollingUpdate(deployment) {
- return int32(0)
- }
- return deployment.Spec.Replicas - MaxUnavailable(*deployment)
- }
- // MaxSurge returns the maximum surge pods a rolling deployment can take.
- func MaxSurge(deployment extensions.Deployment) int32 {
- if !IsRollingUpdate(&deployment) {
- return int32(0)
- }
- // Error caught by validation
- maxSurge, _, _ := ResolveFenceposts(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, &deployment.Spec.Strategy.RollingUpdate.MaxUnavailable, deployment.Spec.Replicas)
- return maxSurge
- }
- // GetProportion will estimate the proportion for the provided replica set using 1. the current size
- // of the parent deployment, 2. the replica count that needs be added on the replica sets of the
- // deployment, and 3. the total replicas added in the replica sets of the deployment so far.
- func GetProportion(rs *extensions.ReplicaSet, d extensions.Deployment, deploymentReplicasToAdd, deploymentReplicasAdded int32) int32 {
- if rs == nil || rs.Spec.Replicas == 0 || deploymentReplicasToAdd == 0 || deploymentReplicasToAdd == deploymentReplicasAdded {
- return int32(0)
- }
- rsFraction := getReplicaSetFraction(*rs, d)
- allowed := deploymentReplicasToAdd - deploymentReplicasAdded
- if deploymentReplicasToAdd > 0 {
- // Use the minimum between the replica set fraction and the maximum allowed replicas
- // when scaling up. This way we ensure we will not scale up more than the allowed
- // replicas we can add.
- return integer.Int32Min(rsFraction, allowed)
- }
- // Use the maximum between the replica set fraction and the maximum allowed replicas
- // when scaling down. This way we ensure we will not scale down more than the allowed
- // replicas we can remove.
- return integer.Int32Max(rsFraction, allowed)
- }
- // getReplicaSetFraction estimates the fraction of replicas a replica set can have in
- // 1. a scaling event during a rollout or 2. when scaling a paused deployment.
- func getReplicaSetFraction(rs extensions.ReplicaSet, d extensions.Deployment) int32 {
- // If we are scaling down to zero then the fraction of this replica set is its whole size (negative)
- if d.Spec.Replicas == int32(0) {
- return -rs.Spec.Replicas
- }
- deploymentReplicas := d.Spec.Replicas + MaxSurge(d)
- annotatedReplicas, ok := getMaxReplicasAnnotation(&rs)
- if !ok {
- // If we cannot find the annotation then fallback to the current deployment size. Note that this
- // will not be an accurate proportion estimation in case other replica sets have different values
- // which means that the deployment was scaled at some point but we at least will stay in limits
- // due to the min-max comparisons in getProportion.
- annotatedReplicas = d.Status.Replicas
- }
- // We should never proportionally scale up from zero which means rs.spec.replicas and annotatedReplicas
- // will never be zero here.
- newRSsize := (float64(rs.Spec.Replicas * deploymentReplicas)) / float64(annotatedReplicas)
- return integer.RoundToInt32(newRSsize) - rs.Spec.Replicas
- }
- // GetAllReplicaSets returns the old and new replica sets targeted by the given Deployment. It gets PodList and ReplicaSetList from client interface.
- // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
- // The third returned value is the new replica set, and it may be nil if it doesn't exist yet.
- func GetAllReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, *extensions.ReplicaSet, error) {
- rsList, err := listReplicaSets(deployment, c)
- if err != nil {
- return nil, nil, nil, err
- }
- podList, err := listPods(deployment, c)
- if err != nil {
- return nil, nil, nil, err
- }
- oldRSes, allOldRSes, err := FindOldReplicaSets(deployment, rsList, podList)
- if err != nil {
- return nil, nil, nil, err
- }
- newRS, err := FindNewReplicaSet(deployment, rsList)
- if err != nil {
- return nil, nil, nil, err
- }
- return oldRSes, allOldRSes, newRS, nil
- }
- // GetOldReplicaSets returns the old replica sets targeted by the given Deployment; get PodList and ReplicaSetList from client interface.
- // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
- func GetOldReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
- rsList, err := listReplicaSets(deployment, c)
- if err != nil {
- return nil, nil, err
- }
- podList, err := listPods(deployment, c)
- if err != nil {
- return nil, nil, err
- }
- return FindOldReplicaSets(deployment, rsList, podList)
- }
- // GetNewReplicaSet returns a replica set that matches the intent of the given deployment; get ReplicaSetList from client interface.
- // Returns nil if the new replica set doesn't exist yet.
- func GetNewReplicaSet(deployment *extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
- rsList, err := listReplicaSets(deployment, c)
- if err != nil {
- return nil, err
- }
- return FindNewReplicaSet(deployment, rsList)
- }
- // listReplicaSets lists all RSes the given deployment targets with the given client interface.
- func listReplicaSets(deployment *extensions.Deployment, c clientset.Interface) ([]extensions.ReplicaSet, error) {
- return ListReplicaSets(deployment,
- func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
- rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
- return rsList.Items, err
- })
- }
- // listReplicaSets lists all Pods the given deployment targets with the given client interface.
- func listPods(deployment *extensions.Deployment, c clientset.Interface) (*api.PodList, error) {
- return ListPods(deployment,
- func(namespace string, options api.ListOptions) (*api.PodList, error) {
- return c.Core().Pods(namespace).List(options)
- })
- }
- // TODO: switch this to full namespacers
- type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error)
- type podListFunc func(string, api.ListOptions) (*api.PodList, error)
- // ListReplicaSets returns a slice of RSes the given deployment targets.
- func ListReplicaSets(deployment *extensions.Deployment, getRSList rsListFunc) ([]extensions.ReplicaSet, error) {
- // TODO: Right now we list replica sets by their labels. We should list them by selector, i.e. the replica set's selector
- // should be a superset of the deployment's selector, see https://github.com/kubernetes/kubernetes/issues/19830;
- // or use controllerRef, see https://github.com/kubernetes/kubernetes/issues/2210
- namespace := deployment.Namespace
- selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
- if err != nil {
- return nil, err
- }
- options := api.ListOptions{LabelSelector: selector}
- return getRSList(namespace, options)
- }
- // ListPods returns a list of pods the given deployment targets.
- func ListPods(deployment *extensions.Deployment, getPodList podListFunc) (*api.PodList, error) {
- namespace := deployment.Namespace
- selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
- if err != nil {
- return nil, err
- }
- options := api.ListOptions{LabelSelector: selector}
- return getPodList(namespace, options)
- }
- // equalIgnoreHash returns true if two given podTemplateSpec are equal, ignoring the diff in value of Labels[pod-template-hash]
- // We ignore pod-template-hash because the hash result would be different upon podTemplateSpec API changes
- // (e.g. the addition of a new field will cause the hash code to change)
- // Note that we assume input podTemplateSpecs contain non-empty labels
- func equalIgnoreHash(template1, template2 api.PodTemplateSpec) (bool, error) {
- // First, compare template.Labels (ignoring hash)
- labels1, labels2 := template1.Labels, template2.Labels
- // The podTemplateSpec must have a non-empty label so that label selectors can find them.
- // This is checked by validation (of resources contain a podTemplateSpec).
- if len(labels1) == 0 || len(labels2) == 0 {
- return false, fmt.Errorf("Unexpected empty labels found in given template")
- }
- if len(labels1) > len(labels2) {
- labels1, labels2 = labels2, labels1
- }
- // We make sure len(labels2) >= len(labels1)
- for k, v := range labels2 {
- if labels1[k] != v && k != extensions.DefaultDeploymentUniqueLabelKey {
- return false, nil
- }
- }
- // Then, compare the templates without comparing their labels
- template1.Labels, template2.Labels = nil, nil
- result := api.Semantic.DeepEqual(template1, template2)
- return result, nil
- }
- // FindNewReplicaSet returns the new RS this given deployment targets (the one with the same pod template).
- func FindNewReplicaSet(deployment *extensions.Deployment, rsList []extensions.ReplicaSet) (*extensions.ReplicaSet, error) {
- newRSTemplate := GetNewReplicaSetTemplate(deployment)
- for i := range rsList {
- equal, err := equalIgnoreHash(rsList[i].Spec.Template, newRSTemplate)
- if err != nil {
- return nil, err
- }
- if equal {
- // This is the new ReplicaSet.
- return &rsList[i], nil
- }
- }
- // new ReplicaSet does not exist.
- return nil, nil
- }
- // FindOldReplicaSets returns the old replica sets targeted by the given Deployment, with the given PodList and slice of RSes.
- // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
- func FindOldReplicaSets(deployment *extensions.Deployment, rsList []extensions.ReplicaSet, podList *api.PodList) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
- // Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList.
- // All pods and replica sets are labeled with pod-template-hash to prevent overlapping
- oldRSs := map[string]extensions.ReplicaSet{}
- allOldRSs := map[string]extensions.ReplicaSet{}
- newRSTemplate := GetNewReplicaSetTemplate(deployment)
- for _, pod := range podList.Items {
- podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
- for _, rs := range rsList {
- rsLabelsSelector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
- if err != nil {
- return nil, nil, fmt.Errorf("invalid label selector: %v", err)
- }
- // Filter out replica set that has the same pod template spec as the deployment - that is the new replica set.
- equal, err := equalIgnoreHash(rs.Spec.Template, newRSTemplate)
- if err != nil {
- return nil, nil, err
- }
- if equal {
- continue
- }
- allOldRSs[rs.ObjectMeta.Name] = rs
- if rsLabelsSelector.Matches(podLabelsSelector) {
- oldRSs[rs.ObjectMeta.Name] = rs
- }
- }
- }
- requiredRSs := []*extensions.ReplicaSet{}
- for key := range oldRSs {
- value := oldRSs[key]
- requiredRSs = append(requiredRSs, &value)
- }
- allRSs := []*extensions.ReplicaSet{}
- for key := range allOldRSs {
- value := allOldRSs[key]
- allRSs = append(allRSs, &value)
- }
- return requiredRSs, allRSs, nil
- }
- // WaitForReplicaSetUpdated polls the replica set until it is updated.
- func WaitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error {
- return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
- rs, err := c.Extensions().ReplicaSets(namespace).Get(name)
- if err != nil {
- return false, err
- }
- return rs.Status.ObservedGeneration >= desiredGeneration, nil
- })
- }
- // WaitForPodsHashPopulated polls the replica set until updated and fully labeled.
- func WaitForPodsHashPopulated(c clientset.Interface, desiredGeneration int64, namespace, name string) error {
- return wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
- rs, err := c.Extensions().ReplicaSets(namespace).Get(name)
- if err != nil {
- return false, err
- }
- return rs.Status.ObservedGeneration >= desiredGeneration &&
- rs.Status.FullyLabeledReplicas == rs.Spec.Replicas, nil
- })
- }
- // LabelPodsWithHash labels all pods in the given podList with the new hash label.
- // The returned bool value can be used to tell if all pods are actually labeled.
- func LabelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) {
- allPodsLabeled := true
- for _, pod := range podList.Items {
- // Only label the pod that doesn't already have the new hash
- if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
- if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod,
- func(podToUpdate *api.Pod) error {
- // Precondition: the pod doesn't contain the new hash in its label.
- if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash {
- return errors.ErrPreconditionViolated
- }
- podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
- return nil
- }); err != nil {
- return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
- } else if podUpdated {
- glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash)
- } else {
- // If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error.
- // Then we can't say all pods are labeled
- allPodsLabeled = false
- }
- }
- }
- return allPodsLabeled, nil
- }
- // GetNewReplicaSetTemplate returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
- func GetNewReplicaSetTemplate(deployment *extensions.Deployment) api.PodTemplateSpec {
- // newRS will have the same template as in deployment spec, plus a unique label in some cases.
- newRSTemplate := api.PodTemplateSpec{
- ObjectMeta: deployment.Spec.Template.ObjectMeta,
- Spec: deployment.Spec.Template.Spec,
- }
- newRSTemplate.ObjectMeta.Labels = labelsutil.CloneAndAddLabel(
- deployment.Spec.Template.ObjectMeta.Labels,
- extensions.DefaultDeploymentUniqueLabelKey,
- podutil.GetPodTemplateSpecHash(newRSTemplate))
- return newRSTemplate
- }
- // SetFromReplicaSetTemplate sets the desired PodTemplateSpec from a replica set template to the given deployment.
- func SetFromReplicaSetTemplate(deployment *extensions.Deployment, template api.PodTemplateSpec) *extensions.Deployment {
- deployment.Spec.Template.ObjectMeta = template.ObjectMeta
- deployment.Spec.Template.Spec = template.Spec
- deployment.Spec.Template.ObjectMeta.Labels = labelsutil.CloneAndRemoveLabel(
- deployment.Spec.Template.ObjectMeta.Labels,
- extensions.DefaultDeploymentUniqueLabelKey)
- return deployment
- }
- // GetReplicaCountForReplicaSets returns the sum of Replicas of the given replica sets.
- func GetReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
- totalReplicaCount := int32(0)
- for _, rs := range replicaSets {
- if rs != nil {
- totalReplicaCount += rs.Spec.Replicas
- }
- }
- return totalReplicaCount
- }
- // GetActualReplicaCountForReplicaSets returns the sum of actual replicas of the given replica sets.
- func GetActualReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
- totalReplicaCount := int32(0)
- for _, rs := range replicaSets {
- if rs != nil {
- totalReplicaCount += rs.Status.Replicas
- }
- }
- return totalReplicaCount
- }
- // GetAvailablePodsForReplicaSets returns the number of available pods (listed from clientset) corresponding to the given replica sets.
- func GetAvailablePodsForReplicaSets(c clientset.Interface, deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
- podList, err := listPods(deployment, c)
- if err != nil {
- return 0, err
- }
- return CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
- }
- // CountAvailablePodsForReplicaSets returns the number of available pods corresponding to the given pod list and replica sets.
- // Note that the input pod list should be the pods targeted by the deployment of input replica sets.
- func CountAvailablePodsForReplicaSets(podList *api.PodList, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
- rsPods, err := filterPodsMatchingReplicaSets(rss, podList, minReadySeconds)
- if err != nil {
- return 0, err
- }
- return countAvailablePods(rsPods, minReadySeconds), nil
- }
- // GetAvailablePodsForDeployment returns the number of available pods (listed from clientset) corresponding to the given deployment.
- func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions.Deployment) (int32, error) {
- podList, err := listPods(deployment, c)
- if err != nil {
- return 0, err
- }
- return countAvailablePods(podList.Items, deployment.Spec.MinReadySeconds), nil
- }
- func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
- availablePodCount := int32(0)
- for _, pod := range pods {
- // TODO: Make the time.Now() as argument to allow unit test this.
- // FIXME: avoid using time.Now
- if IsPodAvailable(&pod, minReadySeconds, time.Now()) {
- glog.V(4).Infof("Pod %s/%s is available.", pod.Namespace, pod.Name)
- availablePodCount++
- }
- }
- return availablePodCount
- }
- // IsPodAvailable return true if the pod is available.
- func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool {
- if !controller.IsPodActive(pod) {
- return false
- }
- // Check if we've passed minReadySeconds since LastTransitionTime
- // If so, this pod is ready
- for _, c := range pod.Status.Conditions {
- // we only care about pod ready conditions
- if c.Type == api.PodReady && c.Status == api.ConditionTrue {
- glog.V(4).Infof("Comparing pod %s/%s ready condition last transition time %s + minReadySeconds %d with now %s.", pod.Namespace, pod.Name, c.LastTransitionTime.String(), minReadySeconds, now.String())
- // 2 cases that this ready condition is valid (passed minReadySeconds, i.e. the pod is available):
- // 1. minReadySeconds == 0, or
- // 2. LastTransitionTime (is set) + minReadySeconds (>0) < current time
- minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
- if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now) {
- return true
- }
- }
- }
- return false
- }
- // filterPodsMatchingReplicaSets filters the given pod list and only return the ones targeted by the input replicasets
- func filterPodsMatchingReplicaSets(replicaSets []*extensions.ReplicaSet, podList *api.PodList, minReadySeconds int32) ([]api.Pod, error) {
- allRSPods := []api.Pod{}
- for _, rs := range replicaSets {
- matchingFunc, err := rsutil.MatchingPodsFunc(rs)
- if err != nil {
- return nil, err
- }
- if matchingFunc == nil {
- continue
- }
- rsPods := podutil.Filter(podList, matchingFunc)
- avaPodsCount := countAvailablePods(rsPods, minReadySeconds)
- if avaPodsCount > rs.Spec.Replicas {
- msg := fmt.Sprintf("Found %s/%s with %d available pods, more than its spec replicas %d", rs.Namespace, rs.Name, avaPodsCount, rs.Spec.Replicas)
- glog.Errorf("ERROR: %s", msg)
- return nil, fmt.Errorf(msg)
- }
- allRSPods = append(allRSPods, podutil.Filter(podList, matchingFunc)...)
- }
- return allRSPods, nil
- }
- // Revision returns the revision number of the input replica set
- func Revision(rs *extensions.ReplicaSet) (int64, error) {
- v, ok := rs.Annotations[RevisionAnnotation]
- if !ok {
- return 0, nil
- }
- return strconv.ParseInt(v, 10, 64)
- }
- // IsRollingUpdate returns true if the strategy type is a rolling update.
- func IsRollingUpdate(deployment *extensions.Deployment) bool {
- return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType
- }
- // NewRSNewReplicas calculates the number of replicas a deployment's new RS should have.
- // When one of the followings is true, we're rolling out the deployment; otherwise, we're scaling it.
- // 1) The new RS is saturated: newRS's replicas == deployment's replicas
- // 2) Max number of pods allowed is reached: deployment's replicas + maxSurge == all RSs' replicas
- func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) (int32, error) {
- switch deployment.Spec.Strategy.Type {
- case extensions.RollingUpdateDeploymentStrategyType:
- // Check if we can scale up.
- maxSurge, err := intstrutil.GetValueFromIntOrPercent(&deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(deployment.Spec.Replicas), true)
- if err != nil {
- return 0, err
- }
- // Find the total number of pods
- currentPodCount := GetReplicaCountForReplicaSets(allRSs)
- maxTotalPods := deployment.Spec.Replicas + int32(maxSurge)
- if currentPodCount >= maxTotalPods {
- // Cannot scale up.
- return newRS.Spec.Replicas, nil
- }
- // Scale up.
- scaleUpCount := maxTotalPods - currentPodCount
- // Do not exceed the number of desired replicas.
- scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(deployment.Spec.Replicas-newRS.Spec.Replicas)))
- return newRS.Spec.Replicas + scaleUpCount, nil
- case extensions.RecreateDeploymentStrategyType:
- return deployment.Spec.Replicas, nil
- default:
- return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
- }
- }
- // IsSaturated checks if the new replica set is saturated by comparing its size with its deployment size.
- // Both the deployment and the replica set have to believe this replica set can own all of the desired
- // replicas in the deployment and the annotation helps in achieving that.
- func IsSaturated(deployment *extensions.Deployment, rs *extensions.ReplicaSet) bool {
- if rs == nil {
- return false
- }
- desiredString := rs.Annotations[DesiredReplicasAnnotation]
- desired, err := strconv.Atoi(desiredString)
- if err != nil {
- return false
- }
- return rs.Spec.Replicas == deployment.Spec.Replicas && int32(desired) == deployment.Spec.Replicas
- }
- // WaitForObservedDeployment polls for deployment to be updated so that deployment.Status.ObservedGeneration >= desiredGeneration.
- // Returns error if polling timesout.
- func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
- // TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface.
- return wait.Poll(interval, timeout, func() (bool, error) {
- deployment, err := getDeploymentFunc()
- if err != nil {
- return false, err
- }
- return deployment.Status.ObservedGeneration >= desiredGeneration, nil
- })
- }
- // ResolveFenceposts resolves both maxSurge and maxUnavailable. This needs to happen in one
- // step. For example:
- //
- // 2 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1), then old(-1), then new(+1)
- // 1 desired, max unavailable 1%, surge 0% - should scale old(-1), then new(+1)
- // 2 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
- // 1 desired, max unavailable 25%, surge 1% - should scale new(+1), then old(-1)
- // 2 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1), then new(+1), then old(-1)
- // 1 desired, max unavailable 0%, surge 1% - should scale new(+1), then old(-1)
- func ResolveFenceposts(maxSurge, maxUnavailable *intstrutil.IntOrString, desired int32) (int32, int32, error) {
- surge, err := intstrutil.GetValueFromIntOrPercent(maxSurge, int(desired), true)
- if err != nil {
- return 0, 0, err
- }
- unavailable, err := intstrutil.GetValueFromIntOrPercent(maxUnavailable, int(desired), false)
- if err != nil {
- return 0, 0, err
- }
- if surge == 0 && unavailable == 0 {
- // Validation should never allow the user to explicitly use zero values for both maxSurge
- // maxUnavailable. Due to rounding down maxUnavailable though, it may resolve to zero.
- // If both fenceposts resolve to zero, then we should set maxUnavailable to 1 on the
- // theory that surge might not work due to quota.
- unavailable = 1
- }
- return int32(surge), int32(unavailable), nil
- }
- func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployment, error) {
- objCopy, err := api.Scheme.DeepCopy(deployment)
- if err != nil {
- return nil, err
- }
- copied, ok := objCopy.(*extensions.Deployment)
- if !ok {
- return nil, fmt.Errorf("expected Deployment, got %#v", objCopy)
- }
- return copied, nil
- }
- // SelectorUpdatedBefore returns true if the former deployment's selector
- // is updated before the latter, false otherwise
- func SelectorUpdatedBefore(d1, d2 *extensions.Deployment) bool {
- t1, t2 := LastSelectorUpdate(d1), LastSelectorUpdate(d2)
- return t1.Before(t2)
- }
- // LastSelectorUpdate returns the last time given deployment's selector is updated
- func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time {
- t := d.Annotations[SelectorUpdateAnnotation]
- if len(t) > 0 {
- parsedTime, err := time.Parse(t, time.RFC3339)
- // If failed to parse the time, use creation timestamp instead
- if err != nil {
- return d.CreationTimestamp
- }
- return unversioned.Time{Time: parsedTime}
- }
- // If it's never updated, use creation timestamp instead
- return d.CreationTimestamp
- }
- // BySelectorLastUpdateTime sorts a list of deployments by the last update time of their selector,
- // first using their creation timestamp and then their names as a tie breaker.
- type BySelectorLastUpdateTime []extensions.Deployment
- func (o BySelectorLastUpdateTime) Len() int { return len(o) }
- func (o BySelectorLastUpdateTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
- func (o BySelectorLastUpdateTime) Less(i, j int) bool {
- ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j])
- if ti.Equal(tj) {
- if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
- return o[i].Name < o[j].Name
- }
- return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
- }
- return ti.Before(tj)
- }
|