volume_manager.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package volumemanager
  14. import (
  15. "fmt"
  16. "strconv"
  17. "time"
  18. "github.com/golang/glog"
  19. "k8s.io/kubernetes/pkg/api"
  20. "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  21. "k8s.io/kubernetes/pkg/client/record"
  22. "k8s.io/kubernetes/pkg/kubelet/config"
  23. "k8s.io/kubernetes/pkg/kubelet/container"
  24. kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
  25. "k8s.io/kubernetes/pkg/kubelet/pod"
  26. "k8s.io/kubernetes/pkg/kubelet/util/format"
  27. "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
  28. "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
  29. "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
  30. "k8s.io/kubernetes/pkg/util/mount"
  31. "k8s.io/kubernetes/pkg/util/runtime"
  32. "k8s.io/kubernetes/pkg/util/sets"
  33. "k8s.io/kubernetes/pkg/util/wait"
  34. "k8s.io/kubernetes/pkg/volume"
  35. "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
  36. "k8s.io/kubernetes/pkg/volume/util/types"
  37. "k8s.io/kubernetes/pkg/volume/util/volumehelper"
  38. )
  39. const (
  40. // reconcilerLoopSleepPeriod is the amount of time the reconciler loop waits
  41. // between successive executions
  42. reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond
  43. // reconcilerReconstructSleepPeriod is the amount of time the reconciler reconstruct process
  44. // waits between successive executions
  45. reconcilerReconstructSleepPeriod time.Duration = 3 * time.Minute
  46. // desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
  47. // DesiredStateOfWorldPopulator loop waits between successive executions
  48. desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond
  49. // desiredStateOfWorldPopulatorGetPodStatusRetryDuration is the amount of
  50. // time the DesiredStateOfWorldPopulator loop waits between successive pod
  51. // cleanup calls (to prevent calling containerruntime.GetPodStatus too
  52. // frequently).
  53. desiredStateOfWorldPopulatorGetPodStatusRetryDuration time.Duration = 2 * time.Second
  54. // podAttachAndMountTimeout is the maximum amount of time the
  55. // WaitForAttachAndMount call will wait for all volumes in the specified pod
  56. // to be attached and mounted. Even though cloud operations can take several
  57. // minutes to complete, we set the timeout to 2 minutes because kubelet
  58. // will retry in the next sync iteration. This frees the associated
  59. // goroutine of the pod to process newer updates if needed (e.g., a delete
  60. // request to the pod).
  61. podAttachAndMountTimeout time.Duration = 2 * time.Minute
  62. // podAttachAndMountRetryInterval is the amount of time the GetVolumesForPod
  63. // call waits before retrying
  64. podAttachAndMountRetryInterval time.Duration = 300 * time.Millisecond
  65. // waitForAttachTimeout is the maximum amount of time a
  66. // operationexecutor.Mount call will wait for a volume to be attached.
  67. // Set to 10 minutes because we've seen attach operations take several
  68. // minutes to complete for some volume plugins in some cases. While this
  69. // operation is waiting it only blocks other operations on the same device,
  70. // other devices are not affected.
  71. waitForAttachTimeout time.Duration = 10 * time.Minute
  72. // reconcilerStartGracePeriod is the maximum amount of time volume manager
  73. // can wait to start reconciler
  74. reconcilerStartGracePeriod time.Duration = 60 * time.Second
  75. )
  76. // VolumeManager runs a set of asynchronous loops that figure out which volumes
  77. // need to be attached/mounted/unmounted/detached based on the pods scheduled on
  78. // this node and makes it so.
  79. type VolumeManager interface {
  80. // Starts the volume manager and all the asynchronous loops that it controls
  81. Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
  82. // WaitForAttachAndMount processes the volumes referenced in the specified
  83. // pod and blocks until they are all attached and mounted (reflected in
  84. // actual state of the world).
  85. // An error is returned if all volumes are not attached and mounted within
  86. // the duration defined in podAttachAndMountTimeout.
  87. WaitForAttachAndMount(pod *api.Pod) error
  88. // GetMountedVolumesForPod returns a VolumeMap containing the volumes
  89. // referenced by the specified pod that are successfully attached and
  90. // mounted. The key in the map is the OuterVolumeSpecName (i.e.
  91. // pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
  92. // volumes.
  93. GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
  94. // GetExtraSupplementalGroupsForPod returns a list of the extra
  95. // supplemental groups for the Pod. These extra supplemental groups come
  96. // from annotations on persistent volumes that the pod depends on.
  97. GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64
  98. // Returns a list of all volumes that implement the volume.Attacher
  99. // interface and are currently in use according to the actual and desired
  100. // state of the world caches. A volume is considered "in use" as soon as it
  101. // is added to the desired state of world, indicating it *should* be
  102. // attached to this node and remains "in use" until it is removed from both
  103. // the desired state of the world and the actual state of the world, or it
  104. // has been unmounted (as indicated in actual state of world).
  105. // TODO(#27653): VolumesInUse should be handled gracefully on kubelet'
  106. // restarts.
  107. GetVolumesInUse() []api.UniqueVolumeName
  108. // VolumeIsAttached returns true if the given volume is attached to this
  109. // node.
  110. VolumeIsAttached(volumeName api.UniqueVolumeName) bool
  111. // Marks the specified volume as having successfully been reported as "in
  112. // use" in the nodes's volume status.
  113. MarkVolumesAsReportedInUse(volumesReportedAsInUse []api.UniqueVolumeName)
  114. }
  115. // NewVolumeManager returns a new concrete instance implementing the
  116. // VolumeManager interface.
  117. //
  118. // kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
  119. // to communicate with the API server to fetch PV and PVC objects
  120. // volumePluginMgr - the volume plugin manager used to access volume plugins.
  121. // Must be pre-initialized.
  122. func NewVolumeManager(
  123. controllerAttachDetachEnabled bool,
  124. hostName string,
  125. podManager pod.Manager,
  126. kubeClient internalclientset.Interface,
  127. volumePluginMgr *volume.VolumePluginMgr,
  128. kubeContainerRuntime kubecontainer.Runtime,
  129. mounter mount.Interface,
  130. kubeletPodsDir string,
  131. recorder record.EventRecorder) (VolumeManager, error) {
  132. vm := &volumeManager{
  133. kubeClient: kubeClient,
  134. volumePluginMgr: volumePluginMgr,
  135. desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr),
  136. actualStateOfWorld: cache.NewActualStateOfWorld(hostName, volumePluginMgr),
  137. operationExecutor: operationexecutor.NewOperationExecutor(
  138. kubeClient,
  139. volumePluginMgr,
  140. recorder),
  141. }
  142. vm.reconciler = reconciler.NewReconciler(
  143. kubeClient,
  144. controllerAttachDetachEnabled,
  145. reconcilerLoopSleepPeriod,
  146. reconcilerReconstructSleepPeriod,
  147. waitForAttachTimeout,
  148. hostName,
  149. vm.desiredStateOfWorld,
  150. vm.actualStateOfWorld,
  151. vm.operationExecutor,
  152. mounter,
  153. volumePluginMgr,
  154. kubeletPodsDir)
  155. vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
  156. kubeClient,
  157. desiredStateOfWorldPopulatorLoopSleepPeriod,
  158. desiredStateOfWorldPopulatorGetPodStatusRetryDuration,
  159. podManager,
  160. vm.desiredStateOfWorld,
  161. kubeContainerRuntime)
  162. return vm, nil
  163. }
  164. // volumeManager implements the VolumeManager interface
  165. type volumeManager struct {
  166. // kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
  167. // communicate with the API server to fetch PV and PVC objects
  168. kubeClient internalclientset.Interface
  169. // volumePluginMgr is the volume plugin manager used to access volume
  170. // plugins. It must be pre-initialized.
  171. volumePluginMgr *volume.VolumePluginMgr
  172. // desiredStateOfWorld is a data structure containing the desired state of
  173. // the world according to the volume manager: i.e. what volumes should be
  174. // attached and which pods are referencing the volumes).
  175. // The data structure is populated by the desired state of the world
  176. // populator using the kubelet pod manager.
  177. desiredStateOfWorld cache.DesiredStateOfWorld
  178. // actualStateOfWorld is a data structure containing the actual state of
  179. // the world according to the manager: i.e. which volumes are attached to
  180. // this node and what pods the volumes are mounted to.
  181. // The data structure is populated upon successful completion of attach,
  182. // detach, mount, and unmount actions triggered by the reconciler.
  183. actualStateOfWorld cache.ActualStateOfWorld
  184. // operationExecutor is used to start asynchronous attach, detach, mount,
  185. // and unmount operations.
  186. operationExecutor operationexecutor.OperationExecutor
  187. // reconciler runs an asynchronous periodic loop to reconcile the
  188. // desiredStateOfWorld with the actualStateOfWorld by triggering attach,
  189. // detach, mount, and unmount operations using the operationExecutor.
  190. reconciler reconciler.Reconciler
  191. // desiredStateOfWorldPopulator runs an asynchronous periodic loop to
  192. // populate the desiredStateOfWorld using the kubelet PodManager.
  193. desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
  194. }
  195. func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
  196. defer runtime.HandleCrash()
  197. go vm.desiredStateOfWorldPopulator.Run(stopCh)
  198. glog.V(2).Infof("The desired_state_of_world populator starts")
  199. glog.Infof("Starting Kubelet Volume Manager")
  200. go vm.reconciler.Run(sourcesReady, stopCh)
  201. <-stopCh
  202. glog.Infof("Shutting down Kubelet Volume Manager")
  203. }
  204. func (vm *volumeManager) GetMountedVolumesForPod(
  205. podName types.UniquePodName) container.VolumeMap {
  206. podVolumes := make(container.VolumeMap)
  207. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  208. podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{Mounter: mountedVolume.Mounter}
  209. }
  210. return podVolumes
  211. }
  212. func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *api.Pod) []int64 {
  213. podName := volumehelper.GetUniquePodName(pod)
  214. supplementalGroups := sets.NewString()
  215. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  216. if mountedVolume.VolumeGidValue != "" {
  217. supplementalGroups.Insert(mountedVolume.VolumeGidValue)
  218. }
  219. }
  220. result := make([]int64, 0, supplementalGroups.Len())
  221. for _, group := range supplementalGroups.List() {
  222. iGroup, extra := getExtraSupplementalGid(group, pod)
  223. if !extra {
  224. continue
  225. }
  226. result = append(result, int64(iGroup))
  227. }
  228. return result
  229. }
  230. func (vm *volumeManager) GetVolumesInUse() []api.UniqueVolumeName {
  231. // Report volumes in desired state of world and actual state of world so
  232. // that volumes are marked in use as soon as the decision is made that the
  233. // volume *should* be attached to this node until it is safely unmounted.
  234. desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
  235. mountedVolumes := vm.actualStateOfWorld.GetGloballyMountedVolumes()
  236. volumesToReportInUse :=
  237. make(
  238. []api.UniqueVolumeName,
  239. 0, /* len */
  240. len(desiredVolumes)+len(mountedVolumes) /* cap */)
  241. desiredVolumesMap :=
  242. make(
  243. map[api.UniqueVolumeName]bool,
  244. len(desiredVolumes)+len(mountedVolumes) /* cap */)
  245. for _, volume := range desiredVolumes {
  246. if volume.PluginIsAttachable {
  247. desiredVolumesMap[volume.VolumeName] = true
  248. volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
  249. }
  250. }
  251. for _, volume := range mountedVolumes {
  252. if volume.PluginIsAttachable {
  253. if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
  254. volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
  255. }
  256. }
  257. }
  258. return volumesToReportInUse
  259. }
  260. func (vm *volumeManager) VolumeIsAttached(
  261. volumeName api.UniqueVolumeName) bool {
  262. return vm.actualStateOfWorld.VolumeExists(volumeName)
  263. }
  264. func (vm *volumeManager) MarkVolumesAsReportedInUse(
  265. volumesReportedAsInUse []api.UniqueVolumeName) {
  266. vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
  267. }
  268. func (vm *volumeManager) WaitForAttachAndMount(pod *api.Pod) error {
  269. expectedVolumes := getExpectedVolumes(pod)
  270. if len(expectedVolumes) == 0 {
  271. // No volumes to verify
  272. return nil
  273. }
  274. glog.V(3).Infof("Waiting for volumes to attach and mount for pod %q", format.Pod(pod))
  275. uniquePodName := volumehelper.GetUniquePodName(pod)
  276. // Some pods expect to have Setup called over and over again to update.
  277. // Remount plugins for which this is true. (Atomically updating volumes,
  278. // like Downward API, depend on this to update the contents of the volume).
  279. vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
  280. vm.actualStateOfWorld.MarkRemountRequired(uniquePodName)
  281. err := wait.Poll(
  282. podAttachAndMountRetryInterval,
  283. podAttachAndMountTimeout,
  284. vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
  285. if err != nil {
  286. // Timeout expired
  287. ummountedVolumes :=
  288. vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
  289. if len(ummountedVolumes) == 0 {
  290. return nil
  291. }
  292. return fmt.Errorf(
  293. "timeout expired waiting for volumes to attach/mount for pod %q/%q. list of unattached/unmounted volumes=%v",
  294. pod.Name,
  295. pod.Namespace,
  296. ummountedVolumes)
  297. }
  298. glog.V(3).Infof("All volumes are attached and mounted for pod %q", format.Pod(pod))
  299. return nil
  300. }
  301. // verifyVolumesMountedFunc returns a method that returns true when all expected
  302. // volumes are mounted.
  303. func (vm *volumeManager) verifyVolumesMountedFunc(
  304. podName types.UniquePodName, expectedVolumes []string) wait.ConditionFunc {
  305. return func() (done bool, err error) {
  306. return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
  307. }
  308. }
  309. // getUnmountedVolumes fetches the current list of mounted volumes from
  310. // the actual state of the world, and uses it to process the list of
  311. // expectedVolumes. It returns a list of unmounted volumes.
  312. func (vm *volumeManager) getUnmountedVolumes(
  313. podName types.UniquePodName, expectedVolumes []string) []string {
  314. mountedVolumes := sets.NewString()
  315. for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
  316. mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
  317. }
  318. return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
  319. }
  320. // filterUnmountedVolumes adds each element of expectedVolumes that is not in
  321. // mountedVolumes to a list of unmountedVolumes and returns it.
  322. func filterUnmountedVolumes(
  323. mountedVolumes sets.String, expectedVolumes []string) []string {
  324. unmountedVolumes := []string{}
  325. for _, expectedVolume := range expectedVolumes {
  326. if !mountedVolumes.Has(expectedVolume) {
  327. unmountedVolumes = append(unmountedVolumes, expectedVolume)
  328. }
  329. }
  330. return unmountedVolumes
  331. }
  332. // getExpectedVolumes returns a list of volumes that must be mounted in order to
  333. // consider the volume setup step for this pod satisfied.
  334. func getExpectedVolumes(pod *api.Pod) []string {
  335. expectedVolumes := []string{}
  336. if pod == nil {
  337. return expectedVolumes
  338. }
  339. for _, podVolume := range pod.Spec.Volumes {
  340. expectedVolumes = append(expectedVolumes, podVolume.Name)
  341. }
  342. return expectedVolumes
  343. }
  344. // getExtraSupplementalGid returns the value of an extra supplemental GID as
  345. // defined by an annotation on a volume and a boolean indicating whether the
  346. // volume defined a GID that the pod doesn't already request.
  347. func getExtraSupplementalGid(volumeGidValue string, pod *api.Pod) (int64, bool) {
  348. if volumeGidValue == "" {
  349. return 0, false
  350. }
  351. gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
  352. if err != nil {
  353. return 0, false
  354. }
  355. if pod.Spec.SecurityContext != nil {
  356. for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
  357. if gid == existingGid {
  358. return 0, false
  359. }
  360. }
  361. }
  362. return gid, true
  363. }