pd.go 27 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package e2e
  14. import (
  15. "fmt"
  16. mathrand "math/rand"
  17. "strings"
  18. "time"
  19. "google.golang.org/api/googleapi"
  20. "github.com/aws/aws-sdk-go/aws"
  21. "github.com/aws/aws-sdk-go/aws/awserr"
  22. "github.com/aws/aws-sdk-go/aws/session"
  23. "github.com/aws/aws-sdk-go/service/ec2"
  24. . "github.com/onsi/ginkgo"
  25. . "github.com/onsi/gomega"
  26. "k8s.io/kubernetes/pkg/api"
  27. "k8s.io/kubernetes/pkg/api/resource"
  28. "k8s.io/kubernetes/pkg/api/unversioned"
  29. "k8s.io/kubernetes/pkg/apimachinery/registered"
  30. client "k8s.io/kubernetes/pkg/client/unversioned"
  31. awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
  32. gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
  33. "k8s.io/kubernetes/pkg/util/uuid"
  34. "k8s.io/kubernetes/test/e2e/framework"
  35. )
// Timeouts and polling intervals used by the persistent-disk helpers in this file.
const (
	// gcePDDetachTimeout bounds how long waitForPDDetach keeps polling for a detach.
	gcePDDetachTimeout = 10 * time.Minute
	// gcePDDetachPollTime is the sleep between attachment checks in waitForPDDetach.
	gcePDDetachPollTime = 10 * time.Second
	// nodeStatusTimeout is the timeout the specs pass to waitForPDInVolumesInUse.
	nodeStatusTimeout = 1 * time.Minute
	// nodeStatusPollTime is the sleep between node fetches in waitForPDInVolumesInUse.
	nodeStatusPollTime = 1 * time.Second
	// gcePDRetryTimeout bounds the retry loops in createPDWithRetry and deletePDWithRetry.
	gcePDRetryTimeout = 5 * time.Minute
	// gcePDRetryPollTime is the sleep between attempts in those retry loops.
	gcePDRetryPollTime = 5 * time.Second
)
  44. var _ = framework.KubeDescribe("Pod Disks", func() {
  45. var (
  46. podClient client.PodInterface
  47. nodeClient client.NodeInterface
  48. host0Name string
  49. host1Name string
  50. )
  51. f := framework.NewDefaultFramework("pod-disks")
  52. BeforeEach(func() {
  53. framework.SkipUnlessNodeCountIsAtLeast(2)
  54. podClient = f.Client.Pods(f.Namespace.Name)
  55. nodeClient = f.Client.Nodes()
  56. nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
  57. Expect(len(nodes.Items)).To(BeNumerically(">=", 2), "Requires at least 2 nodes")
  58. host0Name = nodes.Items[0].ObjectMeta.Name
  59. host1Name = nodes.Items[1].ObjectMeta.Name
  60. mathrand.Seed(time.Now().UTC().UnixNano())
  61. })
  62. It("should schedule a pod w/ a RW PD, ungracefully remove it, then schedule it on another host [Slow]", func() {
  63. framework.SkipUnlessProviderIs("gce", "gke", "aws")
  64. By("creating PD")
  65. diskName, err := createPDWithRetry()
  66. framework.ExpectNoError(err, "Error creating PD")
  67. host0Pod := testPDPod([]string{diskName}, host0Name, false /* readOnly */, 1 /* numContainers */)
  68. host1Pod := testPDPod([]string{diskName}, host1Name, false /* readOnly */, 1 /* numContainers */)
  69. containerName := "mycontainer"
  70. defer func() {
  71. // Teardown pods, PD. Ignore errors.
  72. // Teardown should do nothing unless test failed.
  73. By("cleaning up PD-RW test environment")
  74. podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0))
  75. podClient.Delete(host1Pod.Name, api.NewDeleteOptions(0))
  76. detachAndDeletePDs(diskName, []string{host0Name, host1Name})
  77. }()
  78. By("submitting host0Pod to kubernetes")
  79. _, err = podClient.Create(host0Pod)
  80. framework.ExpectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
  81. framework.ExpectNoError(f.WaitForPodRunningSlow(host0Pod.Name))
  82. testFile := "/testpd1/tracker"
  83. testFileContents := fmt.Sprintf("%v", mathrand.Int())
  84. framework.ExpectNoError(f.WriteFileViaContainer(host0Pod.Name, containerName, testFile, testFileContents))
  85. framework.Logf("Wrote value: %v", testFileContents)
  86. // Verify that disk shows up for in node 1's VolumeInUse list
  87. framework.ExpectNoError(waitForPDInVolumesInUse(nodeClient, diskName, host0Name, nodeStatusTimeout, true /* shouldExist */))
  88. By("deleting host0Pod")
  89. // Delete pod with 0 grace period
  90. framework.ExpectNoError(podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host0Pod")
  91. By("submitting host1Pod to kubernetes")
  92. _, err = podClient.Create(host1Pod)
  93. framework.ExpectNoError(err, "Failed to create host1Pod")
  94. framework.ExpectNoError(f.WaitForPodRunningSlow(host1Pod.Name))
  95. v, err := f.ReadFileViaContainer(host1Pod.Name, containerName, testFile)
  96. framework.ExpectNoError(err)
  97. framework.Logf("Read value: %v", v)
  98. Expect(strings.TrimSpace(v)).To(Equal(strings.TrimSpace(testFileContents)))
  99. // Verify that disk is removed from node 1's VolumeInUse list
  100. framework.ExpectNoError(waitForPDInVolumesInUse(nodeClient, diskName, host0Name, nodeStatusTimeout, false /* shouldExist */))
  101. By("deleting host1Pod")
  102. framework.ExpectNoError(podClient.Delete(host1Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host1Pod")
  103. By("Test completed successfully, waiting for PD to safely detach")
  104. waitForPDDetach(diskName, host0Name)
  105. waitForPDDetach(diskName, host1Name)
  106. return
  107. })
  108. It("Should schedule a pod w/ a RW PD, gracefully remove it, then schedule it on another host [Slow]", func() {
  109. framework.SkipUnlessProviderIs("gce", "gke", "aws")
  110. By("creating PD")
  111. diskName, err := createPDWithRetry()
  112. framework.ExpectNoError(err, "Error creating PD")
  113. host0Pod := testPDPod([]string{diskName}, host0Name, false /* readOnly */, 1 /* numContainers */)
  114. host1Pod := testPDPod([]string{diskName}, host1Name, false /* readOnly */, 1 /* numContainers */)
  115. containerName := "mycontainer"
  116. defer func() {
  117. // Teardown pods, PD. Ignore errors.
  118. // Teardown should do nothing unless test failed.
  119. By("cleaning up PD-RW test environment")
  120. podClient.Delete(host0Pod.Name, &api.DeleteOptions{})
  121. podClient.Delete(host1Pod.Name, &api.DeleteOptions{})
  122. detachAndDeletePDs(diskName, []string{host0Name, host1Name})
  123. }()
  124. By("submitting host0Pod to kubernetes")
  125. _, err = podClient.Create(host0Pod)
  126. framework.ExpectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
  127. framework.ExpectNoError(f.WaitForPodRunningSlow(host0Pod.Name))
  128. testFile := "/testpd1/tracker"
  129. testFileContents := fmt.Sprintf("%v", mathrand.Int())
  130. framework.ExpectNoError(f.WriteFileViaContainer(host0Pod.Name, containerName, testFile, testFileContents))
  131. framework.Logf("Wrote value: %v", testFileContents)
  132. // Verify that disk shows up for in node 1's VolumeInUse list
  133. framework.ExpectNoError(waitForPDInVolumesInUse(nodeClient, diskName, host0Name, nodeStatusTimeout, true /* shouldExist */))
  134. By("deleting host0Pod")
  135. // Delete pod with default grace period 30s
  136. framework.ExpectNoError(podClient.Delete(host0Pod.Name, &api.DeleteOptions{}), "Failed to delete host0Pod")
  137. By("submitting host1Pod to kubernetes")
  138. _, err = podClient.Create(host1Pod)
  139. framework.ExpectNoError(err, "Failed to create host1Pod")
  140. framework.ExpectNoError(f.WaitForPodRunningSlow(host1Pod.Name))
  141. v, err := f.ReadFileViaContainer(host1Pod.Name, containerName, testFile)
  142. framework.ExpectNoError(err)
  143. framework.Logf("Read value: %v", v)
  144. Expect(strings.TrimSpace(v)).To(Equal(strings.TrimSpace(testFileContents)))
  145. // Verify that disk is removed from node 1's VolumeInUse list
  146. framework.ExpectNoError(waitForPDInVolumesInUse(nodeClient, diskName, host0Name, nodeStatusTimeout, false /* shouldExist */))
  147. By("deleting host1Pod")
  148. framework.ExpectNoError(podClient.Delete(host1Pod.Name, &api.DeleteOptions{}), "Failed to delete host1Pod")
  149. By("Test completed successfully, waiting for PD to safely detach")
  150. waitForPDDetach(diskName, host0Name)
  151. waitForPDDetach(diskName, host1Name)
  152. return
  153. })
  154. It("should schedule a pod w/ a readonly PD on two hosts, then remove both ungracefully. [Slow]", func() {
  155. framework.SkipUnlessProviderIs("gce", "gke")
  156. By("creating PD")
  157. diskName, err := createPDWithRetry()
  158. framework.ExpectNoError(err, "Error creating PD")
  159. rwPod := testPDPod([]string{diskName}, host0Name, false /* readOnly */, 1 /* numContainers */)
  160. host0ROPod := testPDPod([]string{diskName}, host0Name, true /* readOnly */, 1 /* numContainers */)
  161. host1ROPod := testPDPod([]string{diskName}, host1Name, true /* readOnly */, 1 /* numContainers */)
  162. defer func() {
  163. By("cleaning up PD-RO test environment")
  164. // Teardown pods, PD. Ignore errors.
  165. // Teardown should do nothing unless test failed.
  166. podClient.Delete(rwPod.Name, api.NewDeleteOptions(0))
  167. podClient.Delete(host0ROPod.Name, api.NewDeleteOptions(0))
  168. podClient.Delete(host1ROPod.Name, api.NewDeleteOptions(0))
  169. detachAndDeletePDs(diskName, []string{host0Name, host1Name})
  170. }()
  171. By("submitting rwPod to ensure PD is formatted")
  172. _, err = podClient.Create(rwPod)
  173. framework.ExpectNoError(err, "Failed to create rwPod")
  174. framework.ExpectNoError(f.WaitForPodRunningSlow(rwPod.Name))
  175. // Delete pod with 0 grace period
  176. framework.ExpectNoError(podClient.Delete(rwPod.Name, api.NewDeleteOptions(0)), "Failed to delete host0Pod")
  177. framework.ExpectNoError(waitForPDDetach(diskName, host0Name))
  178. By("submitting host0ROPod to kubernetes")
  179. _, err = podClient.Create(host0ROPod)
  180. framework.ExpectNoError(err, "Failed to create host0ROPod")
  181. By("submitting host1ROPod to kubernetes")
  182. _, err = podClient.Create(host1ROPod)
  183. framework.ExpectNoError(err, "Failed to create host1ROPod")
  184. framework.ExpectNoError(f.WaitForPodRunningSlow(host0ROPod.Name))
  185. framework.ExpectNoError(f.WaitForPodRunningSlow(host1ROPod.Name))
  186. By("deleting host0ROPod")
  187. framework.ExpectNoError(podClient.Delete(host0ROPod.Name, api.NewDeleteOptions(0)), "Failed to delete host0ROPod")
  188. By("deleting host1ROPod")
  189. framework.ExpectNoError(podClient.Delete(host1ROPod.Name, api.NewDeleteOptions(0)), "Failed to delete host1ROPod")
  190. By("Test completed successfully, waiting for PD to safely detach")
  191. waitForPDDetach(diskName, host0Name)
  192. waitForPDDetach(diskName, host1Name)
  193. })
  194. It("Should schedule a pod w/ a readonly PD on two hosts, then remove both gracefully. [Slow]", func() {
  195. framework.SkipUnlessProviderIs("gce", "gke")
  196. By("creating PD")
  197. diskName, err := createPDWithRetry()
  198. framework.ExpectNoError(err, "Error creating PD")
  199. rwPod := testPDPod([]string{diskName}, host0Name, false /* readOnly */, 1 /* numContainers */)
  200. host0ROPod := testPDPod([]string{diskName}, host0Name, true /* readOnly */, 1 /* numContainers */)
  201. host1ROPod := testPDPod([]string{diskName}, host1Name, true /* readOnly */, 1 /* numContainers */)
  202. defer func() {
  203. By("cleaning up PD-RO test environment")
  204. // Teardown pods, PD. Ignore errors.
  205. // Teardown should do nothing unless test failed.
  206. podClient.Delete(rwPod.Name, &api.DeleteOptions{})
  207. podClient.Delete(host0ROPod.Name, &api.DeleteOptions{})
  208. podClient.Delete(host1ROPod.Name, &api.DeleteOptions{})
  209. detachAndDeletePDs(diskName, []string{host0Name, host1Name})
  210. }()
  211. By("submitting rwPod to ensure PD is formatted")
  212. _, err = podClient.Create(rwPod)
  213. framework.ExpectNoError(err, "Failed to create rwPod")
  214. framework.ExpectNoError(f.WaitForPodRunningSlow(rwPod.Name))
  215. // Delete pod with default grace period 30s
  216. framework.ExpectNoError(podClient.Delete(rwPod.Name, &api.DeleteOptions{}), "Failed to delete host0Pod")
  217. framework.ExpectNoError(waitForPDDetach(diskName, host0Name))
  218. By("submitting host0ROPod to kubernetes")
  219. _, err = podClient.Create(host0ROPod)
  220. framework.ExpectNoError(err, "Failed to create host0ROPod")
  221. By("submitting host1ROPod to kubernetes")
  222. _, err = podClient.Create(host1ROPod)
  223. framework.ExpectNoError(err, "Failed to create host1ROPod")
  224. framework.ExpectNoError(f.WaitForPodRunningSlow(host0ROPod.Name))
  225. framework.ExpectNoError(f.WaitForPodRunningSlow(host1ROPod.Name))
  226. By("deleting host0ROPod")
  227. framework.ExpectNoError(podClient.Delete(host0ROPod.Name, &api.DeleteOptions{}), "Failed to delete host0ROPod")
  228. By("deleting host1ROPod")
  229. framework.ExpectNoError(podClient.Delete(host1ROPod.Name, &api.DeleteOptions{}), "Failed to delete host1ROPod")
  230. By("Test completed successfully, waiting for PD to safely detach")
  231. waitForPDDetach(diskName, host0Name)
  232. waitForPDDetach(diskName, host1Name)
  233. })
  234. It("should schedule a pod w/ a RW PD shared between multiple containers, write to PD, delete pod, verify contents, and repeat in rapid succession [Slow]", func() {
  235. framework.SkipUnlessProviderIs("gce", "gke", "aws")
  236. By("creating PD")
  237. diskName, err := createPDWithRetry()
  238. framework.ExpectNoError(err, "Error creating PD")
  239. numContainers := 4
  240. var host0Pod *api.Pod
  241. defer func() {
  242. By("cleaning up PD-RW test environment")
  243. // Teardown pods, PD. Ignore errors.
  244. // Teardown should do nothing unless test failed.
  245. if host0Pod != nil {
  246. podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0))
  247. }
  248. detachAndDeletePDs(diskName, []string{host0Name})
  249. }()
  250. fileAndContentToVerify := make(map[string]string)
  251. for i := 0; i < 3; i++ {
  252. framework.Logf("PD Read/Writer Iteration #%v", i)
  253. By("submitting host0Pod to kubernetes")
  254. host0Pod = testPDPod([]string{diskName}, host0Name, false /* readOnly */, numContainers)
  255. _, err = podClient.Create(host0Pod)
  256. framework.ExpectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
  257. framework.ExpectNoError(f.WaitForPodRunningSlow(host0Pod.Name))
  258. // randomly select a container and read/verify pd contents from it
  259. containerName := fmt.Sprintf("mycontainer%v", mathrand.Intn(numContainers)+1)
  260. verifyPDContentsViaContainer(f, host0Pod.Name, containerName, fileAndContentToVerify)
  261. // Randomly select a container to write a file to PD from
  262. containerName = fmt.Sprintf("mycontainer%v", mathrand.Intn(numContainers)+1)
  263. testFile := fmt.Sprintf("/testpd1/tracker%v", i)
  264. testFileContents := fmt.Sprintf("%v", mathrand.Int())
  265. fileAndContentToVerify[testFile] = testFileContents
  266. framework.ExpectNoError(f.WriteFileViaContainer(host0Pod.Name, containerName, testFile, testFileContents))
  267. framework.Logf("Wrote value: \"%v\" to PD %q from pod %q container %q", testFileContents, diskName, host0Pod.Name, containerName)
  268. // Randomly select a container and read/verify pd contents from it
  269. containerName = fmt.Sprintf("mycontainer%v", mathrand.Intn(numContainers)+1)
  270. verifyPDContentsViaContainer(f, host0Pod.Name, containerName, fileAndContentToVerify)
  271. By("deleting host0Pod")
  272. framework.ExpectNoError(podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host0Pod")
  273. }
  274. By("Test completed successfully, waiting for PD to safely detach")
  275. waitForPDDetach(diskName, host0Name)
  276. })
  277. It("should schedule a pod w/two RW PDs both mounted to one container, write to PD, verify contents, delete pod, recreate pod, verify contents, and repeat in rapid succession [Slow]", func() {
  278. framework.SkipUnlessProviderIs("gce", "gke", "aws")
  279. By("creating PD1")
  280. disk1Name, err := createPDWithRetry()
  281. framework.ExpectNoError(err, "Error creating PD1")
  282. By("creating PD2")
  283. disk2Name, err := createPDWithRetry()
  284. framework.ExpectNoError(err, "Error creating PD2")
  285. var host0Pod *api.Pod
  286. defer func() {
  287. By("cleaning up PD-RW test environment")
  288. // Teardown pods, PD. Ignore errors.
  289. // Teardown should do nothing unless test failed.
  290. if host0Pod != nil {
  291. podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0))
  292. }
  293. detachAndDeletePDs(disk1Name, []string{host0Name})
  294. detachAndDeletePDs(disk2Name, []string{host0Name})
  295. }()
  296. containerName := "mycontainer"
  297. fileAndContentToVerify := make(map[string]string)
  298. for i := 0; i < 3; i++ {
  299. framework.Logf("PD Read/Writer Iteration #%v", i)
  300. By("submitting host0Pod to kubernetes")
  301. host0Pod = testPDPod([]string{disk1Name, disk2Name}, host0Name, false /* readOnly */, 1 /* numContainers */)
  302. _, err = podClient.Create(host0Pod)
  303. framework.ExpectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
  304. framework.ExpectNoError(f.WaitForPodRunningSlow(host0Pod.Name))
  305. // Read/verify pd contents for both disks from container
  306. verifyPDContentsViaContainer(f, host0Pod.Name, containerName, fileAndContentToVerify)
  307. // Write a file to both PDs from container
  308. testFilePD1 := fmt.Sprintf("/testpd1/tracker%v", i)
  309. testFilePD2 := fmt.Sprintf("/testpd2/tracker%v", i)
  310. testFilePD1Contents := fmt.Sprintf("%v", mathrand.Int())
  311. testFilePD2Contents := fmt.Sprintf("%v", mathrand.Int())
  312. fileAndContentToVerify[testFilePD1] = testFilePD1Contents
  313. fileAndContentToVerify[testFilePD2] = testFilePD2Contents
  314. framework.ExpectNoError(f.WriteFileViaContainer(host0Pod.Name, containerName, testFilePD1, testFilePD1Contents))
  315. framework.Logf("Wrote value: \"%v\" to PD1 (%q) from pod %q container %q", testFilePD1Contents, disk1Name, host0Pod.Name, containerName)
  316. framework.ExpectNoError(f.WriteFileViaContainer(host0Pod.Name, containerName, testFilePD2, testFilePD2Contents))
  317. framework.Logf("Wrote value: \"%v\" to PD2 (%q) from pod %q container %q", testFilePD2Contents, disk2Name, host0Pod.Name, containerName)
  318. // Read/verify pd contents for both disks from container
  319. verifyPDContentsViaContainer(f, host0Pod.Name, containerName, fileAndContentToVerify)
  320. By("deleting host0Pod")
  321. framework.ExpectNoError(podClient.Delete(host0Pod.Name, api.NewDeleteOptions(0)), "Failed to delete host0Pod")
  322. }
  323. By("Test completed successfully, waiting for PD to safely detach")
  324. waitForPDDetach(disk1Name, host0Name)
  325. waitForPDDetach(disk2Name, host0Name)
  326. })
  327. })
  328. func createPDWithRetry() (string, error) {
  329. newDiskName := ""
  330. var err error
  331. for start := time.Now(); time.Since(start) < gcePDRetryTimeout; time.Sleep(gcePDRetryPollTime) {
  332. if newDiskName, err = createPD(); err != nil {
  333. framework.Logf("Couldn't create a new PD. Sleeping 5 seconds (%v)", err)
  334. continue
  335. }
  336. framework.Logf("Successfully created a new PD: %q.", newDiskName)
  337. break
  338. }
  339. return newDiskName, err
  340. }
  341. func deletePDWithRetry(diskName string) {
  342. var err error
  343. for start := time.Now(); time.Since(start) < gcePDRetryTimeout; time.Sleep(gcePDRetryPollTime) {
  344. if err = deletePD(diskName); err != nil {
  345. framework.Logf("Couldn't delete PD %q. Sleeping 5 seconds (%v)", diskName, err)
  346. continue
  347. }
  348. framework.Logf("Successfully deleted PD %q.", diskName)
  349. break
  350. }
  351. framework.ExpectNoError(err, "Error deleting PD")
  352. }
  353. func verifyPDContentsViaContainer(f *framework.Framework, podName, containerName string, fileAndContentToVerify map[string]string) {
  354. for filePath, expectedContents := range fileAndContentToVerify {
  355. v, err := f.ReadFileViaContainer(podName, containerName, filePath)
  356. if err != nil {
  357. framework.Logf("Error reading file: %v", err)
  358. }
  359. framework.ExpectNoError(err)
  360. framework.Logf("Read file %q with content: %v", filePath, v)
  361. Expect(strings.TrimSpace(v)).To(Equal(strings.TrimSpace(expectedContents)))
  362. }
  363. }
  364. func createPD() (string, error) {
  365. if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
  366. pdName := fmt.Sprintf("%s-%s", framework.TestContext.Prefix, string(uuid.NewUUID()))
  367. gceCloud, err := getGCECloud()
  368. if err != nil {
  369. return "", err
  370. }
  371. tags := map[string]string{}
  372. err = gceCloud.CreateDisk(pdName, gcecloud.DiskTypeSSD, framework.TestContext.CloudConfig.Zone, 10 /* sizeGb */, tags)
  373. if err != nil {
  374. return "", err
  375. }
  376. return pdName, nil
  377. } else if framework.TestContext.Provider == "aws" {
  378. client := ec2.New(session.New())
  379. request := &ec2.CreateVolumeInput{}
  380. request.AvailabilityZone = aws.String(cloudConfig.Zone)
  381. request.Size = aws.Int64(10)
  382. request.VolumeType = aws.String(awscloud.DefaultVolumeType)
  383. response, err := client.CreateVolume(request)
  384. if err != nil {
  385. return "", err
  386. }
  387. az := aws.StringValue(response.AvailabilityZone)
  388. awsID := aws.StringValue(response.VolumeId)
  389. volumeName := "aws://" + az + "/" + awsID
  390. return volumeName, nil
  391. } else {
  392. return "", fmt.Errorf("Provider does not support volume creation")
  393. }
  394. }
  395. func deletePD(pdName string) error {
  396. if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
  397. gceCloud, err := getGCECloud()
  398. if err != nil {
  399. return err
  400. }
  401. err = gceCloud.DeleteDisk(pdName)
  402. if err != nil {
  403. if gerr, ok := err.(*googleapi.Error); ok && len(gerr.Errors) > 0 && gerr.Errors[0].Reason == "notFound" {
  404. // PD already exists, ignore error.
  405. return nil
  406. }
  407. framework.Logf("Error deleting PD %q: %v", pdName, err)
  408. }
  409. return err
  410. } else if framework.TestContext.Provider == "aws" {
  411. client := ec2.New(session.New())
  412. tokens := strings.Split(pdName, "/")
  413. awsVolumeID := tokens[len(tokens)-1]
  414. request := &ec2.DeleteVolumeInput{VolumeId: aws.String(awsVolumeID)}
  415. _, err := client.DeleteVolume(request)
  416. if err != nil {
  417. if awsError, ok := err.(awserr.Error); ok && awsError.Code() == "InvalidVolume.NotFound" {
  418. framework.Logf("Volume deletion implicitly succeeded because volume %q does not exist.", pdName)
  419. } else {
  420. return fmt.Errorf("error deleting EBS volumes: %v", err)
  421. }
  422. }
  423. return nil
  424. } else {
  425. return fmt.Errorf("Provider does not support volume deletion")
  426. }
  427. }
  428. func detachPD(hostName, pdName string) error {
  429. if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
  430. instanceName := strings.Split(hostName, ".")[0]
  431. gceCloud, err := getGCECloud()
  432. if err != nil {
  433. return err
  434. }
  435. err = gceCloud.DetachDisk(pdName, instanceName)
  436. if err != nil {
  437. if gerr, ok := err.(*googleapi.Error); ok && strings.Contains(gerr.Message, "Invalid value for field 'disk'") {
  438. // PD already detached, ignore error.
  439. return nil
  440. }
  441. framework.Logf("Error detaching PD %q: %v", pdName, err)
  442. }
  443. return err
  444. } else if framework.TestContext.Provider == "aws" {
  445. client := ec2.New(session.New())
  446. tokens := strings.Split(pdName, "/")
  447. awsVolumeID := tokens[len(tokens)-1]
  448. request := ec2.DetachVolumeInput{
  449. VolumeId: aws.String(awsVolumeID),
  450. }
  451. _, err := client.DetachVolume(&request)
  452. if err != nil {
  453. return fmt.Errorf("error detaching EBS volume: %v", err)
  454. }
  455. return nil
  456. } else {
  457. return fmt.Errorf("Provider does not support volume detaching")
  458. }
  459. }
  460. func testPDPod(diskNames []string, targetHost string, readOnly bool, numContainers int) *api.Pod {
  461. containers := make([]api.Container, numContainers)
  462. for i := range containers {
  463. containers[i].Name = "mycontainer"
  464. if numContainers > 1 {
  465. containers[i].Name = fmt.Sprintf("mycontainer%v", i+1)
  466. }
  467. containers[i].Image = "gcr.io/google_containers/busybox:1.24"
  468. containers[i].Command = []string{"sleep", "6000"}
  469. containers[i].VolumeMounts = make([]api.VolumeMount, len(diskNames))
  470. for k := range diskNames {
  471. containers[i].VolumeMounts[k].Name = fmt.Sprintf("testpd%v", k+1)
  472. containers[i].VolumeMounts[k].MountPath = fmt.Sprintf("/testpd%v", k+1)
  473. }
  474. containers[i].Resources.Limits = api.ResourceList{}
  475. containers[i].Resources.Limits[api.ResourceCPU] = *resource.NewQuantity(int64(0), resource.DecimalSI)
  476. }
  477. pod := &api.Pod{
  478. TypeMeta: unversioned.TypeMeta{
  479. Kind: "Pod",
  480. APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
  481. },
  482. ObjectMeta: api.ObjectMeta{
  483. Name: "pd-test-" + string(uuid.NewUUID()),
  484. },
  485. Spec: api.PodSpec{
  486. Containers: containers,
  487. NodeName: targetHost,
  488. },
  489. }
  490. if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
  491. pod.Spec.Volumes = make([]api.Volume, len(diskNames))
  492. for k, diskName := range diskNames {
  493. pod.Spec.Volumes[k].Name = fmt.Sprintf("testpd%v", k+1)
  494. pod.Spec.Volumes[k].VolumeSource = api.VolumeSource{
  495. GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
  496. PDName: diskName,
  497. FSType: "ext4",
  498. ReadOnly: readOnly,
  499. },
  500. }
  501. }
  502. } else if framework.TestContext.Provider == "aws" {
  503. pod.Spec.Volumes = make([]api.Volume, len(diskNames))
  504. for k, diskName := range diskNames {
  505. pod.Spec.Volumes[k].Name = fmt.Sprintf("testpd%v", k+1)
  506. pod.Spec.Volumes[k].VolumeSource = api.VolumeSource{
  507. AWSElasticBlockStore: &api.AWSElasticBlockStoreVolumeSource{
  508. VolumeID: diskName,
  509. FSType: "ext4",
  510. ReadOnly: readOnly,
  511. },
  512. }
  513. }
  514. } else {
  515. panic("Unknown provider: " + framework.TestContext.Provider)
  516. }
  517. return pod
  518. }
  519. // Waits for specified PD to to detach from specified hostName
  520. func waitForPDDetach(diskName, hostName string) error {
  521. if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
  522. framework.Logf("Waiting for GCE PD %q to detach from node %q.", diskName, hostName)
  523. gceCloud, err := getGCECloud()
  524. if err != nil {
  525. return err
  526. }
  527. for start := time.Now(); time.Since(start) < gcePDDetachTimeout; time.Sleep(gcePDDetachPollTime) {
  528. diskAttached, err := gceCloud.DiskIsAttached(diskName, hostName)
  529. if err != nil {
  530. framework.Logf("Error waiting for PD %q to detach from node %q. 'DiskIsAttached(...)' failed with %v", diskName, hostName, err)
  531. return err
  532. }
  533. if !diskAttached {
  534. // Specified disk does not appear to be attached to specified node
  535. framework.Logf("GCE PD %q appears to have successfully detached from %q.", diskName, hostName)
  536. return nil
  537. }
  538. framework.Logf("Waiting for GCE PD %q to detach from %q.", diskName, hostName)
  539. }
  540. return fmt.Errorf("Gave up waiting for GCE PD %q to detach from %q after %v", diskName, hostName, gcePDDetachTimeout)
  541. }
  542. return nil
  543. }
  544. func getGCECloud() (*gcecloud.GCECloud, error) {
  545. gceCloud, ok := framework.TestContext.CloudConfig.Provider.(*gcecloud.GCECloud)
  546. if !ok {
  547. return nil, fmt.Errorf("failed to convert CloudConfig.Provider to GCECloud: %#v", framework.TestContext.CloudConfig.Provider)
  548. }
  549. return gceCloud, nil
  550. }
  551. func detachAndDeletePDs(diskName string, hosts []string) {
  552. for _, host := range hosts {
  553. framework.Logf("Detaching GCE PD %q from node %q.", diskName, host)
  554. detachPD(host, diskName)
  555. By(fmt.Sprintf("Waiting for PD %q to detach from %q", diskName, host))
  556. waitForPDDetach(diskName, host)
  557. }
  558. By(fmt.Sprintf("Deleting PD %q", diskName))
  559. deletePDWithRetry(diskName)
  560. }
  561. func waitForPDInVolumesInUse(
  562. nodeClient client.NodeInterface,
  563. diskName, nodeName string,
  564. timeout time.Duration,
  565. shouldExist bool) error {
  566. logStr := "to contain"
  567. if !shouldExist {
  568. logStr = "to NOT contain"
  569. }
  570. framework.Logf(
  571. "Waiting for node %s's VolumesInUse Status %s PD %q",
  572. nodeName, logStr, diskName)
  573. for start := time.Now(); time.Since(start) < timeout; time.Sleep(nodeStatusPollTime) {
  574. nodeObj, err := nodeClient.Get(nodeName)
  575. if err != nil || nodeObj == nil {
  576. framework.Logf(
  577. "Failed to fetch node object %q from API server. err=%v",
  578. nodeName, err)
  579. continue
  580. }
  581. exists := false
  582. for _, volumeInUse := range nodeObj.Status.VolumesInUse {
  583. volumeInUseStr := string(volumeInUse)
  584. if strings.Contains(volumeInUseStr, diskName) {
  585. if shouldExist {
  586. framework.Logf(
  587. "Found PD %q in node %q's VolumesInUse Status: %q",
  588. diskName, nodeName, volumeInUseStr)
  589. return nil
  590. }
  591. exists = true
  592. }
  593. }
  594. if !shouldExist && !exists {
  595. framework.Logf(
  596. "Verified PD %q does not exist in node %q's VolumesInUse Status.",
  597. diskName, nodeName)
  598. return nil
  599. }
  600. }
  601. return fmt.Errorf(
  602. "Timed out waiting for node %s VolumesInUse Status %s diskName %q",
  603. nodeName, logStr, diskName)
  604. }