scheduler_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package scheduler
  14. import (
  15. "errors"
  16. "reflect"
  17. "testing"
  18. "time"
  19. "k8s.io/kubernetes/pkg/api"
  20. "k8s.io/kubernetes/pkg/api/resource"
  21. "k8s.io/kubernetes/pkg/api/testapi"
  22. clientcache "k8s.io/kubernetes/pkg/client/cache"
  23. "k8s.io/kubernetes/pkg/client/record"
  24. "k8s.io/kubernetes/pkg/labels"
  25. "k8s.io/kubernetes/pkg/util/diff"
  26. "k8s.io/kubernetes/pkg/util/wait"
  27. "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
  28. "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
  29. "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
  30. schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
  31. )
  32. type fakeBinder struct {
  33. b func(binding *api.Binding) error
  34. }
  35. func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) }
  36. type fakePodConditionUpdater struct{}
  37. func (fc fakePodConditionUpdater) Update(pod *api.Pod, podCondition *api.PodCondition) error {
  38. return nil
  39. }
  40. func podWithID(id, desiredHost string) *api.Pod {
  41. return &api.Pod{
  42. ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.Default.SelfLink("pods", id)},
  43. Spec: api.PodSpec{
  44. NodeName: desiredHost,
  45. },
  46. }
  47. }
  48. func podWithPort(id, desiredHost string, port int) *api.Pod {
  49. pod := podWithID(id, desiredHost)
  50. pod.Spec.Containers = []api.Container{
  51. {Name: "ctr", Ports: []api.ContainerPort{{HostPort: int32(port)}}},
  52. }
  53. return pod
  54. }
  55. func podWithResources(id, desiredHost string, limits api.ResourceList, requests api.ResourceList) *api.Pod {
  56. pod := podWithID(id, desiredHost)
  57. pod.Spec.Containers = []api.Container{
  58. {Name: "ctr", Resources: api.ResourceRequirements{Limits: limits, Requests: requests}},
  59. }
  60. return pod
  61. }
  62. type mockScheduler struct {
  63. machine string
  64. err error
  65. }
  66. func (es mockScheduler) Schedule(pod *api.Pod, ml algorithm.NodeLister) (string, error) {
  67. return es.machine, es.err
  68. }
  69. func TestScheduler(t *testing.T) {
  70. eventBroadcaster := record.NewBroadcaster()
  71. eventBroadcaster.StartLogging(t.Logf).Stop()
  72. errS := errors.New("scheduler")
  73. errB := errors.New("binder")
  74. testNode := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
  75. table := []struct {
  76. injectBindError error
  77. sendPod *api.Pod
  78. algo algorithm.ScheduleAlgorithm
  79. expectErrorPod *api.Pod
  80. expectAssumedPod *api.Pod
  81. expectError error
  82. expectBind *api.Binding
  83. eventReason string
  84. }{
  85. {
  86. sendPod: podWithID("foo", ""),
  87. algo: mockScheduler{testNode.Name, nil},
  88. expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: testNode.Name}},
  89. expectAssumedPod: podWithID("foo", testNode.Name),
  90. eventReason: "Scheduled",
  91. }, {
  92. sendPod: podWithID("foo", ""),
  93. algo: mockScheduler{testNode.Name, errS},
  94. expectError: errS,
  95. expectErrorPod: podWithID("foo", ""),
  96. eventReason: "FailedScheduling",
  97. }, {
  98. sendPod: podWithID("foo", ""),
  99. algo: mockScheduler{testNode.Name, nil},
  100. expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: testNode.Name}},
  101. expectAssumedPod: podWithID("foo", testNode.Name),
  102. injectBindError: errB,
  103. expectError: errB,
  104. expectErrorPod: podWithID("foo", ""),
  105. eventReason: "FailedScheduling",
  106. },
  107. }
  108. for i, item := range table {
  109. var gotError error
  110. var gotPod *api.Pod
  111. var gotAssumedPod *api.Pod
  112. var gotBinding *api.Binding
  113. c := &Config{
  114. SchedulerCache: &schedulertesting.FakeCache{
  115. AssumeFunc: func(pod *api.Pod) {
  116. gotAssumedPod = pod
  117. },
  118. },
  119. NodeLister: algorithm.FakeNodeLister(
  120. []*api.Node{&testNode},
  121. ),
  122. Algorithm: item.algo,
  123. Binder: fakeBinder{func(b *api.Binding) error {
  124. gotBinding = b
  125. return item.injectBindError
  126. }},
  127. PodConditionUpdater: fakePodConditionUpdater{},
  128. Error: func(p *api.Pod, err error) {
  129. gotPod = p
  130. gotError = err
  131. },
  132. NextPod: func() *api.Pod {
  133. return item.sendPod
  134. },
  135. Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "scheduler"}),
  136. }
  137. s := New(c)
  138. called := make(chan struct{})
  139. events := eventBroadcaster.StartEventWatcher(func(e *api.Event) {
  140. if e, a := item.eventReason, e.Reason; e != a {
  141. t.Errorf("%v: expected %v, got %v", i, e, a)
  142. }
  143. close(called)
  144. })
  145. s.scheduleOne()
  146. <-called
  147. if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
  148. t.Errorf("%v: assumed pod: wanted %v, got %v", i, e, a)
  149. }
  150. if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
  151. t.Errorf("%v: error pod: wanted %v, got %v", i, e, a)
  152. }
  153. if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
  154. t.Errorf("%v: error: wanted %v, got %v", i, e, a)
  155. }
  156. if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) {
  157. t.Errorf("%v: error: %s", i, diff.ObjectDiff(e, a))
  158. }
  159. events.Stop()
  160. }
  161. }
  162. func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
  163. stop := make(chan struct{})
  164. defer close(stop)
  165. queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
  166. scache := schedulercache.New(100*time.Millisecond, stop)
  167. pod := podWithPort("pod.Name", "", 8080)
  168. node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
  169. nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
  170. predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
  171. scheduler, bindingChan, _ := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, pod, &node)
  172. waitPodExpireChan := make(chan struct{})
  173. timeout := make(chan struct{})
  174. go func() {
  175. for {
  176. select {
  177. case <-timeout:
  178. return
  179. default:
  180. }
  181. pods, err := scache.List(labels.Everything())
  182. if err != nil {
  183. t.Fatalf("cache.List failed: %v", err)
  184. }
  185. if len(pods) == 0 {
  186. close(waitPodExpireChan)
  187. return
  188. }
  189. time.Sleep(100 * time.Millisecond)
  190. }
  191. }()
  192. // waiting for the assumed pod to expire
  193. select {
  194. case <-waitPodExpireChan:
  195. case <-time.After(wait.ForeverTestTimeout):
  196. close(timeout)
  197. t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
  198. }
  199. // We use conflicted pod ports to incur fit predicate failure if first pod not removed.
  200. secondPod := podWithPort("bar", "", 8080)
  201. queuedPodStore.Add(secondPod)
  202. scheduler.scheduleOne()
  203. select {
  204. case b := <-bindingChan:
  205. expectBinding := &api.Binding{
  206. ObjectMeta: api.ObjectMeta{Name: "bar"},
  207. Target: api.ObjectReference{Kind: "Node", Name: node.Name},
  208. }
  209. if !reflect.DeepEqual(expectBinding, b) {
  210. t.Errorf("binding want=%v, get=%v", expectBinding, b)
  211. }
  212. case <-time.After(wait.ForeverTestTimeout):
  213. t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
  214. }
  215. }
  216. func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
  217. stop := make(chan struct{})
  218. defer close(stop)
  219. queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
  220. scache := schedulercache.New(10*time.Minute, stop)
  221. firstPod := podWithPort("pod.Name", "", 8080)
  222. node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine1"}}
  223. nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
  224. predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}
  225. scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(t, queuedPodStore, scache, nodeLister, predicateMap, firstPod, &node)
  226. // We use conflicted pod ports to incur fit predicate failure.
  227. secondPod := podWithPort("bar", "", 8080)
  228. queuedPodStore.Add(secondPod)
  229. // queuedPodStore: [bar:8080]
  230. // cache: [(assumed)foo:8080]
  231. scheduler.scheduleOne()
  232. select {
  233. case err := <-errChan:
  234. expectErr := &FitError{
  235. Pod: secondPod,
  236. FailedPredicates: FailedPredicateMap{node.Name: []algorithm.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}},
  237. }
  238. if !reflect.DeepEqual(expectErr, err) {
  239. t.Errorf("err want=%v, get=%v", expectErr, err)
  240. }
  241. case <-time.After(wait.ForeverTestTimeout):
  242. t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
  243. }
  244. // We mimic the workflow of cache behavior when a pod is removed by user.
  245. // Note: if the schedulercache timeout would be super short, the first pod would expire
  246. // and would be removed itself (without any explicit actions on schedulercache). Even in that case,
  247. // explicitly AddPod will as well correct the behavior.
  248. firstPod.Spec.NodeName = node.Name
  249. if err := scache.AddPod(firstPod); err != nil {
  250. t.Fatalf("err: %v", err)
  251. }
  252. if err := scache.RemovePod(firstPod); err != nil {
  253. t.Fatalf("err: %v", err)
  254. }
  255. queuedPodStore.Add(secondPod)
  256. scheduler.scheduleOne()
  257. select {
  258. case b := <-bindingChan:
  259. expectBinding := &api.Binding{
  260. ObjectMeta: api.ObjectMeta{Name: "bar"},
  261. Target: api.ObjectReference{Kind: "Node", Name: node.Name},
  262. }
  263. if !reflect.DeepEqual(expectBinding, b) {
  264. t.Errorf("binding want=%v, get=%v", expectBinding, b)
  265. }
  266. case <-time.After(wait.ForeverTestTimeout):
  267. t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
  268. }
  269. }
  270. // queuedPodStore: pods queued before processing.
  271. // cache: scheduler cache that might contain assumed pods.
  272. func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache,
  273. nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, pod *api.Pod, node *api.Node) (*Scheduler, chan *api.Binding, chan error) {
  274. scheduler, bindingChan, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap)
  275. queuedPodStore.Add(pod)
  276. // queuedPodStore: [foo:8080]
  277. // cache: []
  278. scheduler.scheduleOne()
  279. // queuedPodStore: []
  280. // cache: [(assumed)foo:8080]
  281. select {
  282. case b := <-bindingChan:
  283. expectBinding := &api.Binding{
  284. ObjectMeta: api.ObjectMeta{Name: pod.Name},
  285. Target: api.ObjectReference{Kind: "Node", Name: node.Name},
  286. }
  287. if !reflect.DeepEqual(expectBinding, b) {
  288. t.Errorf("binding want=%v, get=%v", expectBinding, b)
  289. }
  290. case <-time.After(wait.ForeverTestTimeout):
  291. t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
  292. }
  293. return scheduler, bindingChan, errChan
  294. }
  295. func TestSchedulerFailedSchedulingReasons(t *testing.T) {
  296. stop := make(chan struct{})
  297. defer close(stop)
  298. queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
  299. scache := schedulercache.New(10*time.Minute, stop)
  300. node := api.Node{
  301. ObjectMeta: api.ObjectMeta{Name: "machine1"},
  302. Status: api.NodeStatus{
  303. Capacity: api.ResourceList{
  304. api.ResourceCPU: *(resource.NewQuantity(2, resource.DecimalSI)),
  305. api.ResourceMemory: *(resource.NewQuantity(100, resource.DecimalSI)),
  306. api.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
  307. },
  308. Allocatable: api.ResourceList{
  309. api.ResourceCPU: *(resource.NewQuantity(2, resource.DecimalSI)),
  310. api.ResourceMemory: *(resource.NewQuantity(100, resource.DecimalSI)),
  311. api.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
  312. }},
  313. }
  314. scache.AddNode(&node)
  315. nodeLister := algorithm.FakeNodeLister([]*api.Node{&node})
  316. predicateMap := map[string]algorithm.FitPredicate{
  317. "PodFitsResources": predicates.PodFitsResources,
  318. }
  319. scheduler, _, errChan := setupTestScheduler(queuedPodStore, scache, nodeLister, predicateMap)
  320. podWithTooBigResourceRequests := podWithResources("bar", "", api.ResourceList{
  321. api.ResourceCPU: *(resource.NewQuantity(4, resource.DecimalSI)),
  322. api.ResourceMemory: *(resource.NewQuantity(500, resource.DecimalSI)),
  323. }, api.ResourceList{
  324. api.ResourceCPU: *(resource.NewQuantity(4, resource.DecimalSI)),
  325. api.ResourceMemory: *(resource.NewQuantity(500, resource.DecimalSI)),
  326. })
  327. queuedPodStore.Add(podWithTooBigResourceRequests)
  328. scheduler.scheduleOne()
  329. select {
  330. case err := <-errChan:
  331. expectErr := &FitError{
  332. Pod: podWithTooBigResourceRequests,
  333. FailedPredicates: FailedPredicateMap{node.Name: []algorithm.PredicateFailureReason{
  334. predicates.NewInsufficientResourceError(api.ResourceCPU, 4000, 0, 2000),
  335. predicates.NewInsufficientResourceError(api.ResourceMemory, 500, 0, 100),
  336. }},
  337. }
  338. if !reflect.DeepEqual(expectErr, err) {
  339. t.Errorf("err want=%+v, get=%+v", expectErr, err)
  340. }
  341. case <-time.After(wait.ForeverTestTimeout):
  342. t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
  343. }
  344. }
  345. // queuedPodStore: pods queued before processing.
  346. // scache: scheduler cache that might contain assumed pods.
  347. func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate) (*Scheduler, chan *api.Binding, chan error) {
  348. algo := NewGenericScheduler(
  349. scache,
  350. predicateMap,
  351. []algorithm.PriorityConfig{},
  352. []algorithm.SchedulerExtender{})
  353. bindingChan := make(chan *api.Binding, 1)
  354. errChan := make(chan error, 1)
  355. cfg := &Config{
  356. SchedulerCache: scache,
  357. NodeLister: nodeLister,
  358. Algorithm: algo,
  359. Binder: fakeBinder{func(b *api.Binding) error {
  360. bindingChan <- b
  361. return nil
  362. }},
  363. NextPod: func() *api.Pod {
  364. return clientcache.Pop(queuedPodStore).(*api.Pod)
  365. },
  366. Error: func(p *api.Pod, err error) {
  367. errChan <- err
  368. },
  369. Recorder: &record.FakeRecorder{},
  370. PodConditionUpdater: fakePodConditionUpdater{},
  371. }
  372. return New(cfg), bindingChan, errChan
  373. }