batch_v1_jobs.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// This file is very similar to ./job.go. That one uses extensions/v1beta1, this one
// uses batch/v1. That one uses ManualSelectors, this one does not. Keep them in sync.
// Delete that one when Job is removed from extensions/v1beta1.

package e2e

import (
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/errors"
    "k8s.io/kubernetes/pkg/apis/batch"
    client "k8s.io/kubernetes/pkg/client/unversioned"
    "k8s.io/kubernetes/pkg/kubectl"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/test/e2e/framework"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

const (
    // How long to wait for a job to finish.
    v1JobTimeout = 15 * time.Minute

    // Job selector name
    v1JobSelectorKey = "job-name"
)

var _ = framework.KubeDescribe("V1Job", func() {
    f := framework.NewDefaultFramework("v1job")
    parallelism := int32(2)
    completions := int32(4)
    lotsOfFailures := int32(5) // more than completions

    // Simplest case: all pods succeed promptly
    It("should run a job to completion when tasks succeed", func() {
        By("Creating a job")
        job := newTestV1Job("succeed", "all-succeed", api.RestartPolicyNever, parallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring job reaches completions")
        err = waitForV1JobFinish(f.Client, f.Namespace.Name, job.Name, completions)
        Expect(err).NotTo(HaveOccurred())
    })

    // Pods sometimes fail, but eventually succeed.
    It("should run a job to completion when tasks sometimes fail and are locally restarted", func() {
        By("Creating a job")
        // One failure, then a success, local restarts.
        // We can't use the random failure approach used by the
        // non-local test below, because kubelet will throttle
        // frequently failing containers in a given pod, ramping
        // up to 5 minutes between restarts, making test timeouts
        // due to successive failures too likely with a reasonable
        // test timeout.
        job := newTestV1Job("failOnce", "fail-once-local", api.RestartPolicyOnFailure, parallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring job reaches completions")
        err = waitForV1JobFinish(f.Client, f.Namespace.Name, job.Name, completions)
        Expect(err).NotTo(HaveOccurred())
    })

    // Pods sometimes fail, but eventually succeed, after pod restarts
    It("should run a job to completion when tasks sometimes fail and are not locally restarted", func() {
        By("Creating a job")
        // 50% chance of container success, no local restarts.
        // Can't use the failOnce approach because that relies
        // on an emptyDir, which is not preserved across new pods.
        // Worst case analysis: 15 failures, each taking 1 minute to
        // run due to some slowness, 1 in 2^15 chance of happening,
        // causing test flake. Should be very rare.
        job := newTestV1Job("randomlySucceedOrFail", "rand-non-local", api.RestartPolicyNever, parallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring job reaches completions")
        err = waitForV1JobFinish(f.Client, f.Namespace.Name, job.Name, completions)
        Expect(err).NotTo(HaveOccurred())
    })

    It("should keep restarting failed pods", func() {
        By("Creating a job")
        job := newTestV1Job("fail", "all-fail", api.RestartPolicyNever, parallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring job shows many failures")
        err = wait.Poll(framework.Poll, v1JobTimeout, func() (bool, error) {
            curr, err := f.Client.Batch().Jobs(f.Namespace.Name).Get(job.Name)
            if err != nil {
                return false, err
            }
            return curr.Status.Failed > lotsOfFailures, nil
        })
        Expect(err).NotTo(HaveOccurred())
    })

    It("should scale a job up", func() {
        startParallelism := int32(1)
        endParallelism := int32(2)
        By("Creating a job")
        job := newTestV1Job("notTerminate", "scale-up", api.RestartPolicyNever, startParallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring active pods == startParallelism")
        err = waitForAllPodsRunningV1(f.Client, f.Namespace.Name, job.Name, startParallelism)
        Expect(err).NotTo(HaveOccurred())

        By("scale job up")
        scaler, err := kubectl.ScalerFor(batch.Kind("Job"), f.Client)
        Expect(err).NotTo(HaveOccurred())
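        // Retry/timeout parameters: poll every 5s, allow up to 1m for the scale
        // update itself and up to 5m for the job to reach the new parallelism
        // (kubectl.NewRetryParams takes an interval and a timeout).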
        waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
        waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
        err = scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring active pods == endParallelism")
        err = waitForAllPodsRunningV1(f.Client, f.Namespace.Name, job.Name, endParallelism)
        Expect(err).NotTo(HaveOccurred())
    })

    It("should scale a job down", func() {
        startParallelism := int32(2)
        endParallelism := int32(1)
        By("Creating a job")
        job := newTestV1Job("notTerminate", "scale-down", api.RestartPolicyNever, startParallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring active pods == startParallelism")
        err = waitForAllPodsRunningV1(f.Client, f.Namespace.Name, job.Name, startParallelism)
        Expect(err).NotTo(HaveOccurred())

        By("scale job down")
        scaler, err := kubectl.ScalerFor(batch.Kind("Job"), f.Client)
        Expect(err).NotTo(HaveOccurred())
        waitForScale := kubectl.NewRetryParams(5*time.Second, 1*time.Minute)
        waitForReplicas := kubectl.NewRetryParams(5*time.Second, 5*time.Minute)
        err = scaler.Scale(f.Namespace.Name, job.Name, uint(endParallelism), nil, waitForScale, waitForReplicas)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring active pods == endParallelism")
        err = waitForAllPodsRunningV1(f.Client, f.Namespace.Name, job.Name, endParallelism)
        Expect(err).NotTo(HaveOccurred())
    })

    It("should delete a job", func() {
        By("Creating a job")
        job := newTestV1Job("notTerminate", "foo", api.RestartPolicyNever, parallelism, completions)
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring active pods == parallelism")
        err = waitForAllPodsRunningV1(f.Client, f.Namespace.Name, job.Name, parallelism)
        Expect(err).NotTo(HaveOccurred())

        By("delete a job")
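        // The job reaper deletes the job's pods before removing the job object itself.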
        reaper, err := kubectl.ReaperFor(batch.Kind("Job"), f.Client)
        Expect(err).NotTo(HaveOccurred())
        timeout := 1 * time.Minute
        err = reaper.Stop(f.Namespace.Name, job.Name, timeout, api.NewDeleteOptions(0))
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring job was deleted")
        _, err = f.Client.Batch().Jobs(f.Namespace.Name).Get(job.Name)
        Expect(err).To(HaveOccurred())
        Expect(errors.IsNotFound(err)).To(BeTrue())
    })

    It("should fail a job [Slow]", func() {
        By("Creating a job")
        job := newTestV1Job("notTerminate", "foo", api.RestartPolicyNever, parallelism, completions)
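        // A 10s active deadline lets the job controller mark the job failed
        // well within the test timeout.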
        activeDeadlineSeconds := int64(10)
        job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
        job, err := createV1Job(f.Client, f.Namespace.Name, job)
        Expect(err).NotTo(HaveOccurred())

        By("Ensuring job was failed")
        err = waitForV1JobFail(f.Client, f.Namespace.Name, job.Name)
        Expect(err).NotTo(HaveOccurred())
    })
})

// newTestV1Job returns a job which does one of several testing behaviors.
func newTestV1Job(behavior, name string, rPol api.RestartPolicy, parallelism, completions int32) *batch.Job {
    job := &batch.Job{
        ObjectMeta: api.ObjectMeta{
            Name: name,
        },
        Spec: batch.JobSpec{
            Parallelism: &parallelism,
            Completions: &completions,
            Template: api.PodTemplateSpec{
                ObjectMeta: api.ObjectMeta{
                    Labels: map[string]string{"somekey": "somevalue"},
                },
                Spec: api.PodSpec{
                    RestartPolicy: rPol,
                    Volumes: []api.Volume{
                        {
                            Name: "data",
                            VolumeSource: api.VolumeSource{
                                EmptyDir: &api.EmptyDirVolumeSource{},
                            },
                        },
                    },
                    Containers: []api.Container{
                        {
                            Name:    "c",
                            Image:   "gcr.io/google_containers/busybox:1.24",
                            Command: []string{},
                            VolumeMounts: []api.VolumeMount{
                                {
                                    MountPath: "/data",
                                    Name:      "data",
                                },
                            },
                        },
                    },
                },
            },
        },
    }
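    // Select the container command for the requested behavior; an unknown
    // behavior leaves the command empty.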
    switch behavior {
    case "notTerminate":
        job.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "1000000"}
    case "fail":
        job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 1"}
    case "succeed":
        job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit 0"}
    case "randomlySucceedOrFail":
        // Bash's $RANDOM generates a pseudorandom int in the range 0 - 32767.
        // Dividing by 16384 gives roughly 50/50 chance of success.
        job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "exit $(( $RANDOM / 16384 ))"}
    case "failOnce":
        // Fail the first time the container of the pod is run, and
        // succeed the second time. Checks for a file on the emptyDir:
        // if present, succeed; if not, create it and fail.
        // Note that this cannot be used with RestartNever because
        // it always fails the first time for a pod.
        job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "if [[ -r /data/foo ]] ; then exit 0 ; else touch /data/foo ; exit 1 ; fi"}
    }
    return job
}
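
// createV1Job creates job in the namespace ns via the client's batch API.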
func createV1Job(c *client.Client, ns string, job *batch.Job) (*batch.Job, error) {
    return c.Batch().Jobs(ns).Create(job)
}
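
// deleteV1Job deletes the named job immediately (zero grace period).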
func deleteV1Job(c *client.Client, ns, name string) error {
    return c.Batch().Jobs(ns).Delete(name, api.NewDeleteOptions(0))
}

// Wait for all pods to become Running. Only use when pods will run for a long time, or it will be racy.
func waitForAllPodsRunningV1(c *client.Client, ns, jobName string, parallelism int32) error {
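    // The job controller labels the pods it creates with job-name=<job name>.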
    label := labels.SelectorFromSet(labels.Set(map[string]string{v1JobSelectorKey: jobName}))
    return wait.Poll(framework.Poll, v1JobTimeout, func() (bool, error) {
        options := api.ListOptions{LabelSelector: label}
        pods, err := c.Pods(ns).List(options)
        if err != nil {
            return false, err
        }
        count := int32(0)
        for _, p := range pods.Items {
            if p.Status.Phase == api.PodRunning {
                count++
            }
        }
        return count == parallelism, nil
    })
}

// Wait for job to reach completions.
func waitForV1JobFinish(c *client.Client, ns, jobName string, completions int32) error {
    return wait.Poll(framework.Poll, v1JobTimeout, func() (bool, error) {
        curr, err := c.Batch().Jobs(ns).Get(jobName)
        if err != nil {
            return false, err
        }
        return curr.Status.Succeeded == completions, nil
    })
}

// Wait for the job to fail (a JobFailed condition with status True).
func waitForV1JobFail(c *client.Client, ns, jobName string) error {
    return wait.Poll(framework.Poll, v1JobTimeout, func() (bool, error) {
        curr, err := c.Batch().Jobs(ns).Get(jobName)
        if err != nil {
            return false, err
        }
        for _, c := range curr.Status.Conditions {
            if c.Type == batch.JobFailed && c.Status == api.ConditionTrue {
                return true, nil
            }
        }
        return false, nil
    })
}