controller_utils_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package controller
  14. import (
  15. "fmt"
  16. "math/rand"
  17. "net/http/httptest"
  18. "reflect"
  19. "sort"
  20. "sync"
  21. "testing"
  22. "time"
  23. "k8s.io/kubernetes/pkg/api"
  24. "k8s.io/kubernetes/pkg/api/testapi"
  25. "k8s.io/kubernetes/pkg/api/unversioned"
  26. "k8s.io/kubernetes/pkg/client/cache"
  27. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  28. "k8s.io/kubernetes/pkg/client/record"
  29. "k8s.io/kubernetes/pkg/client/restclient"
  30. "k8s.io/kubernetes/pkg/runtime"
  31. "k8s.io/kubernetes/pkg/securitycontext"
  32. "k8s.io/kubernetes/pkg/util/clock"
  33. "k8s.io/kubernetes/pkg/util/sets"
  34. utiltesting "k8s.io/kubernetes/pkg/util/testing"
  35. "k8s.io/kubernetes/pkg/util/uuid"
  36. )
  37. // NewFakeControllerExpectationsLookup creates a fake store for PodExpectations.
  38. func NewFakeControllerExpectationsLookup(ttl time.Duration) (*ControllerExpectations, *clock.FakeClock) {
  39. fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
  40. fakeClock := clock.NewFakeClock(fakeTime)
  41. ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
  42. ttlStore := cache.NewFakeExpirationStore(
  43. ExpKeyFunc, nil, ttlPolicy, fakeClock)
  44. return &ControllerExpectations{ttlStore}, fakeClock
  45. }
  46. func newReplicationController(replicas int) *api.ReplicationController {
  47. rc := &api.ReplicationController{
  48. TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
  49. ObjectMeta: api.ObjectMeta{
  50. UID: uuid.NewUUID(),
  51. Name: "foobar",
  52. Namespace: api.NamespaceDefault,
  53. ResourceVersion: "18",
  54. },
  55. Spec: api.ReplicationControllerSpec{
  56. Replicas: int32(replicas),
  57. Selector: map[string]string{"foo": "bar"},
  58. Template: &api.PodTemplateSpec{
  59. ObjectMeta: api.ObjectMeta{
  60. Labels: map[string]string{
  61. "name": "foo",
  62. "type": "production",
  63. },
  64. },
  65. Spec: api.PodSpec{
  66. Containers: []api.Container{
  67. {
  68. Image: "foo/bar",
  69. TerminationMessagePath: api.TerminationMessagePathDefault,
  70. ImagePullPolicy: api.PullIfNotPresent,
  71. SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
  72. },
  73. },
  74. RestartPolicy: api.RestartPolicyAlways,
  75. DNSPolicy: api.DNSDefault,
  76. NodeSelector: map[string]string{
  77. "baz": "blah",
  78. },
  79. },
  80. },
  81. },
  82. }
  83. return rc
  84. }
  85. // create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store.
  86. func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController) *api.PodList {
  87. pods := []api.Pod{}
  88. for i := 0; i < count; i++ {
  89. newPod := api.Pod{
  90. ObjectMeta: api.ObjectMeta{
  91. Name: fmt.Sprintf("pod%d", i),
  92. Labels: rc.Spec.Selector,
  93. Namespace: rc.Namespace,
  94. },
  95. Status: api.PodStatus{Phase: status},
  96. }
  97. if store != nil {
  98. store.Add(&newPod)
  99. }
  100. pods = append(pods, newPod)
  101. }
  102. return &api.PodList{
  103. Items: pods,
  104. }
  105. }
  106. func TestControllerExpectations(t *testing.T) {
  107. ttl := 30 * time.Second
  108. e, fakeClock := NewFakeControllerExpectationsLookup(ttl)
  109. // In practice we can't really have add and delete expectations since we only either create or
  110. // delete replicas in one rc pass, and the rc goes to sleep soon after until the expectations are
  111. // either fulfilled or timeout.
  112. adds, dels := 10, 30
  113. rc := newReplicationController(1)
  114. // RC fires off adds and deletes at apiserver, then sets expectations
  115. rcKey, err := KeyFunc(rc)
  116. if err != nil {
  117. t.Errorf("Couldn't get key for object %#v: %v", rc, err)
  118. }
  119. e.SetExpectations(rcKey, adds, dels)
  120. var wg sync.WaitGroup
  121. for i := 0; i < adds+1; i++ {
  122. wg.Add(1)
  123. go func() {
  124. // In prod this can happen either because of a failed create by the rc
  125. // or after having observed a create via informer
  126. e.CreationObserved(rcKey)
  127. wg.Done()
  128. }()
  129. }
  130. wg.Wait()
  131. // There are still delete expectations
  132. if e.SatisfiedExpectations(rcKey) {
  133. t.Errorf("Rc will sync before expectations are met")
  134. }
  135. for i := 0; i < dels+1; i++ {
  136. wg.Add(1)
  137. go func() {
  138. e.DeletionObserved(rcKey)
  139. wg.Done()
  140. }()
  141. }
  142. wg.Wait()
  143. // Expectations have been surpassed
  144. if podExp, exists, err := e.GetExpectations(rcKey); err == nil && exists {
  145. add, del := podExp.GetExpectations()
  146. if add != -1 || del != -1 {
  147. t.Errorf("Unexpected pod expectations %#v", podExp)
  148. }
  149. } else {
  150. t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err)
  151. }
  152. if !e.SatisfiedExpectations(rcKey) {
  153. t.Errorf("Expectations are met but the rc will not sync")
  154. }
  155. // Next round of rc sync, old expectations are cleared
  156. e.SetExpectations(rcKey, 1, 2)
  157. if podExp, exists, err := e.GetExpectations(rcKey); err == nil && exists {
  158. add, del := podExp.GetExpectations()
  159. if add != 1 || del != 2 {
  160. t.Errorf("Unexpected pod expectations %#v", podExp)
  161. }
  162. } else {
  163. t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err)
  164. }
  165. // Expectations have expired because of ttl
  166. fakeClock.Step(ttl + 1)
  167. if !e.SatisfiedExpectations(rcKey) {
  168. t.Errorf("Expectations should have expired but didn't")
  169. }
  170. }
  171. func TestUIDExpectations(t *testing.T) {
  172. uidExp := NewUIDTrackingControllerExpectations(NewControllerExpectations())
  173. rcList := []*api.ReplicationController{
  174. newReplicationController(2),
  175. newReplicationController(1),
  176. newReplicationController(0),
  177. newReplicationController(5),
  178. }
  179. rcToPods := map[string][]string{}
  180. rcKeys := []string{}
  181. for i := range rcList {
  182. rc := rcList[i]
  183. rcName := fmt.Sprintf("rc-%v", i)
  184. rc.Name = rcName
  185. rc.Spec.Selector[rcName] = rcName
  186. podList := newPodList(nil, 5, api.PodRunning, rc)
  187. rcKey, err := KeyFunc(rc)
  188. if err != nil {
  189. t.Fatalf("Couldn't get key for object %#v: %v", rc, err)
  190. }
  191. rcKeys = append(rcKeys, rcKey)
  192. rcPodNames := []string{}
  193. for i := range podList.Items {
  194. p := &podList.Items[i]
  195. p.Name = fmt.Sprintf("%v-%v", p.Name, rc.Name)
  196. rcPodNames = append(rcPodNames, PodKey(p))
  197. }
  198. rcToPods[rcKey] = rcPodNames
  199. uidExp.ExpectDeletions(rcKey, rcPodNames)
  200. }
  201. for i := range rcKeys {
  202. j := rand.Intn(i + 1)
  203. rcKeys[i], rcKeys[j] = rcKeys[j], rcKeys[i]
  204. }
  205. for _, rcKey := range rcKeys {
  206. if uidExp.SatisfiedExpectations(rcKey) {
  207. t.Errorf("Controller %v satisfied expectations before deletion", rcKey)
  208. }
  209. for _, p := range rcToPods[rcKey] {
  210. uidExp.DeletionObserved(rcKey, p)
  211. }
  212. if !uidExp.SatisfiedExpectations(rcKey) {
  213. t.Errorf("Controller %v didn't satisfy expectations after deletion", rcKey)
  214. }
  215. uidExp.DeleteExpectations(rcKey)
  216. if uidExp.GetUIDs(rcKey) != nil {
  217. t.Errorf("Failed to delete uid expectations for %v", rcKey)
  218. }
  219. }
  220. }
  221. func TestCreatePods(t *testing.T) {
  222. ns := api.NamespaceDefault
  223. body := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}})
  224. fakeHandler := utiltesting.FakeHandler{
  225. StatusCode: 200,
  226. ResponseBody: string(body),
  227. }
  228. testServer := httptest.NewServer(&fakeHandler)
  229. defer testServer.Close()
  230. clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  231. podControl := RealPodControl{
  232. KubeClient: clientset,
  233. Recorder: &record.FakeRecorder{},
  234. }
  235. controllerSpec := newReplicationController(1)
  236. // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template
  237. if err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec); err != nil {
  238. t.Fatalf("unexpected error: %v", err)
  239. }
  240. expectedPod := api.Pod{
  241. ObjectMeta: api.ObjectMeta{
  242. Labels: controllerSpec.Spec.Template.Labels,
  243. GenerateName: fmt.Sprintf("%s-", controllerSpec.Name),
  244. },
  245. Spec: controllerSpec.Spec.Template.Spec,
  246. }
  247. fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath("pods", api.NamespaceDefault, ""), "POST", nil)
  248. actualPod, err := runtime.Decode(testapi.Default.Codec(), []byte(fakeHandler.RequestBody))
  249. if err != nil {
  250. t.Fatalf("Unexpected error: %v", err)
  251. }
  252. if !api.Semantic.DeepDerivative(&expectedPod, actualPod) {
  253. t.Logf("Body: %s", fakeHandler.RequestBody)
  254. t.Errorf("Unexpected mismatch. Expected\n %#v,\n Got:\n %#v", &expectedPod, actualPod)
  255. }
  256. }
  257. func TestActivePodFiltering(t *testing.T) {
  258. // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
  259. rc := newReplicationController(0)
  260. podList := newPodList(nil, 5, api.PodRunning, rc)
  261. podList.Items[0].Status.Phase = api.PodSucceeded
  262. podList.Items[1].Status.Phase = api.PodFailed
  263. expectedNames := sets.NewString()
  264. for _, pod := range podList.Items[2:] {
  265. expectedNames.Insert(pod.Name)
  266. }
  267. var podPointers []*api.Pod
  268. for i := range podList.Items {
  269. podPointers = append(podPointers, &podList.Items[i])
  270. }
  271. got := FilterActivePods(podPointers)
  272. gotNames := sets.NewString()
  273. for _, pod := range got {
  274. gotNames.Insert(pod.Name)
  275. }
  276. if expectedNames.Difference(gotNames).Len() != 0 || gotNames.Difference(expectedNames).Len() != 0 {
  277. t.Errorf("expected %v, got %v", expectedNames.List(), gotNames.List())
  278. }
  279. }
  280. func TestSortingActivePods(t *testing.T) {
  281. numPods := 9
  282. // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
  283. rc := newReplicationController(0)
  284. podList := newPodList(nil, numPods, api.PodRunning, rc)
  285. pods := make([]*api.Pod, len(podList.Items))
  286. for i := range podList.Items {
  287. pods[i] = &podList.Items[i]
  288. }
  289. // pods[0] is not scheduled yet.
  290. pods[0].Spec.NodeName = ""
  291. pods[0].Status.Phase = api.PodPending
  292. // pods[1] is scheduled but pending.
  293. pods[1].Spec.NodeName = "bar"
  294. pods[1].Status.Phase = api.PodPending
  295. // pods[2] is unknown.
  296. pods[2].Spec.NodeName = "foo"
  297. pods[2].Status.Phase = api.PodUnknown
  298. // pods[3] is running but not ready.
  299. pods[3].Spec.NodeName = "foo"
  300. pods[3].Status.Phase = api.PodRunning
  301. // pods[4] is running and ready but without LastTransitionTime.
  302. now := unversioned.Now()
  303. pods[4].Spec.NodeName = "foo"
  304. pods[4].Status.Phase = api.PodRunning
  305. pods[4].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}
  306. pods[4].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 3}, {RestartCount: 0}}
  307. // pods[5] is running and ready and with LastTransitionTime.
  308. pods[5].Spec.NodeName = "foo"
  309. pods[5].Status.Phase = api.PodRunning
  310. pods[5].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue, LastTransitionTime: now}}
  311. pods[5].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 3}, {RestartCount: 0}}
  312. // pods[6] is running ready for a longer time than pods[5].
  313. then := unversioned.Time{Time: now.AddDate(0, -1, 0)}
  314. pods[6].Spec.NodeName = "foo"
  315. pods[6].Status.Phase = api.PodRunning
  316. pods[6].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue, LastTransitionTime: then}}
  317. pods[6].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 3}, {RestartCount: 0}}
  318. // pods[7] has lower container restart count than pods[6].
  319. pods[7].Spec.NodeName = "foo"
  320. pods[7].Status.Phase = api.PodRunning
  321. pods[7].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue, LastTransitionTime: then}}
  322. pods[7].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 2}, {RestartCount: 1}}
  323. pods[7].CreationTimestamp = now
  324. // pods[8] is older than pods[7].
  325. pods[8].Spec.NodeName = "foo"
  326. pods[8].Status.Phase = api.PodRunning
  327. pods[8].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue, LastTransitionTime: then}}
  328. pods[8].Status.ContainerStatuses = []api.ContainerStatus{{RestartCount: 2}, {RestartCount: 1}}
  329. pods[8].CreationTimestamp = then
  330. getOrder := func(pods []*api.Pod) []string {
  331. names := make([]string, len(pods))
  332. for i := range pods {
  333. names[i] = pods[i].Name
  334. }
  335. return names
  336. }
  337. expected := getOrder(pods)
  338. for i := 0; i < 20; i++ {
  339. idx := rand.Perm(numPods)
  340. randomizedPods := make([]*api.Pod, numPods)
  341. for j := 0; j < numPods; j++ {
  342. randomizedPods[j] = pods[idx[j]]
  343. }
  344. sort.Sort(ActivePods(randomizedPods))
  345. actual := getOrder(randomizedPods)
  346. if !reflect.DeepEqual(actual, expected) {
  347. t.Errorf("expected %v, got %v", expected, actual)
  348. }
  349. }
  350. }