jobcontroller_test.go 22 KB


  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package job
  14. import (
  15. "fmt"
  16. "reflect"
  17. "testing"
  18. "time"
  19. "k8s.io/kubernetes/pkg/api"
  20. "k8s.io/kubernetes/pkg/api/testapi"
  21. "k8s.io/kubernetes/pkg/api/unversioned"
  22. "k8s.io/kubernetes/pkg/apis/batch"
  23. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  24. "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
  25. "k8s.io/kubernetes/pkg/client/restclient"
  26. "k8s.io/kubernetes/pkg/client/testing/core"
  27. "k8s.io/kubernetes/pkg/client/unversioned/testclient"
  28. "k8s.io/kubernetes/pkg/controller"
  29. "k8s.io/kubernetes/pkg/util/rand"
  30. "k8s.io/kubernetes/pkg/util/wait"
  31. "k8s.io/kubernetes/pkg/watch"
  32. )
  // alwaysReady is a stub that unconditionally reports true. The tests in this
  // file assign it to the controller's podStoreSynced field.
  var alwaysReady = func() bool { return true }
  34. func newJob(parallelism, completions int32) *batch.Job {
  35. j := &batch.Job{
  36. ObjectMeta: api.ObjectMeta{
  37. Name: "foobar",
  38. Namespace: api.NamespaceDefault,
  39. },
  40. Spec: batch.JobSpec{
  41. Selector: &unversioned.LabelSelector{
  42. MatchLabels: map[string]string{"foo": "bar"},
  43. },
  44. Template: api.PodTemplateSpec{
  45. ObjectMeta: api.ObjectMeta{
  46. Labels: map[string]string{
  47. "foo": "bar",
  48. },
  49. },
  50. Spec: api.PodSpec{
  51. Containers: []api.Container{
  52. {Image: "foo/bar"},
  53. },
  54. },
  55. },
  56. },
  57. }
  58. // Special case: -1 for either completions or parallelism means leave nil (negative is not allowed
  59. // in practice by validation.
  60. if completions >= 0 {
  61. j.Spec.Completions = &completions
  62. } else {
  63. j.Spec.Completions = nil
  64. }
  65. if parallelism >= 0 {
  66. j.Spec.Parallelism = &parallelism
  67. } else {
  68. j.Spec.Parallelism = nil
  69. }
  70. return j
  71. }
  72. func getKey(job *batch.Job, t *testing.T) string {
  73. if key, err := controller.KeyFunc(job); err != nil {
  74. t.Errorf("Unexpected error getting key for job %v: %v", job.Name, err)
  75. return ""
  76. } else {
  77. return key
  78. }
  79. }
  80. // create count pods with the given phase for the given job
  81. func newPodList(count int32, status api.PodPhase, job *batch.Job) []api.Pod {
  82. pods := []api.Pod{}
  83. for i := int32(0); i < count; i++ {
  84. newPod := api.Pod{
  85. ObjectMeta: api.ObjectMeta{
  86. Name: fmt.Sprintf("pod-%v", rand.String(10)),
  87. Labels: job.Spec.Selector.MatchLabels,
  88. Namespace: job.Namespace,
  89. },
  90. Status: api.PodStatus{Phase: status},
  91. }
  92. pods = append(pods, newPod)
  93. }
  94. return pods
  95. }
  96. func TestControllerSyncJob(t *testing.T) {
  97. testCases := map[string]struct {
  98. // job setup
  99. parallelism int32
  100. completions int32
  101. deleting bool
  102. // pod setup
  103. podControllerError error
  104. pendingPods int32
  105. activePods int32
  106. succeededPods int32
  107. failedPods int32
  108. // expectations
  109. expectedCreations int32
  110. expectedDeletions int32
  111. expectedActive int32
  112. expectedSucceeded int32
  113. expectedFailed int32
  114. expectedComplete bool
  115. }{
  116. "job start": {
  117. 2, 5, false,
  118. nil, 0, 0, 0, 0,
  119. 2, 0, 2, 0, 0, false,
  120. },
  121. "WQ job start": {
  122. 2, -1, false,
  123. nil, 0, 0, 0, 0,
  124. 2, 0, 2, 0, 0, false,
  125. },
  126. "pending pods": {
  127. 2, 5, false,
  128. nil, 2, 0, 0, 0,
  129. 0, 0, 2, 0, 0, false,
  130. },
  131. "correct # of pods": {
  132. 2, 5, false,
  133. nil, 0, 2, 0, 0,
  134. 0, 0, 2, 0, 0, false,
  135. },
  136. "WQ job: correct # of pods": {
  137. 2, -1, false,
  138. nil, 0, 2, 0, 0,
  139. 0, 0, 2, 0, 0, false,
  140. },
  141. "too few active pods": {
  142. 2, 5, false,
  143. nil, 0, 1, 1, 0,
  144. 1, 0, 2, 1, 0, false,
  145. },
  146. "too few active pods with a dynamic job": {
  147. 2, -1, false,
  148. nil, 0, 1, 0, 0,
  149. 1, 0, 2, 0, 0, false,
  150. },
  151. "too few active pods, with controller error": {
  152. 2, 5, false,
  153. fmt.Errorf("Fake error"), 0, 1, 1, 0,
  154. 1, 0, 1, 1, 0, false,
  155. },
  156. "too many active pods": {
  157. 2, 5, false,
  158. nil, 0, 3, 0, 0,
  159. 0, 1, 2, 0, 0, false,
  160. },
  161. "too many active pods, with controller error": {
  162. 2, 5, false,
  163. fmt.Errorf("Fake error"), 0, 3, 0, 0,
  164. 0, 1, 3, 0, 0, false,
  165. },
  166. "failed pod": {
  167. 2, 5, false,
  168. nil, 0, 1, 1, 1,
  169. 1, 0, 2, 1, 1, false,
  170. },
  171. "job finish": {
  172. 2, 5, false,
  173. nil, 0, 0, 5, 0,
  174. 0, 0, 0, 5, 0, true,
  175. },
  176. "WQ job finishing": {
  177. 2, -1, false,
  178. nil, 0, 1, 1, 0,
  179. 0, 0, 1, 1, 0, false,
  180. },
  181. "WQ job all finished": {
  182. 2, -1, false,
  183. nil, 0, 0, 2, 0,
  184. 0, 0, 0, 2, 0, true,
  185. },
  186. "WQ job all finished despite one failure": {
  187. 2, -1, false,
  188. nil, 0, 0, 1, 1,
  189. 0, 0, 0, 1, 1, true,
  190. },
  191. "more active pods than completions": {
  192. 2, 5, false,
  193. nil, 0, 10, 0, 0,
  194. 0, 8, 2, 0, 0, false,
  195. },
  196. "status change": {
  197. 2, 5, false,
  198. nil, 0, 2, 2, 0,
  199. 0, 0, 2, 2, 0, false,
  200. },
  201. "deleting job": {
  202. 2, 5, true,
  203. nil, 1, 1, 1, 0,
  204. 0, 0, 2, 1, 0, false,
  205. },
  206. }
  207. for name, tc := range testCases {
  208. // job manager setup
  209. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  210. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  211. fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
  212. manager.podControl = &fakePodControl
  213. manager.podStoreSynced = alwaysReady
  214. var actual *batch.Job
  215. manager.updateHandler = func(job *batch.Job) error {
  216. actual = job
  217. return nil
  218. }
  219. // job & pods setup
  220. job := newJob(tc.parallelism, tc.completions)
  221. if tc.deleting {
  222. now := unversioned.Now()
  223. job.DeletionTimestamp = &now
  224. }
  225. manager.jobStore.Store.Add(job)
  226. for _, pod := range newPodList(tc.pendingPods, api.PodPending, job) {
  227. manager.podStore.Indexer.Add(&pod)
  228. }
  229. for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
  230. manager.podStore.Indexer.Add(&pod)
  231. }
  232. for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
  233. manager.podStore.Indexer.Add(&pod)
  234. }
  235. for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
  236. manager.podStore.Indexer.Add(&pod)
  237. }
  238. // run
  239. err := manager.syncJob(getKey(job, t))
  240. if err != nil {
  241. t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
  242. }
  243. // validate created/deleted pods
  244. if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
  245. t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
  246. }
  247. if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
  248. t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
  249. }
  250. // validate status
  251. if actual.Status.Active != tc.expectedActive {
  252. t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
  253. }
  254. if actual.Status.Succeeded != tc.expectedSucceeded {
  255. t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
  256. }
  257. if actual.Status.Failed != tc.expectedFailed {
  258. t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
  259. }
  260. if actual.Status.StartTime == nil {
  261. t.Errorf("%s: .status.startTime was not set", name)
  262. }
  263. // validate conditions
  264. if tc.expectedComplete && !getCondition(actual, batch.JobComplete) {
  265. t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
  266. }
  267. }
  268. }
  269. func TestSyncJobPastDeadline(t *testing.T) {
  270. testCases := map[string]struct {
  271. // job setup
  272. parallelism int32
  273. completions int32
  274. activeDeadlineSeconds int64
  275. startTime int64
  276. // pod setup
  277. activePods int32
  278. succeededPods int32
  279. failedPods int32
  280. // expectations
  281. expectedDeletions int32
  282. expectedActive int32
  283. expectedSucceeded int32
  284. expectedFailed int32
  285. }{
  286. "activeDeadlineSeconds less than single pod execution": {
  287. 1, 1, 10, 15,
  288. 1, 0, 0,
  289. 1, 0, 0, 1,
  290. },
  291. "activeDeadlineSeconds bigger than single pod execution": {
  292. 1, 2, 10, 15,
  293. 1, 1, 0,
  294. 1, 0, 1, 1,
  295. },
  296. "activeDeadlineSeconds times-out before any pod starts": {
  297. 1, 1, 10, 10,
  298. 0, 0, 0,
  299. 0, 0, 0, 0,
  300. },
  301. }
  302. for name, tc := range testCases {
  303. // job manager setup
  304. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  305. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  306. fakePodControl := controller.FakePodControl{}
  307. manager.podControl = &fakePodControl
  308. manager.podStoreSynced = alwaysReady
  309. var actual *batch.Job
  310. manager.updateHandler = func(job *batch.Job) error {
  311. actual = job
  312. return nil
  313. }
  314. // job & pods setup
  315. job := newJob(tc.parallelism, tc.completions)
  316. job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
  317. start := unversioned.Unix(unversioned.Now().Time.Unix()-tc.startTime, 0)
  318. job.Status.StartTime = &start
  319. manager.jobStore.Store.Add(job)
  320. for _, pod := range newPodList(tc.activePods, api.PodRunning, job) {
  321. manager.podStore.Indexer.Add(&pod)
  322. }
  323. for _, pod := range newPodList(tc.succeededPods, api.PodSucceeded, job) {
  324. manager.podStore.Indexer.Add(&pod)
  325. }
  326. for _, pod := range newPodList(tc.failedPods, api.PodFailed, job) {
  327. manager.podStore.Indexer.Add(&pod)
  328. }
  329. // run
  330. err := manager.syncJob(getKey(job, t))
  331. if err != nil {
  332. t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
  333. }
  334. // validate created/deleted pods
  335. if int32(len(fakePodControl.Templates)) != 0 {
  336. t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
  337. }
  338. if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
  339. t.Errorf("%s: unexpected number of deletes. Expected %d, saw %d\n", name, tc.expectedDeletions, len(fakePodControl.DeletePodName))
  340. }
  341. // validate status
  342. if actual.Status.Active != tc.expectedActive {
  343. t.Errorf("%s: unexpected number of active pods. Expected %d, saw %d\n", name, tc.expectedActive, actual.Status.Active)
  344. }
  345. if actual.Status.Succeeded != tc.expectedSucceeded {
  346. t.Errorf("%s: unexpected number of succeeded pods. Expected %d, saw %d\n", name, tc.expectedSucceeded, actual.Status.Succeeded)
  347. }
  348. if actual.Status.Failed != tc.expectedFailed {
  349. t.Errorf("%s: unexpected number of failed pods. Expected %d, saw %d\n", name, tc.expectedFailed, actual.Status.Failed)
  350. }
  351. if actual.Status.StartTime == nil {
  352. t.Errorf("%s: .status.startTime was not set", name)
  353. }
  354. // validate conditions
  355. if !getCondition(actual, batch.JobFailed) {
  356. t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
  357. }
  358. }
  359. }
  360. func getCondition(job *batch.Job, condition batch.JobConditionType) bool {
  361. for _, v := range job.Status.Conditions {
  362. if v.Type == condition && v.Status == api.ConditionTrue {
  363. return true
  364. }
  365. }
  366. return false
  367. }
  368. func TestSyncPastDeadlineJobFinished(t *testing.T) {
  369. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  370. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  371. fakePodControl := controller.FakePodControl{}
  372. manager.podControl = &fakePodControl
  373. manager.podStoreSynced = alwaysReady
  374. var actual *batch.Job
  375. manager.updateHandler = func(job *batch.Job) error {
  376. actual = job
  377. return nil
  378. }
  379. job := newJob(1, 1)
  380. activeDeadlineSeconds := int64(10)
  381. job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
  382. start := unversioned.Unix(unversioned.Now().Time.Unix()-15, 0)
  383. job.Status.StartTime = &start
  384. job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
  385. manager.jobStore.Store.Add(job)
  386. err := manager.syncJob(getKey(job, t))
  387. if err != nil {
  388. t.Errorf("Unexpected error when syncing jobs %v", err)
  389. }
  390. if len(fakePodControl.Templates) != 0 {
  391. t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
  392. }
  393. if len(fakePodControl.DeletePodName) != 0 {
  394. t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
  395. }
  396. if actual != nil {
  397. t.Error("Unexpected job modification")
  398. }
  399. }
  400. func TestSyncJobComplete(t *testing.T) {
  401. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  402. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  403. fakePodControl := controller.FakePodControl{}
  404. manager.podControl = &fakePodControl
  405. manager.podStoreSynced = alwaysReady
  406. job := newJob(1, 1)
  407. job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
  408. manager.jobStore.Store.Add(job)
  409. err := manager.syncJob(getKey(job, t))
  410. if err != nil {
  411. t.Fatalf("Unexpected error when syncing jobs %v", err)
  412. }
  413. uncastJob, _, err := manager.jobStore.Store.Get(job)
  414. if err != nil {
  415. t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
  416. }
  417. actual := uncastJob.(*batch.Job)
  418. // Verify that after syncing a complete job, the conditions are the same.
  419. if got, expected := len(actual.Status.Conditions), 1; got != expected {
  420. t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
  421. }
  422. }
  423. func TestSyncJobDeleted(t *testing.T) {
  424. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  425. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  426. fakePodControl := controller.FakePodControl{}
  427. manager.podControl = &fakePodControl
  428. manager.podStoreSynced = alwaysReady
  429. manager.updateHandler = func(job *batch.Job) error { return nil }
  430. job := newJob(2, 2)
  431. err := manager.syncJob(getKey(job, t))
  432. if err != nil {
  433. t.Errorf("Unexpected error when syncing jobs %v", err)
  434. }
  435. if len(fakePodControl.Templates) != 0 {
  436. t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
  437. }
  438. if len(fakePodControl.DeletePodName) != 0 {
  439. t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
  440. }
  441. }
  442. func TestSyncJobUpdateRequeue(t *testing.T) {
  443. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  444. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  445. fakePodControl := controller.FakePodControl{}
  446. manager.podControl = &fakePodControl
  447. manager.podStoreSynced = alwaysReady
  448. manager.updateHandler = func(job *batch.Job) error { return fmt.Errorf("Fake error") }
  449. job := newJob(2, 2)
  450. manager.jobStore.Store.Add(job)
  451. err := manager.syncJob(getKey(job, t))
  452. if err != nil {
  453. t.Errorf("Unxpected error when syncing jobs, got %v", err)
  454. }
  455. t.Log("Waiting for a job in the queue")
  456. key, _ := manager.queue.Get()
  457. expectedKey := getKey(job, t)
  458. if key != expectedKey {
  459. t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
  460. }
  461. }
  462. func TestJobPodLookup(t *testing.T) {
  463. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  464. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  465. manager.podStoreSynced = alwaysReady
  466. testCases := []struct {
  467. job *batch.Job
  468. pod *api.Pod
  469. expectedName string
  470. }{
  471. // pods without labels don't match any job
  472. {
  473. job: &batch.Job{
  474. ObjectMeta: api.ObjectMeta{Name: "basic"},
  475. },
  476. pod: &api.Pod{
  477. ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll},
  478. },
  479. expectedName: "",
  480. },
  481. // matching labels, different namespace
  482. {
  483. job: &batch.Job{
  484. ObjectMeta: api.ObjectMeta{Name: "foo"},
  485. Spec: batch.JobSpec{
  486. Selector: &unversioned.LabelSelector{
  487. MatchLabels: map[string]string{"foo": "bar"},
  488. },
  489. },
  490. },
  491. pod: &api.Pod{
  492. ObjectMeta: api.ObjectMeta{
  493. Name: "foo2",
  494. Namespace: "ns",
  495. Labels: map[string]string{"foo": "bar"},
  496. },
  497. },
  498. expectedName: "",
  499. },
  500. // matching ns and labels returns
  501. {
  502. job: &batch.Job{
  503. ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
  504. Spec: batch.JobSpec{
  505. Selector: &unversioned.LabelSelector{
  506. MatchExpressions: []unversioned.LabelSelectorRequirement{
  507. {
  508. Key: "foo",
  509. Operator: unversioned.LabelSelectorOpIn,
  510. Values: []string{"bar"},
  511. },
  512. },
  513. },
  514. },
  515. },
  516. pod: &api.Pod{
  517. ObjectMeta: api.ObjectMeta{
  518. Name: "foo3",
  519. Namespace: "ns",
  520. Labels: map[string]string{"foo": "bar"},
  521. },
  522. },
  523. expectedName: "bar",
  524. },
  525. }
  526. for _, tc := range testCases {
  527. manager.jobStore.Add(tc.job)
  528. if job := manager.getPodJob(tc.pod); job != nil {
  529. if tc.expectedName != job.Name {
  530. t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
  531. }
  532. } else if tc.expectedName != "" {
  533. t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name)
  534. }
  535. }
  536. }
  537. type FakeJobExpectations struct {
  538. *controller.ControllerExpectations
  539. satisfied bool
  540. expSatisfied func()
  541. }
  542. func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
  543. fe.expSatisfied()
  544. return fe.satisfied
  545. }
  546. // TestSyncJobExpectations tests that a pod cannot sneak in between counting active pods
  547. // and checking expectations.
  548. func TestSyncJobExpectations(t *testing.T) {
  549. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  550. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  551. fakePodControl := controller.FakePodControl{}
  552. manager.podControl = &fakePodControl
  553. manager.podStoreSynced = alwaysReady
  554. manager.updateHandler = func(job *batch.Job) error { return nil }
  555. job := newJob(2, 2)
  556. manager.jobStore.Store.Add(job)
  557. pods := newPodList(2, api.PodPending, job)
  558. manager.podStore.Indexer.Add(&pods[0])
  559. manager.expectations = FakeJobExpectations{
  560. controller.NewControllerExpectations(), true, func() {
  561. // If we check active pods before checking expectataions, the job
  562. // will create a new replica because it doesn't see this pod, but
  563. // has fulfilled its expectations.
  564. manager.podStore.Indexer.Add(&pods[1])
  565. },
  566. }
  567. manager.syncJob(getKey(job, t))
  568. if len(fakePodControl.Templates) != 0 {
  569. t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
  570. }
  571. if len(fakePodControl.DeletePodName) != 0 {
  572. t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
  573. }
  574. }
  // FakeWatcher pairs a watch.FakeWatcher with an embedded testclient.Fake.
  // NOTE(review): nothing in the visible part of this file references this
  // type; confirm whether it is still needed.
  type FakeWatcher struct {
  	// w is the fake watcher held alongside the embedded fake client.
  	w *watch.FakeWatcher
  	*testclient.Fake
  }
  579. func TestWatchJobs(t *testing.T) {
  580. clientset := fake.NewSimpleClientset()
  581. fakeWatch := watch.NewFake()
  582. clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
  583. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  584. manager.podStoreSynced = alwaysReady
  585. var testJob batch.Job
  586. received := make(chan struct{})
  587. // The update sent through the fakeWatcher should make its way into the workqueue,
  588. // and eventually into the syncHandler.
  589. manager.syncHandler = func(key string) error {
  590. obj, exists, err := manager.jobStore.Store.GetByKey(key)
  591. if !exists || err != nil {
  592. t.Errorf("Expected to find job under key %v", key)
  593. }
  594. job, ok := obj.(*batch.Job)
  595. if !ok {
  596. t.Fatalf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
  597. }
  598. if !api.Semantic.DeepDerivative(*job, testJob) {
  599. t.Errorf("Expected %#v, but got %#v", testJob, *job)
  600. }
  601. close(received)
  602. return nil
  603. }
  604. // Start only the job watcher and the workqueue, send a watch event,
  605. // and make sure it hits the sync method.
  606. stopCh := make(chan struct{})
  607. defer close(stopCh)
  608. go manager.Run(1, stopCh)
  609. // We're sending new job to see if it reaches syncHandler.
  610. testJob.Name = "foo"
  611. fakeWatch.Add(&testJob)
  612. t.Log("Waiting for job to reach syncHandler")
  613. <-received
  614. }
  615. func TestWatchPods(t *testing.T) {
  616. testJob := newJob(2, 2)
  617. clientset := fake.NewSimpleClientset(testJob)
  618. fakeWatch := watch.NewFake()
  619. clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
  620. manager := NewJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
  621. manager.podStoreSynced = alwaysReady
  622. // Put one job and one pod into the store
  623. manager.jobStore.Store.Add(testJob)
  624. received := make(chan struct{})
  625. // The pod update sent through the fakeWatcher should figure out the managing job and
  626. // send it into the syncHandler.
  627. manager.syncHandler = func(key string) error {
  628. obj, exists, err := manager.jobStore.Store.GetByKey(key)
  629. if !exists || err != nil {
  630. t.Errorf("Expected to find job under key %v", key)
  631. close(received)
  632. return nil
  633. }
  634. job, ok := obj.(*batch.Job)
  635. if !ok {
  636. t.Errorf("unexpected type: %v %#v", reflect.TypeOf(obj), obj)
  637. close(received)
  638. return nil
  639. }
  640. if !api.Semantic.DeepDerivative(job, testJob) {
  641. t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
  642. close(received)
  643. return nil
  644. }
  645. close(received)
  646. return nil
  647. }
  648. // Start only the pod watcher and the workqueue, send a watch event,
  649. // and make sure it hits the sync method for the right job.
  650. stopCh := make(chan struct{})
  651. defer close(stopCh)
  652. go manager.internalPodInformer.Run(stopCh)
  653. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  654. pods := newPodList(1, api.PodRunning, testJob)
  655. testPod := pods[0]
  656. testPod.Status.Phase = api.PodFailed
  657. fakeWatch.Add(&testPod)
  658. t.Log("Waiting for pod to reach syncHandler")
  659. <-received
  660. }