123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package volumemanager
- import (
- "fmt"
- "strconv"
- "time"
- "github.com/golang/glog"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
- "k8s.io/kubernetes/pkg/client/record"
- "k8s.io/kubernetes/pkg/kubelet/config"
- "k8s.io/kubernetes/pkg/kubelet/container"
- kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
- "k8s.io/kubernetes/pkg/kubelet/pod"
- "k8s.io/kubernetes/pkg/kubelet/util/format"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
- "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
- "k8s.io/kubernetes/pkg/util/mount"
- "k8s.io/kubernetes/pkg/util/runtime"
- "k8s.io/kubernetes/pkg/util/sets"
- "k8s.io/kubernetes/pkg/util/wait"
- "k8s.io/kubernetes/pkg/volume"
- "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
- "k8s.io/kubernetes/pkg/volume/util/types"
- "k8s.io/kubernetes/pkg/volume/util/volumehelper"
- )
// Timing parameters for the volume manager's background loops and for the
// blocking WaitForAttachAndMount call.
const (
	// reconcilerLoopSleepPeriod is how long the reconciler loop pauses between
	// consecutive passes.
	reconcilerLoopSleepPeriod = 100 * time.Millisecond

	// reconcilerReconstructSleepPeriod is how long the reconciler's
	// reconstruct process pauses between consecutive passes.
	reconcilerReconstructSleepPeriod = 3 * time.Minute

	// desiredStateOfWorldPopulatorLoopSleepPeriod is how long the
	// DesiredStateOfWorldPopulator loop pauses between consecutive passes.
	desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the minimum
	// spacing between successive pod cleanup calls made by the
	// DesiredStateOfWorldPopulator loop. It keeps containerruntime.GetPodStatus
	// from being called too frequently.
	desiredStateOfWorldPopulatorGetPodStatusRetryDuration = 2 * time.Second

	// podAttachAndMountTimeout bounds how long WaitForAttachAndMount waits for
	// every volume of a pod to be attached and mounted. Cloud operations can
	// take several minutes, but kubelet retries on its next sync iteration, so
	// a 2 minute bound frees the pod's goroutine to handle newer updates (for
	// example, a delete request for the pod).
	podAttachAndMountTimeout = 2 * time.Minute

	// podAttachAndMountRetryInterval is the polling interval used while
	// waiting for a pod's volumes to be attached and mounted.
	podAttachAndMountRetryInterval = 300 * time.Millisecond

	// waitForAttachTimeout bounds how long an operationexecutor.Mount call
	// waits for a volume to become attached. It is generous (10 minutes)
	// because some volume plugins have been observed to take several minutes
	// to attach; while waiting, only other operations on the same device are
	// blocked.
	waitForAttachTimeout = 10 * time.Minute

	// reconcilerStartGracePeriod is the longest the volume manager may wait
	// before starting the reconciler.
	reconcilerStartGracePeriod = 60 * time.Second
)
- // VolumeManager runs a set of asynchronous loops that figure out which volumes
- // need to be attached/mounted/unmounted/detached based on the pods scheduled on
- // this node and makes it so.
- type VolumeManager interface {
- // Starts the volume manager and all the asynchronous loops that it controls
- Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
- // WaitForAttachAndMount processes the volumes referenced in the specified
- // pod and blocks until they are all attached and mounted (reflected in
- // actual state of the world).
- // An error is returned if all volumes are not attached and mounted within
- // the duration defined in podAttachAndMountTimeout.
- WaitForAttachAndMount(pod *api.Pod) error
- // GetMountedVolumesForPod returns a VolumeMap containing the volumes
- // referenced by the specified pod that are successfully attached and
- // mounted. The key in the map is the OuterVolumeSpecName (i.e.
- // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
- // volumes.
- GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
- // GetExtraSupplementalGroupsForPod returns a list of the extra
- // supplemental groups for the Pod. These extra supplemental groups come
- // from annotations on persistent volumes that the pod depends on.
- GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64
- // Returns a list of all volumes that implement the volume.Attacher
- // interface and are currently in use according to the actual and desired
- // state of the world caches. A volume is considered "in use" as soon as it
- // is added to the desired state of world, indicating it *should* be
- // attached to this node and remains "in use" until it is removed from both
- // the desired state of the world and the actual state of the world, or it
- // has been unmounted (as indicated in actual state of world).
- // TODO(#27653): VolumesInUse should be handled gracefully on kubelet'
- // restarts.
- GetVolumesInUse() []api.UniqueVolumeName
- // VolumeIsAttached returns true if the given volume is attached to this
- // node.
- VolumeIsAttached(volumeName api.UniqueVolumeName) bool
- // Marks the specified volume as having successfully been reported as "in
- // use" in the nodes's volume status.
- MarkVolumesAsReportedInUse(volumesReportedAsInUse []api.UniqueVolumeName)
- }
- // NewVolumeManager returns a new concrete instance implementing the
- // VolumeManager interface.
- //
- // kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
- // to communicate with the API server to fetch PV and PVC objects
- // volumePluginMgr - the volume plugin manager used to access volume plugins.
- // Must be pre-initialized.
- func NewVolumeManager(
- controllerAttachDetachEnabled bool,
- hostName string,
- podManager pod.Manager,
- kubeClient internalclientset.Interface,
- volumePluginMgr *volume.VolumePluginMgr,
- kubeContainerRuntime kubecontainer.Runtime,
- mounter mount.Interface,
- kubeletPodsDir string,
- recorder record.EventRecorder) (VolumeManager, error) {
- vm := &volumeManager{
- kubeClient: kubeClient,
- volumePluginMgr: volumePluginMgr,
- desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
- actualStateOfWorld: cache.NewActualStateOfWorld(hostName, volumePluginMgr),
- operationExecutor: operationexecutor.NewOperationExecutor(
- kubeClient,
- volumePluginMgr,
- recorder),
- }
- vm.reconciler = reconciler.NewReconciler(
- kubeClient,
- controllerAttachDetachEnabled,
- reconcilerLoopSleepPeriod,
- reconcilerReconstructSleepPeriod,
- waitForAttachTimeout,
- hostName,
- vm.desiredStateOfWorld,
- vm.actualStateOfWorld,
- vm.operationExecutor,
- mounter,
- volumePluginMgr,
- kubeletPodsDir)
- vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
- kubeClient,
- desiredStateOfWorldPopulatorLoopSleepPeriod,
- desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
- podManager,
- vm.desiredStateOfWorld,
- kubeContainerRuntime)
- return vm, nil
- }
- // volumeManager implements the VolumeManager interface
- type volumeManager struct {
- // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
- // communicate with the API server to fetch PV and PVC objects
- kubeClient internalclientset.Interface
- // volumePluginMgr is the volume plugin manager used to access volume
- // plugins. It must be pre-initialized.
- volumePluginMgr *volume.VolumePluginMgr
- // desiredStateOfWorld is a data structure containing the desired state of
- // the world according to the volume manager: i.e. what volumes should be
- // attached and which pods are referencing the volumes).
- // The data structure is populated by the desired state of the world
- // populator using the kubelet pod manager.
- desiredStateOfWorld cache.DesiredStateOfWorld
- // actualStateOfWorld is a data structure containing the actual state of
- // the world according to the manager: i.e. which volumes are attached to
- // this node and what pods the volumes are mounted to.
- // The data structure is populated upon successful completion of attach,
- // detach, mount, and unmount actions triggered by the reconciler.
- actualStateOfWorld cache.ActualStateOfWorld
- // operationExecutor is used to start asynchronous attach, detach, mount,
- // and unmount operations.
- operationExecutor operationexecutor.OperationExecutor
- // reconciler runs an asynchronous periodic loop to reconcile the
- // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
- // detach, mount, and unmount operations using the operationExecutor.
- reconciler reconciler.Reconciler
- // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
- // populate the desiredStateOfWorld using the kubelet PodManager.
- desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
- }
- func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
- defer runtime.HandleCrash()
- go vm.desiredStateOfWorldPopulator.Run(stopCh)
- glog.V(2).Infof("The desired_state_of_world populator starts")
- glog.Infof("Starting Kubelet Volume Manager")
- go vm.reconciler.Run(sourcesReady, stopCh)
- <-stopCh
- glog.Infof("Shutting down Kubelet Volume Manager")
- }
- func (vm *volumeManager) GetMountedVolumesForPod(
- podName types.UniquePodName) container.VolumeMap {
- podVolumes := make(container.VolumeMap)
- for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
- podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{Mounter: mountedVolume.Mounter}
- }
- return podVolumes
- }
- func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
- podName := volumehelper.GetUniquePodName(pod)
- supplementalGroups := sets.NewString()
- for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
- if mountedVolume.VolumeGidValue != "" {
- supplementalGroups.Insert(mountedVolume.VolumeGidValue)
- }
- }
- result := make([]int64, 0, supplementalGroups.Len())
- for _, group := range supplementalGroups.List() {
- iGroup, extra := getExtraSupplementalGid(group, pod)
- if !extra {
- continue
- }
- result = append(result, int64(iGroup))
- }
- return result
- }
- func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName {
- // Report volumes in desired state of world and actual state of world so
- // that volumes are marked in use as soon as the decision is made that the
- // volume *should* be attached to this node until it is safely unmounted.
- desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
- mountedVolumes := vm.actualStateOfWorld.GetGloballyMountedVolumes()
- volumesToReportInUse :=
- make(
- []api.UniqueVolumeName,
- 0, /* len */
- len(desiredVolumes)+len(mountedVolumes) /* cap */)
- desiredVolumesMap :=
- make(
- map[api.UniqueVolumeName]bool,
- len(desiredVolumes)+len(mountedVolumes) /* cap */)
- for _, volume := range desiredVolumes {
- if volume.PluginIsAttachable {
- desiredVolumesMap[volume.VolumeName] = true
- volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
- }
- }
- for _, volume := range mountedVolumes {
- if volume.PluginIsAttachable {
- if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
- volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
- }
- }
- }
- return volumesToReportInUse
- }
- func (vm *volumeManager) VolumeIsAttached(
- volumeName api.UniqueVolumeName) bool {
- return vm.actualStateOfWorld.VolumeExists(volumeName)
- }
- func (vm *volumeManager) MarkVolumesAsReportedInUse(
- volumesReportedAsInUse []api.UniqueVolumeName) {
- vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
- }
- func (vm *volumeManager) WaitForAttachAndMount(pod *api.Pod) error {
- expectedVolumes := getExpectedVolumes(pod)
- if len(expectedVolumes) == 0 {
- // No volumes to verify
- return nil
- }
- glog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
- uniquePodName := volumehelper.GetUniquePodName(pod)
- // Some pods expect to have Setup called over and over again to update.
- // Remount plugins for which this is true. (Atomically updating volumes,
- // like Downward API, depend on this to update the contents of the volume).
- vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
- vm.actualStateOfWorld.MarkRemountRequired(uniquePodName)
- err := wait.Poll(
- podAttachAndMountRetryInterval,
- podAttachAndMountTimeout,
- vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
- if err != nil {
- // Timeout expired
- ummountedVolumes :=
- vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
- if len(ummountedVolumes) == 0 {
- return nil
- }
- return fmt.Errorf(
- "timeout expired waiting for volumes to attach/mount for pod %q/%q. list of unattached/unmounted volumes=%v",
- pod.Name,
- pod.Namespace,
- ummountedVolumes)
- }
- glog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
- return nil
- }
- // verifyVolumesMountedFunc returns a method that returns true when all expected
- // volumes are mounted.
- func (vm *volumeManager) verifyVolumesMountedFunc(
- podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
- return func() (done bool, err error) {
- return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
- }
- }
- // getUnmountedVolumes fetches the current list of mounted volumes from
- // the actual state of the world, and uses it to process the list of
- // expectedVolumes. It returns a list of unmounted volumes.
- func (vm *volumeManager) getUnmountedVolumes(
- podName types.UniquePodName, expectedVolumes []string) []string {
- mountedVolumes := sets.NewString()
- for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
- mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
- }
- return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
- }
- // filterUnmountedVolumes adds each element of expectedVolumes that is not in
- // mountedVolumes to a list of unmountedVolumes and returns it.
- func filterUnmountedVolumes(
- mountedVolumes sets.String, expectedVolumes []string) []string {
- unmountedVolumes := []string{}
- for _, expectedVolume := range expectedVolumes {
- if !mountedVolumes.Has(expectedVolume) {
- unmountedVolumes = append(unmountedVolumes, expectedVolume)
- }
- }
- return unmountedVolumes
- }
- // getExpectedVolumes returns a list of volumes that must be mounted in order to
- // consider the volume setup step for this pod satisfied.
- func getExpectedVolumes(pod *api.Pod) []string {
- expectedVolumes := []string{}
- if pod == nil {
- return expectedVolumes
- }
- for _, podVolume := range pod.Spec.Volumes {
- expectedVolumes = append(expectedVolumes, podVolume.Name)
- }
- return expectedVolumes
- }
- // getExtraSupplementalGid returns the value of an extra supplemental GID as
- // defined by an annotation on a volume and a boolean indicating whether the
- // volume defined a GID that the pod doesn't already request.
- func getExtraSupplementalGid(volumeGidValue string, pod *api.Pod) (int64, bool) {
- if volumeGidValue == "" {
- return 0, false
- }
- gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
- if err != nil {
- return 0, false
- }
- if pod.Spec.SecurityContext != nil {
- for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
- if gid == existingGid {
- return 0, false
- }
- }
- }
- return gid, true
- }
|