replication_controller_test.go 51 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // If you make changes to this file, you should also make the corresponding change in ReplicaSet.
  14. package replication
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "net/http/httptest"
  19. "strings"
  20. "testing"
  21. "time"
  22. "k8s.io/kubernetes/pkg/api"
  23. "k8s.io/kubernetes/pkg/api/testapi"
  24. "k8s.io/kubernetes/pkg/api/unversioned"
  25. "k8s.io/kubernetes/pkg/client/cache"
  26. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  27. "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
  28. fakeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
  29. "k8s.io/kubernetes/pkg/client/restclient"
  30. "k8s.io/kubernetes/pkg/client/testing/core"
  31. "k8s.io/kubernetes/pkg/controller"
  32. "k8s.io/kubernetes/pkg/runtime"
  33. "k8s.io/kubernetes/pkg/securitycontext"
  34. "k8s.io/kubernetes/pkg/util/sets"
  35. utiltesting "k8s.io/kubernetes/pkg/util/testing"
  36. "k8s.io/kubernetes/pkg/util/uuid"
  37. "k8s.io/kubernetes/pkg/util/wait"
  38. "k8s.io/kubernetes/pkg/watch"
  39. )
  40. var alwaysReady = func() bool { return true }
  41. func getKey(rc *api.ReplicationController, t *testing.T) string {
  42. if key, err := controller.KeyFunc(rc); err != nil {
  43. t.Errorf("Unexpected error getting key for rc %v: %v", rc.Name, err)
  44. return ""
  45. } else {
  46. return key
  47. }
  48. }
  49. func newReplicationController(replicas int) *api.ReplicationController {
  50. rc := &api.ReplicationController{
  51. TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
  52. ObjectMeta: api.ObjectMeta{
  53. UID: uuid.NewUUID(),
  54. Name: "foobar",
  55. Namespace: api.NamespaceDefault,
  56. ResourceVersion: "18",
  57. },
  58. Spec: api.ReplicationControllerSpec{
  59. Replicas: int32(replicas),
  60. Selector: map[string]string{"foo": "bar"},
  61. Template: &api.PodTemplateSpec{
  62. ObjectMeta: api.ObjectMeta{
  63. Labels: map[string]string{
  64. "name": "foo",
  65. "type": "production",
  66. },
  67. },
  68. Spec: api.PodSpec{
  69. Containers: []api.Container{
  70. {
  71. Image: "foo/bar",
  72. TerminationMessagePath: api.TerminationMessagePathDefault,
  73. ImagePullPolicy: api.PullIfNotPresent,
  74. SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
  75. },
  76. },
  77. RestartPolicy: api.RestartPolicyAlways,
  78. DNSPolicy: api.DNSDefault,
  79. NodeSelector: map[string]string{
  80. "baz": "blah",
  81. },
  82. },
  83. },
  84. },
  85. }
  86. return rc
  87. }
  88. // create a pod with the given phase for the given rc (same selectors and namespace).
  89. func newPod(name string, rc *api.ReplicationController, status api.PodPhase) *api.Pod {
  90. var conditions []api.PodCondition
  91. if status == api.PodRunning {
  92. conditions = append(conditions, api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue})
  93. }
  94. return &api.Pod{
  95. ObjectMeta: api.ObjectMeta{
  96. Name: name,
  97. Labels: rc.Spec.Selector,
  98. Namespace: rc.Namespace,
  99. },
  100. Status: api.PodStatus{Phase: status, Conditions: conditions},
  101. }
  102. }
  103. // create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store.
  104. func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController, name string) *api.PodList {
  105. pods := []api.Pod{}
  106. var trueVar = true
  107. controllerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
  108. for i := 0; i < count; i++ {
  109. pod := newPod(fmt.Sprintf("%s%d", name, i), rc, status)
  110. pod.OwnerReferences = []api.OwnerReference{controllerReference}
  111. if store != nil {
  112. store.Add(pod)
  113. }
  114. pods = append(pods, *pod)
  115. }
  116. return &api.PodList{
  117. Items: pods,
  118. }
  119. }
  120. func validateSyncReplication(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) {
  121. if e, a := expectedCreates, len(fakePodControl.Templates); e != a {
  122. t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a)
  123. }
  124. if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a {
  125. t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a)
  126. }
  127. if e, a := expectedPatches, len(fakePodControl.Patches); e != a {
  128. t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a)
  129. }
  130. }
  131. func replicationControllerResourceName() string {
  132. return "replicationcontrollers"
  133. }
  134. type serverResponse struct {
  135. statusCode int
  136. obj interface{}
  137. }
  138. func TestSyncReplicationControllerDoesNothing(t *testing.T) {
  139. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  140. fakePodControl := controller.FakePodControl{}
  141. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  142. manager.podStoreSynced = alwaysReady
  143. // 2 running pods, a controller with 2 replicas, sync is a no-op
  144. controllerSpec := newReplicationController(2)
  145. manager.rcStore.Indexer.Add(controllerSpec)
  146. newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod")
  147. manager.podControl = &fakePodControl
  148. manager.syncReplicationController(getKey(controllerSpec, t))
  149. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  150. }
  151. func TestSyncReplicationControllerDeletes(t *testing.T) {
  152. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  153. fakePodControl := controller.FakePodControl{}
  154. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  155. manager.podStoreSynced = alwaysReady
  156. manager.podControl = &fakePodControl
  157. // 2 running pods and a controller with 1 replica, one pod delete expected
  158. controllerSpec := newReplicationController(1)
  159. manager.rcStore.Indexer.Add(controllerSpec)
  160. newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod")
  161. manager.syncReplicationController(getKey(controllerSpec, t))
  162. validateSyncReplication(t, &fakePodControl, 0, 1, 0)
  163. }
  164. func TestDeleteFinalStateUnknown(t *testing.T) {
  165. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  166. fakePodControl := controller.FakePodControl{}
  167. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  168. manager.podStoreSynced = alwaysReady
  169. manager.podControl = &fakePodControl
  170. received := make(chan string)
  171. manager.syncHandler = func(key string) error {
  172. received <- key
  173. return nil
  174. }
  175. // The DeletedFinalStateUnknown object should cause the rc manager to insert
  176. // the controller matching the selectors of the deleted pod into the work queue.
  177. controllerSpec := newReplicationController(1)
  178. manager.rcStore.Indexer.Add(controllerSpec)
  179. pods := newPodList(nil, 1, api.PodRunning, controllerSpec, "pod")
  180. manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
  181. go manager.worker()
  182. expected := getKey(controllerSpec, t)
  183. select {
  184. case key := <-received:
  185. if key != expected {
  186. t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected)
  187. }
  188. case <-time.After(wait.ForeverTestTimeout):
  189. t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
  190. }
  191. }
  192. func TestSyncReplicationControllerCreates(t *testing.T) {
  193. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  194. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  195. manager.podStoreSynced = alwaysReady
  196. // A controller with 2 replicas and no pods in the store, 2 creates expected
  197. rc := newReplicationController(2)
  198. manager.rcStore.Indexer.Add(rc)
  199. fakePodControl := controller.FakePodControl{}
  200. manager.podControl = &fakePodControl
  201. manager.syncReplicationController(getKey(rc, t))
  202. validateSyncReplication(t, &fakePodControl, 2, 0, 0)
  203. }
  204. func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
  205. // Setup a fake server to listen for requests, and run the rc manager in steady state
  206. fakeHandler := utiltesting.FakeHandler{
  207. StatusCode: 200,
  208. ResponseBody: "",
  209. }
  210. testServer := httptest.NewServer(&fakeHandler)
  211. defer testServer.Close()
  212. c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  213. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  214. manager.podStoreSynced = alwaysReady
  215. // Steady state for the replication controller, no Status.Replicas updates expected
  216. activePods := 5
  217. rc := newReplicationController(activePods)
  218. manager.rcStore.Indexer.Add(rc)
  219. rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods)}
  220. newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod")
  221. fakePodControl := controller.FakePodControl{}
  222. manager.podControl = &fakePodControl
  223. manager.syncReplicationController(getKey(rc, t))
  224. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  225. if fakeHandler.RequestReceived != nil {
  226. t.Errorf("Unexpected update when pods and rcs are in a steady state")
  227. }
  228. // This response body is just so we don't err out decoding the http response, all
  229. // we care about is the request body sent below.
  230. response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
  231. fakeHandler.ResponseBody = response
  232. rc.Generation = rc.Generation + 1
  233. manager.syncReplicationController(getKey(rc, t))
  234. rc.Status.ObservedGeneration = rc.Generation
  235. updatedRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
  236. fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &updatedRc)
  237. }
  238. func TestControllerUpdateReplicas(t *testing.T) {
  239. // This is a happy server just to record the PUT request we expect for status.Replicas
  240. fakeHandler := utiltesting.FakeHandler{
  241. StatusCode: 200,
  242. ResponseBody: "",
  243. }
  244. testServer := httptest.NewServer(&fakeHandler)
  245. defer testServer.Close()
  246. c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  247. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  248. manager.podStoreSynced = alwaysReady
  249. // Insufficient number of pods in the system, and Status.Replicas is wrong;
  250. // Status.Replica should update to match number of pods in system, 1 new pod should be created.
  251. rc := newReplicationController(5)
  252. manager.rcStore.Indexer.Add(rc)
  253. rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, ObservedGeneration: 0}
  254. rc.Generation = 1
  255. newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
  256. rcCopy := *rc
  257. extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
  258. rcCopy.Spec.Selector = extraLabelMap
  259. newPodList(manager.podStore.Indexer, 2, api.PodRunning, &rcCopy, "podWithExtraLabel")
  260. // This response body is just so we don't err out decoding the http response
  261. response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
  262. fakeHandler.ResponseBody = response
  263. fakePodControl := controller.FakePodControl{}
  264. manager.podControl = &fakePodControl
  265. manager.syncReplicationController(getKey(rc, t))
  266. // 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod.
  267. // 2. Status.FullyLabeledReplicas should equal to the number of pods that
  268. // has the extra labels, i.e., 2.
  269. // 3. Every update to the status should include the Generation of the spec.
  270. rc.Status = api.ReplicationControllerStatus{Replicas: 4, ReadyReplicas: 4, ObservedGeneration: 1}
  271. decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
  272. fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
  273. validateSyncReplication(t, &fakePodControl, 1, 0, 0)
  274. }
  275. func TestSyncReplicationControllerDormancy(t *testing.T) {
  276. // Setup a test server so we can lie about the current state of pods
  277. fakeHandler := utiltesting.FakeHandler{
  278. StatusCode: 200,
  279. ResponseBody: "{}",
  280. }
  281. testServer := httptest.NewServer(&fakeHandler)
  282. defer testServer.Close()
  283. c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  284. fakePodControl := controller.FakePodControl{}
  285. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  286. manager.podStoreSynced = alwaysReady
  287. manager.podControl = &fakePodControl
  288. controllerSpec := newReplicationController(2)
  289. manager.rcStore.Indexer.Add(controllerSpec)
  290. newPodList(manager.podStore.Indexer, 1, api.PodRunning, controllerSpec, "pod")
  291. // Creates a replica and sets expectations
  292. controllerSpec.Status.Replicas = 1
  293. controllerSpec.Status.ReadyReplicas = 1
  294. manager.syncReplicationController(getKey(controllerSpec, t))
  295. validateSyncReplication(t, &fakePodControl, 1, 0, 0)
  296. // Expectations prevents replicas but not an update on status
  297. controllerSpec.Status.Replicas = 0
  298. controllerSpec.Status.ReadyReplicas = 0
  299. fakePodControl.Clear()
  300. manager.syncReplicationController(getKey(controllerSpec, t))
  301. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  302. // Get the key for the controller
  303. rcKey, err := controller.KeyFunc(controllerSpec)
  304. if err != nil {
  305. t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err)
  306. }
  307. // Lowering expectations should lead to a sync that creates a replica, however the
  308. // fakePodControl error will prevent this, leaving expectations at 0, 0.
  309. manager.expectations.CreationObserved(rcKey)
  310. controllerSpec.Status.Replicas = 1
  311. controllerSpec.Status.ReadyReplicas = 1
  312. fakePodControl.Clear()
  313. fakePodControl.Err = fmt.Errorf("Fake Error")
  314. manager.syncReplicationController(getKey(controllerSpec, t))
  315. validateSyncReplication(t, &fakePodControl, 1, 0, 0)
  316. // This replica should not need a Lowering of expectations, since the previous create failed
  317. fakePodControl.Clear()
  318. fakePodControl.Err = nil
  319. manager.syncReplicationController(getKey(controllerSpec, t))
  320. validateSyncReplication(t, &fakePodControl, 1, 0, 0)
  321. // 1 PUT for the rc status during dormancy window.
  322. // Note that the pod creates go through pod control so they're not recorded.
  323. fakeHandler.ValidateRequestCount(t, 1)
  324. }
  325. func TestPodControllerLookup(t *testing.T) {
  326. manager := NewReplicationManagerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
  327. manager.podStoreSynced = alwaysReady
  328. testCases := []struct {
  329. inRCs []*api.ReplicationController
  330. pod *api.Pod
  331. outRCName string
  332. }{
  333. // pods without labels don't match any rcs
  334. {
  335. inRCs: []*api.ReplicationController{
  336. {ObjectMeta: api.ObjectMeta{Name: "basic"}}},
  337. pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}},
  338. outRCName: "",
  339. },
  340. // Matching labels, not namespace
  341. {
  342. inRCs: []*api.ReplicationController{
  343. {
  344. ObjectMeta: api.ObjectMeta{Name: "foo"},
  345. Spec: api.ReplicationControllerSpec{
  346. Selector: map[string]string{"foo": "bar"},
  347. },
  348. },
  349. },
  350. pod: &api.Pod{
  351. ObjectMeta: api.ObjectMeta{
  352. Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
  353. outRCName: "",
  354. },
  355. // Matching ns and labels returns the key to the rc, not the rc name
  356. {
  357. inRCs: []*api.ReplicationController{
  358. {
  359. ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
  360. Spec: api.ReplicationControllerSpec{
  361. Selector: map[string]string{"foo": "bar"},
  362. },
  363. },
  364. },
  365. pod: &api.Pod{
  366. ObjectMeta: api.ObjectMeta{
  367. Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
  368. outRCName: "bar",
  369. },
  370. }
  371. for _, c := range testCases {
  372. for _, r := range c.inRCs {
  373. manager.rcStore.Indexer.Add(r)
  374. }
  375. if rc := manager.getPodController(c.pod); rc != nil {
  376. if c.outRCName != rc.Name {
  377. t.Errorf("Got controller %+v expected %+v", rc.Name, c.outRCName)
  378. }
  379. } else if c.outRCName != "" {
  380. t.Errorf("Expected a controller %v pod %v, found none", c.outRCName, c.pod.Name)
  381. }
  382. }
  383. }
  384. func TestWatchControllers(t *testing.T) {
  385. fakeWatch := watch.NewFake()
  386. c := &fake.Clientset{}
  387. c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
  388. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  389. manager.podStoreSynced = alwaysReady
  390. var testControllerSpec api.ReplicationController
  391. received := make(chan string)
  392. // The update sent through the fakeWatcher should make its way into the workqueue,
  393. // and eventually into the syncHandler. The handler validates the received controller
  394. // and closes the received channel to indicate that the test can finish.
  395. manager.syncHandler = func(key string) error {
  396. obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
  397. if !exists || err != nil {
  398. t.Errorf("Expected to find controller under key %v", key)
  399. }
  400. controllerSpec := *obj.(*api.ReplicationController)
  401. if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) {
  402. t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
  403. }
  404. close(received)
  405. return nil
  406. }
  407. // Start only the rc watcher and the workqueue, send a watch event,
  408. // and make sure it hits the sync method.
  409. stopCh := make(chan struct{})
  410. defer close(stopCh)
  411. go manager.rcController.Run(stopCh)
  412. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  413. testControllerSpec.Name = "foo"
  414. fakeWatch.Add(&testControllerSpec)
  415. select {
  416. case <-received:
  417. case <-time.After(wait.ForeverTestTimeout):
  418. t.Errorf("unexpected timeout from result channel")
  419. }
  420. }
  421. func TestWatchPods(t *testing.T) {
  422. fakeWatch := watch.NewFake()
  423. c := &fake.Clientset{}
  424. c.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
  425. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  426. manager.podStoreSynced = alwaysReady
  427. // Put one rc and one pod into the controller's stores
  428. testControllerSpec := newReplicationController(1)
  429. manager.rcStore.Indexer.Add(testControllerSpec)
  430. received := make(chan string)
  431. // The pod update sent through the fakeWatcher should figure out the managing rc and
  432. // send it into the syncHandler.
  433. manager.syncHandler = func(key string) error {
  434. obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
  435. if !exists || err != nil {
  436. t.Errorf("Expected to find controller under key %v", key)
  437. }
  438. controllerSpec := obj.(*api.ReplicationController)
  439. if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) {
  440. t.Errorf("\nExpected %#v,\nbut got %#v", testControllerSpec, controllerSpec)
  441. }
  442. close(received)
  443. return nil
  444. }
  445. // Start only the pod watcher and the workqueue, send a watch event,
  446. // and make sure it hits the sync method for the right rc.
  447. stopCh := make(chan struct{})
  448. defer close(stopCh)
  449. go manager.podController.Run(stopCh)
  450. go manager.internalPodInformer.Run(stopCh)
  451. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  452. pods := newPodList(nil, 1, api.PodRunning, testControllerSpec, "pod")
  453. testPod := pods.Items[0]
  454. testPod.Status.Phase = api.PodFailed
  455. fakeWatch.Add(&testPod)
  456. select {
  457. case <-received:
  458. case <-time.After(wait.ForeverTestTimeout):
  459. t.Errorf("unexpected timeout from result channel")
  460. }
  461. }
  462. func TestUpdatePods(t *testing.T) {
  463. manager := NewReplicationManagerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
  464. manager.podStoreSynced = alwaysReady
  465. received := make(chan string)
  466. manager.syncHandler = func(key string) error {
  467. obj, exists, err := manager.rcStore.Indexer.GetByKey(key)
  468. if !exists || err != nil {
  469. t.Errorf("Expected to find controller under key %v", key)
  470. }
  471. received <- obj.(*api.ReplicationController).Name
  472. return nil
  473. }
  474. stopCh := make(chan struct{})
  475. defer close(stopCh)
  476. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  477. // Put 2 rcs and one pod into the controller's stores
  478. testControllerSpec1 := newReplicationController(1)
  479. manager.rcStore.Indexer.Add(testControllerSpec1)
  480. testControllerSpec2 := *testControllerSpec1
  481. testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"}
  482. testControllerSpec2.Name = "barfoo"
  483. manager.rcStore.Indexer.Add(&testControllerSpec2)
  484. // case 1: We put in the podStore a pod with labels matching
  485. // testControllerSpec1, then update its labels to match testControllerSpec2.
  486. // We expect to receive a sync request for both controllers.
  487. pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, testControllerSpec1, "pod").Items[0]
  488. pod1.ResourceVersion = "1"
  489. pod2 := pod1
  490. pod2.Labels = testControllerSpec2.Spec.Selector
  491. pod2.ResourceVersion = "2"
  492. manager.updatePod(&pod1, &pod2)
  493. expected := sets.NewString(testControllerSpec1.Name, testControllerSpec2.Name)
  494. for _, name := range expected.List() {
  495. t.Logf("Expecting update for %+v", name)
  496. select {
  497. case got := <-received:
  498. if !expected.Has(got) {
  499. t.Errorf("Expected keys %#v got %v", expected, got)
  500. }
  501. case <-time.After(wait.ForeverTestTimeout):
  502. t.Errorf("Expected update notifications for controllers within 100ms each")
  503. }
  504. }
  505. // case 2: pod1 in the podStore has labels matching testControllerSpec1.
  506. // We update its labels to match no replication controller. We expect to
  507. // receive a sync request for testControllerSpec1.
  508. pod2.Labels = make(map[string]string)
  509. pod2.ResourceVersion = "2"
  510. manager.updatePod(&pod1, &pod2)
  511. expected = sets.NewString(testControllerSpec1.Name)
  512. for _, name := range expected.List() {
  513. t.Logf("Expecting update for %+v", name)
  514. select {
  515. case got := <-received:
  516. if !expected.Has(got) {
  517. t.Errorf("Expected keys %#v got %v", expected, got)
  518. }
  519. case <-time.After(wait.ForeverTestTimeout):
  520. t.Errorf("Expected update notifications for controllers within 100ms each")
  521. }
  522. }
  523. }
  524. func TestControllerUpdateRequeue(t *testing.T) {
  525. // This server should force a requeue of the controller because it fails to update status.Replicas.
  526. fakeHandler := utiltesting.FakeHandler{
  527. StatusCode: 500,
  528. ResponseBody: "",
  529. }
  530. testServer := httptest.NewServer(&fakeHandler)
  531. defer testServer.Close()
  532. c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  533. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  534. manager.podStoreSynced = alwaysReady
  535. rc := newReplicationController(1)
  536. manager.rcStore.Indexer.Add(rc)
  537. rc.Status = api.ReplicationControllerStatus{Replicas: 2}
  538. newPodList(manager.podStore.Indexer, 1, api.PodRunning, rc, "pod")
  539. fakePodControl := controller.FakePodControl{}
  540. manager.podControl = &fakePodControl
  541. // an error from the sync function will be requeued, check to make sure we returned an error
  542. if err := manager.syncReplicationController(getKey(rc, t)); err == nil {
  543. t.Errorf("missing error for requeue")
  544. }
  545. // 1 Update and 1 GET, both of which fail
  546. fakeHandler.ValidateRequestCount(t, 2)
  547. }
  548. func TestControllerUpdateStatusWithFailure(t *testing.T) {
  549. rc := newReplicationController(1)
  550. c := &fake.Clientset{}
  551. c.AddReactor("get", "replicationcontrollers", func(action core.Action) (bool, runtime.Object, error) {
  552. return true, rc, nil
  553. })
  554. c.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  555. return true, &api.ReplicationController{}, fmt.Errorf("Fake error")
  556. })
  557. fakeRCClient := c.Core().ReplicationControllers("default")
  558. numReplicas := 10
  559. updateReplicaCount(fakeRCClient, *rc, numReplicas, 0, 0)
  560. updates, gets := 0, 0
  561. for _, a := range c.Actions() {
  562. if a.GetResource().Resource != "replicationcontrollers" {
  563. t.Errorf("Unexpected action %+v", a)
  564. continue
  565. }
  566. switch action := a.(type) {
  567. case core.GetAction:
  568. gets++
  569. // Make sure the get is for the right rc even though the update failed.
  570. if action.GetName() != rc.Name {
  571. t.Errorf("Expected get for rc %v, got %+v instead", rc.Name, action.GetName())
  572. }
  573. case core.UpdateAction:
  574. updates++
  575. // Confirm that the update has the right status.Replicas even though the Get
  576. // returned an rc with replicas=1.
  577. if c, ok := action.GetObject().(*api.ReplicationController); !ok {
  578. t.Errorf("Expected an rc as the argument to update, got %T", c)
  579. } else if c.Status.Replicas != int32(numReplicas) {
  580. t.Errorf("Expected update for rc to contain replicas %v, got %v instead",
  581. numReplicas, c.Status.Replicas)
  582. }
  583. default:
  584. t.Errorf("Unexpected action %+v", a)
  585. break
  586. }
  587. }
  588. if gets != 1 || updates != 2 {
  589. t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
  590. }
  591. }
  592. // TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
  593. func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
  594. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  595. fakePodControl := controller.FakePodControl{}
  596. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, burstReplicas, 0)
  597. manager.podStoreSynced = alwaysReady
  598. manager.podControl = &fakePodControl
  599. controllerSpec := newReplicationController(numReplicas)
  600. manager.rcStore.Indexer.Add(controllerSpec)
  601. expectedPods := 0
  602. pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec, "pod")
  603. rcKey, err := controller.KeyFunc(controllerSpec)
  604. if err != nil {
  605. t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err)
  606. }
  607. // Size up the controller, then size it down, and confirm the expected create/delete pattern
  608. for _, replicas := range []int{numReplicas, 0} {
  609. controllerSpec.Spec.Replicas = int32(replicas)
  610. manager.rcStore.Indexer.Add(controllerSpec)
  611. for i := 0; i < numReplicas; i += burstReplicas {
  612. manager.syncReplicationController(getKey(controllerSpec, t))
  613. // The store accrues active pods. It's also used by the rc to determine how many
  614. // replicas to create.
  615. activePods := len(manager.podStore.Indexer.List())
  616. if replicas != 0 {
  617. // This is the number of pods currently "in flight". They were created by the rc manager above,
  618. // which then puts the rc to sleep till all of them have been observed.
  619. expectedPods = replicas - activePods
  620. if expectedPods > burstReplicas {
  621. expectedPods = burstReplicas
  622. }
  623. // This validates the rc manager sync actually created pods
  624. validateSyncReplication(t, &fakePodControl, expectedPods, 0, 0)
  625. // This simulates the watch events for all but 1 of the expected pods.
  626. // None of these should wake the controller because it has expectations==BurstReplicas.
  627. for i := 0; i < expectedPods-1; i++ {
  628. manager.podStore.Indexer.Add(&pods.Items[i])
  629. manager.addPod(&pods.Items[i])
  630. }
  631. podExp, exists, err := manager.expectations.GetExpectations(rcKey)
  632. if !exists || err != nil {
  633. t.Fatalf("Did not find expectations for rc.")
  634. }
  635. if add, _ := podExp.GetExpectations(); add != 1 {
  636. t.Fatalf("Expectations are wrong %v", podExp)
  637. }
  638. } else {
  639. expectedPods = (replicas - activePods) * -1
  640. if expectedPods > burstReplicas {
  641. expectedPods = burstReplicas
  642. }
  643. validateSyncReplication(t, &fakePodControl, 0, expectedPods, 0)
  644. // To accurately simulate a watch we must delete the exact pods
  645. // the rc is waiting for.
  646. expectedDels := manager.expectations.GetUIDs(getKey(controllerSpec, t))
  647. podsToDelete := []*api.Pod{}
  648. for _, key := range expectedDels.List() {
  649. nsName := strings.Split(key, "/")
  650. podsToDelete = append(podsToDelete, &api.Pod{
  651. ObjectMeta: api.ObjectMeta{
  652. Name: nsName[1],
  653. Namespace: nsName[0],
  654. Labels: controllerSpec.Spec.Selector,
  655. },
  656. })
  657. }
  658. // Don't delete all pods because we confirm that the last pod
  659. // has exactly one expectation at the end, to verify that we
  660. // don't double delete.
  661. for i := range podsToDelete[1:] {
  662. manager.podStore.Indexer.Delete(podsToDelete[i])
  663. manager.deletePod(podsToDelete[i])
  664. }
  665. podExp, exists, err := manager.expectations.GetExpectations(rcKey)
  666. if !exists || err != nil {
  667. t.Fatalf("Did not find expectations for rc.")
  668. }
  669. if _, del := podExp.GetExpectations(); del != 1 {
  670. t.Fatalf("Expectations are wrong %v", podExp)
  671. }
  672. }
  673. // Check that the rc didn't take any action for all the above pods
  674. fakePodControl.Clear()
  675. manager.syncReplicationController(getKey(controllerSpec, t))
  676. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  677. // Create/Delete the last pod
  678. // The last add pod will decrease the expectation of the rc to 0,
  679. // which will cause it to create/delete the remaining replicas up to burstReplicas.
  680. if replicas != 0 {
  681. manager.podStore.Indexer.Add(&pods.Items[expectedPods-1])
  682. manager.addPod(&pods.Items[expectedPods-1])
  683. } else {
  684. expectedDel := manager.expectations.GetUIDs(getKey(controllerSpec, t))
  685. if expectedDel.Len() != 1 {
  686. t.Fatalf("Waiting on unexpected number of deletes.")
  687. }
  688. nsName := strings.Split(expectedDel.List()[0], "/")
  689. lastPod := &api.Pod{
  690. ObjectMeta: api.ObjectMeta{
  691. Name: nsName[1],
  692. Namespace: nsName[0],
  693. Labels: controllerSpec.Spec.Selector,
  694. },
  695. }
  696. manager.podStore.Indexer.Delete(lastPod)
  697. manager.deletePod(lastPod)
  698. }
  699. pods.Items = pods.Items[expectedPods:]
  700. }
  701. // Confirm that we've created the right number of replicas
  702. activePods := int32(len(manager.podStore.Indexer.List()))
  703. if activePods != controllerSpec.Spec.Replicas {
  704. t.Fatalf("Unexpected number of active pods, expected %d, got %d", controllerSpec.Spec.Replicas, activePods)
  705. }
  706. // Replenish the pod list, since we cut it down sizing up
  707. pods = newPodList(nil, replicas, api.PodRunning, controllerSpec, "pod")
  708. }
  709. }
  710. func TestControllerBurstReplicas(t *testing.T) {
  711. doTestControllerBurstReplicas(t, 5, 30)
  712. doTestControllerBurstReplicas(t, 5, 12)
  713. doTestControllerBurstReplicas(t, 3, 2)
  714. }
  715. type FakeRCExpectations struct {
  716. *controller.ControllerExpectations
  717. satisfied bool
  718. expSatisfied func()
  719. }
  720. func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
  721. fe.expSatisfied()
  722. return fe.satisfied
  723. }
  724. // TestRCSyncExpectations tests that a pod cannot sneak in between counting active pods
  725. // and checking expectations.
  726. func TestRCSyncExpectations(t *testing.T) {
  727. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  728. fakePodControl := controller.FakePodControl{}
  729. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
  730. manager.podStoreSynced = alwaysReady
  731. manager.podControl = &fakePodControl
  732. controllerSpec := newReplicationController(2)
  733. manager.rcStore.Indexer.Add(controllerSpec)
  734. pods := newPodList(nil, 2, api.PodPending, controllerSpec, "pod")
  735. manager.podStore.Indexer.Add(&pods.Items[0])
  736. postExpectationsPod := pods.Items[1]
  737. manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRCExpectations{
  738. controller.NewControllerExpectations(), true, func() {
  739. // If we check active pods before checking expectataions, the rc
  740. // will create a new replica because it doesn't see this pod, but
  741. // has fulfilled its expectations.
  742. manager.podStore.Indexer.Add(&postExpectationsPod)
  743. },
  744. })
  745. manager.syncReplicationController(getKey(controllerSpec, t))
  746. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  747. }
  748. func TestDeleteControllerAndExpectations(t *testing.T) {
  749. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  750. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
  751. manager.podStoreSynced = alwaysReady
  752. rc := newReplicationController(1)
  753. manager.rcStore.Indexer.Add(rc)
  754. fakePodControl := controller.FakePodControl{}
  755. manager.podControl = &fakePodControl
  756. // This should set expectations for the rc
  757. manager.syncReplicationController(getKey(rc, t))
  758. validateSyncReplication(t, &fakePodControl, 1, 0, 0)
  759. fakePodControl.Clear()
  760. // Get the RC key
  761. rcKey, err := controller.KeyFunc(rc)
  762. if err != nil {
  763. t.Errorf("Couldn't get key for object %#v: %v", rc, err)
  764. }
  765. // This is to simulate a concurrent addPod, that has a handle on the expectations
  766. // as the controller deletes it.
  767. podExp, exists, err := manager.expectations.GetExpectations(rcKey)
  768. if !exists || err != nil {
  769. t.Errorf("No expectations found for rc")
  770. }
  771. manager.rcStore.Indexer.Delete(rc)
  772. manager.syncReplicationController(getKey(rc, t))
  773. if _, exists, err = manager.expectations.GetExpectations(rcKey); exists {
  774. t.Errorf("Found expectaions, expected none since the rc has been deleted.")
  775. }
  776. // This should have no effect, since we've deleted the rc.
  777. podExp.Add(-1, 0)
  778. manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
  779. manager.syncReplicationController(getKey(rc, t))
  780. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  781. }
  782. func TestRCManagerNotReady(t *testing.T) {
  783. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  784. fakePodControl := controller.FakePodControl{}
  785. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 2, 0)
  786. manager.podControl = &fakePodControl
  787. manager.podStoreSynced = func() bool { return false }
  788. // Simulates the rc reflector running before the pod reflector. We don't
  789. // want to end up creating replicas in this case until the pod reflector
  790. // has synced, so the rc manager should just requeue the rc.
  791. controllerSpec := newReplicationController(1)
  792. manager.rcStore.Indexer.Add(controllerSpec)
  793. rcKey := getKey(controllerSpec, t)
  794. manager.syncReplicationController(rcKey)
  795. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  796. queueRC, _ := manager.queue.Get()
  797. if queueRC != rcKey {
  798. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  799. }
  800. manager.podStoreSynced = alwaysReady
  801. manager.syncReplicationController(rcKey)
  802. validateSyncReplication(t, &fakePodControl, 1, 0, 0)
  803. }
  804. // shuffle returns a new shuffled list of container controllers.
  805. func shuffle(controllers []*api.ReplicationController) []*api.ReplicationController {
  806. numControllers := len(controllers)
  807. randIndexes := rand.Perm(numControllers)
  808. shuffled := make([]*api.ReplicationController, numControllers)
  809. for i := 0; i < numControllers; i++ {
  810. shuffled[i] = controllers[randIndexes[i]]
  811. }
  812. return shuffled
  813. }
  814. func TestOverlappingRCs(t *testing.T) {
  815. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  816. for i := 0; i < 5; i++ {
  817. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
  818. manager.podStoreSynced = alwaysReady
  819. // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store
  820. var controllers []*api.ReplicationController
  821. for j := 1; j < 10; j++ {
  822. controllerSpec := newReplicationController(1)
  823. controllerSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
  824. controllerSpec.Name = string(uuid.NewUUID())
  825. controllers = append(controllers, controllerSpec)
  826. }
  827. shuffledControllers := shuffle(controllers)
  828. for j := range shuffledControllers {
  829. manager.rcStore.Indexer.Add(shuffledControllers[j])
  830. }
  831. // Add a pod and make sure only the oldest rc is synced
  832. pods := newPodList(nil, 1, api.PodPending, controllers[0], "pod")
  833. rcKey := getKey(controllers[0], t)
  834. manager.addPod(&pods.Items[0])
  835. queueRC, _ := manager.queue.Get()
  836. if queueRC != rcKey {
  837. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  838. }
  839. }
  840. }
  841. func TestDeletionTimestamp(t *testing.T) {
  842. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  843. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
  844. manager.podStoreSynced = alwaysReady
  845. controllerSpec := newReplicationController(1)
  846. manager.rcStore.Indexer.Add(controllerSpec)
  847. rcKey, err := controller.KeyFunc(controllerSpec)
  848. if err != nil {
  849. t.Errorf("Couldn't get key for object %#v: %v", controllerSpec, err)
  850. }
  851. pod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
  852. pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
  853. pod.ResourceVersion = "1"
  854. manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
  855. // A pod added with a deletion timestamp should decrement deletions, not creations.
  856. manager.addPod(&pod)
  857. queueRC, _ := manager.queue.Get()
  858. if queueRC != rcKey {
  859. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  860. }
  861. manager.queue.Done(rcKey)
  862. podExp, exists, err := manager.expectations.GetExpectations(rcKey)
  863. if !exists || err != nil || !podExp.Fulfilled() {
  864. t.Fatalf("Wrong expectations %#v", podExp)
  865. }
  866. // An update from no deletion timestamp to having one should be treated
  867. // as a deletion.
  868. oldPod := newPodList(nil, 1, api.PodPending, controllerSpec, "pod").Items[0]
  869. oldPod.ResourceVersion = "2"
  870. manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(&pod)})
  871. manager.updatePod(&oldPod, &pod)
  872. queueRC, _ = manager.queue.Get()
  873. if queueRC != rcKey {
  874. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  875. }
  876. manager.queue.Done(rcKey)
  877. podExp, exists, err = manager.expectations.GetExpectations(rcKey)
  878. if !exists || err != nil || !podExp.Fulfilled() {
  879. t.Fatalf("Wrong expectations %#v", podExp)
  880. }
  881. // An update to the pod (including an update to the deletion timestamp)
  882. // should not be counted as a second delete.
  883. secondPod := &api.Pod{
  884. ObjectMeta: api.ObjectMeta{
  885. Namespace: pod.Namespace,
  886. Name: "secondPod",
  887. Labels: pod.Labels,
  888. },
  889. }
  890. manager.expectations.ExpectDeletions(rcKey, []string{controller.PodKey(secondPod)})
  891. oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
  892. oldPod.ResourceVersion = "2"
  893. manager.updatePod(&oldPod, &pod)
  894. podExp, exists, err = manager.expectations.GetExpectations(rcKey)
  895. if !exists || err != nil || podExp.Fulfilled() {
  896. t.Fatalf("Wrong expectations %#v", podExp)
  897. }
  898. // A pod with a non-nil deletion timestamp should also be ignored by the
  899. // delete handler, because it's already been counted in the update.
  900. manager.deletePod(&pod)
  901. podExp, exists, err = manager.expectations.GetExpectations(rcKey)
  902. if !exists || err != nil || podExp.Fulfilled() {
  903. t.Fatalf("Wrong expectations %#v", podExp)
  904. }
  905. // Deleting the second pod should clear expectations.
  906. manager.deletePod(secondPod)
  907. queueRC, _ = manager.queue.Get()
  908. if queueRC != rcKey {
  909. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  910. }
  911. manager.queue.Done(rcKey)
  912. podExp, exists, err = manager.expectations.GetExpectations(rcKey)
  913. if !exists || err != nil || !podExp.Fulfilled() {
  914. t.Fatalf("Wrong expectations %#v", podExp)
  915. }
  916. }
  917. func BenchmarkGetPodControllerMultiNS(b *testing.B) {
  918. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  919. manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  920. const nsNum = 1000
  921. pods := []api.Pod{}
  922. for i := 0; i < nsNum; i++ {
  923. ns := fmt.Sprintf("ns-%d", i)
  924. for j := 0; j < 10; j++ {
  925. rcName := fmt.Sprintf("rc-%d", j)
  926. for k := 0; k < 10; k++ {
  927. podName := fmt.Sprintf("pod-%d-%d", j, k)
  928. pods = append(pods, api.Pod{
  929. ObjectMeta: api.ObjectMeta{
  930. Name: podName,
  931. Namespace: ns,
  932. Labels: map[string]string{"rcName": rcName},
  933. },
  934. })
  935. }
  936. }
  937. }
  938. for i := 0; i < nsNum; i++ {
  939. ns := fmt.Sprintf("ns-%d", i)
  940. for j := 0; j < 10; j++ {
  941. rcName := fmt.Sprintf("rc-%d", j)
  942. manager.rcStore.Indexer.Add(&api.ReplicationController{
  943. ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: ns},
  944. Spec: api.ReplicationControllerSpec{
  945. Selector: map[string]string{"rcName": rcName},
  946. },
  947. })
  948. }
  949. }
  950. b.ResetTimer()
  951. for i := 0; i < b.N; i++ {
  952. for _, pod := range pods {
  953. manager.getPodController(&pod)
  954. }
  955. }
  956. }
  957. func BenchmarkGetPodControllerSingleNS(b *testing.B) {
  958. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  959. manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  960. const rcNum = 1000
  961. const replicaNum = 3
  962. pods := []api.Pod{}
  963. for i := 0; i < rcNum; i++ {
  964. rcName := fmt.Sprintf("rc-%d", i)
  965. for j := 0; j < replicaNum; j++ {
  966. podName := fmt.Sprintf("pod-%d-%d", i, j)
  967. pods = append(pods, api.Pod{
  968. ObjectMeta: api.ObjectMeta{
  969. Name: podName,
  970. Namespace: "foo",
  971. Labels: map[string]string{"rcName": rcName},
  972. },
  973. })
  974. }
  975. }
  976. for i := 0; i < rcNum; i++ {
  977. rcName := fmt.Sprintf("rc-%d", i)
  978. manager.rcStore.Indexer.Add(&api.ReplicationController{
  979. ObjectMeta: api.ObjectMeta{Name: rcName, Namespace: "foo"},
  980. Spec: api.ReplicationControllerSpec{
  981. Selector: map[string]string{"rcName": rcName},
  982. },
  983. })
  984. }
  985. b.ResetTimer()
  986. for i := 0; i < b.N; i++ {
  987. for _, pod := range pods {
  988. manager.getPodController(&pod)
  989. }
  990. }
  991. }
  992. // setupManagerWithGCEnabled creates a RC manager with a fakePodControl and with garbageCollectorEnabled set to true
  993. func setupManagerWithGCEnabled(objs ...runtime.Object) (manager *ReplicationManager, fakePodControl *controller.FakePodControl) {
  994. c := fakeclientset.NewSimpleClientset(objs...)
  995. fakePodControl = &controller.FakePodControl{}
  996. manager = NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  997. manager.garbageCollectorEnabled = true
  998. manager.podStoreSynced = alwaysReady
  999. manager.podControl = fakePodControl
  1000. return manager, fakePodControl
  1001. }
  1002. func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
  1003. manager, fakePodControl := setupManagerWithGCEnabled()
  1004. rc := newReplicationController(2)
  1005. manager.rcStore.Indexer.Add(rc)
  1006. var trueVar = true
  1007. otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar}
  1008. // add to podStore a matching Pod controlled by another controller. Expect no patch.
  1009. pod := newPod("pod", rc, api.PodRunning)
  1010. pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
  1011. manager.podStore.Indexer.Add(pod)
  1012. err := manager.syncReplicationController(getKey(rc, t))
  1013. if err != nil {
  1014. t.Fatal(err)
  1015. }
  1016. // because the matching pod already has a controller, so 2 pods should be created.
  1017. validateSyncReplication(t, fakePodControl, 2, 0, 0)
  1018. }
  1019. func TestPatchPodWithOtherOwnerRef(t *testing.T) {
  1020. rc := newReplicationController(2)
  1021. manager, fakePodControl := setupManagerWithGCEnabled(rc)
  1022. manager.rcStore.Indexer.Add(rc)
  1023. // add to podStore one more matching pod that doesn't have a controller
  1024. // ref, but has an owner ref pointing to other object. Expect a patch to
  1025. // take control of it.
  1026. unrelatedOwnerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
  1027. pod := newPod("pod", rc, api.PodRunning)
  1028. pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
  1029. manager.podStore.Indexer.Add(pod)
  1030. err := manager.syncReplicationController(getKey(rc, t))
  1031. if err != nil {
  1032. t.Fatal(err)
  1033. }
  1034. // 1 patch to take control of pod, and 1 create of new pod.
  1035. validateSyncReplication(t, fakePodControl, 1, 0, 1)
  1036. }
  1037. func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
  1038. rc := newReplicationController(2)
  1039. manager, fakePodControl := setupManagerWithGCEnabled(rc)
  1040. manager.rcStore.Indexer.Add(rc)
  1041. // add to podStore a matching pod that has an ownerRef pointing to the rc,
  1042. // but ownerRef.Controller is false. Expect a patch to take control it.
  1043. rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name}
  1044. pod := newPod("pod", rc, api.PodRunning)
  1045. pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
  1046. manager.podStore.Indexer.Add(pod)
  1047. err := manager.syncReplicationController(getKey(rc, t))
  1048. if err != nil {
  1049. t.Fatal(err)
  1050. }
  1051. // 1 patch to take control of pod, and 1 create of new pod.
  1052. validateSyncReplication(t, fakePodControl, 1, 0, 1)
  1053. }
  1054. func TestPatchPodFails(t *testing.T) {
  1055. rc := newReplicationController(2)
  1056. manager, fakePodControl := setupManagerWithGCEnabled(rc)
  1057. manager.rcStore.Indexer.Add(rc)
  1058. // add to podStore two matching pods. Expect two patches to take control
  1059. // them.
  1060. manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning))
  1061. manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning))
  1062. // let both patches fail. The rc manager will assume it fails to take
  1063. // control of the pods and create new ones.
  1064. fakePodControl.Err = fmt.Errorf("Fake Error")
  1065. err := manager.syncReplicationController(getKey(rc, t))
  1066. if err == nil || err.Error() != "Fake Error" {
  1067. t.Fatalf("expected Fake Error, got %v", err)
  1068. }
  1069. // 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
  1070. validateSyncReplication(t, fakePodControl, 2, 0, 2)
  1071. }
  1072. func TestPatchExtraPodsThenDelete(t *testing.T) {
  1073. rc := newReplicationController(2)
  1074. manager, fakePodControl := setupManagerWithGCEnabled(rc)
  1075. manager.rcStore.Indexer.Add(rc)
  1076. // add to podStore three matching pods. Expect three patches to take control
  1077. // them, and later delete one of them.
  1078. manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning))
  1079. manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning))
  1080. manager.podStore.Indexer.Add(newPod("pod3", rc, api.PodRunning))
  1081. err := manager.syncReplicationController(getKey(rc, t))
  1082. if err != nil {
  1083. t.Fatal(err)
  1084. }
  1085. // 3 patches to take control of the pods, and 1 deletion because there is an extra pod.
  1086. validateSyncReplication(t, fakePodControl, 0, 1, 3)
  1087. }
  1088. func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
  1089. manager, fakePodControl := setupManagerWithGCEnabled()
  1090. rc := newReplicationController(2)
  1091. manager.rcStore.Indexer.Add(rc)
  1092. // put one pod in the podStore
  1093. pod := newPod("pod", rc, api.PodRunning)
  1094. pod.ResourceVersion = "1"
  1095. var trueVar = true
  1096. rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
  1097. pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
  1098. updatedPod := *pod
  1099. // reset the labels
  1100. updatedPod.Labels = make(map[string]string)
  1101. updatedPod.ResourceVersion = "2"
  1102. // add the updatedPod to the store. This is consistent with the behavior of
  1103. // the Informer: Informer updates the store before call the handler
  1104. // (updatePod() in this case).
  1105. manager.podStore.Indexer.Add(&updatedPod)
  1106. // send a update of the same pod with modified labels
  1107. manager.updatePod(pod, &updatedPod)
  1108. // verifies that rc is added to the queue
  1109. rcKey := getKey(rc, t)
  1110. queueRC, _ := manager.queue.Get()
  1111. if queueRC != rcKey {
  1112. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  1113. }
  1114. manager.queue.Done(queueRC)
  1115. err := manager.syncReplicationController(rcKey)
  1116. if err != nil {
  1117. t.Fatal(err)
  1118. }
  1119. // expect 1 patch to be sent to remove the controllerRef for the pod.
  1120. // expect 2 creates because the rc.Spec.Replicas=2 and there exists no
  1121. // matching pod.
  1122. validateSyncReplication(t, fakePodControl, 2, 0, 1)
  1123. fakePodControl.Clear()
  1124. }
  1125. func TestUpdateSelectorControllerRef(t *testing.T) {
  1126. manager, fakePodControl := setupManagerWithGCEnabled()
  1127. rc := newReplicationController(2)
  1128. // put 2 pods in the podStore
  1129. newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
  1130. // update the RC so that its selector no longer matches the pods
  1131. updatedRC := *rc
  1132. updatedRC.Spec.Selector = map[string]string{"foo": "baz"}
  1133. // put the updatedRC into the store. This is consistent with the behavior of
  1134. // the Informer: Informer updates the store before call the handler
  1135. // (updateRC() in this case).
  1136. manager.rcStore.Indexer.Add(&updatedRC)
  1137. manager.updateRC(rc, &updatedRC)
  1138. // verifies that the rc is added to the queue
  1139. rcKey := getKey(rc, t)
  1140. queueRC, _ := manager.queue.Get()
  1141. if queueRC != rcKey {
  1142. t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
  1143. }
  1144. manager.queue.Done(queueRC)
  1145. err := manager.syncReplicationController(rcKey)
  1146. if err != nil {
  1147. t.Fatal(err)
  1148. }
  1149. // expect 2 patches to be sent to remove the controllerRef for the pods.
  1150. // expect 2 creates because the rc.Spec.Replicas=2 and there exists no
  1151. // matching pod.
  1152. validateSyncReplication(t, fakePodControl, 2, 0, 2)
  1153. fakePodControl.Clear()
  1154. }
  1155. // RC manager shouldn't adopt or create more pods if the rc is about to be
  1156. // deleted.
  1157. func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
  1158. manager, fakePodControl := setupManagerWithGCEnabled()
  1159. rc := newReplicationController(2)
  1160. now := unversioned.Now()
  1161. rc.DeletionTimestamp = &now
  1162. manager.rcStore.Indexer.Add(rc)
  1163. pod1 := newPod("pod1", rc, api.PodRunning)
  1164. manager.podStore.Indexer.Add(pod1)
  1165. // no patch, no create
  1166. err := manager.syncReplicationController(getKey(rc, t))
  1167. if err != nil {
  1168. t.Fatal(err)
  1169. }
  1170. validateSyncReplication(t, fakePodControl, 0, 0, 0)
  1171. }
  1172. func TestReadyReplicas(t *testing.T) {
  1173. // This is a happy server just to record the PUT request we expect for status.Replicas
  1174. fakeHandler := utiltesting.FakeHandler{
  1175. StatusCode: 200,
  1176. ResponseBody: "{}",
  1177. }
  1178. testServer := httptest.NewServer(&fakeHandler)
  1179. defer testServer.Close()
  1180. c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  1181. manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  1182. manager.podStoreSynced = alwaysReady
  1183. // Status.Replica should update to match number of pods in system, 1 new pod should be created.
  1184. rc := newReplicationController(2)
  1185. rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
  1186. rc.Generation = 1
  1187. manager.rcStore.Indexer.Add(rc)
  1188. newPodList(manager.podStore.Indexer, 2, api.PodPending, rc, "pod")
  1189. newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
  1190. // This response body is just so we don't err out decoding the http response
  1191. response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
  1192. fakeHandler.ResponseBody = response
  1193. fakePodControl := controller.FakePodControl{}
  1194. manager.podControl = &fakePodControl
  1195. manager.syncReplicationController(getKey(rc, t))
  1196. // ReadyReplicas should go from 0 to 2.
  1197. rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, ObservedGeneration: 1}
  1198. decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
  1199. fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
  1200. validateSyncReplication(t, &fakePodControl, 0, 0, 0)
  1201. }