replica_set_test.go 50 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // If you make changes to this file, you should also make the corresponding change in ReplicationController.
  14. package replicaset
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "net/http/httptest"
  19. "strings"
  20. "testing"
  21. "time"
  22. "k8s.io/kubernetes/pkg/api"
  23. "k8s.io/kubernetes/pkg/api/testapi"
  24. "k8s.io/kubernetes/pkg/api/unversioned"
  25. "k8s.io/kubernetes/pkg/apis/extensions"
  26. "k8s.io/kubernetes/pkg/client/cache"
  27. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  28. "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
  29. "k8s.io/kubernetes/pkg/client/restclient"
  30. "k8s.io/kubernetes/pkg/client/testing/core"
  31. "k8s.io/kubernetes/pkg/controller"
  32. "k8s.io/kubernetes/pkg/runtime"
  33. "k8s.io/kubernetes/pkg/securitycontext"
  34. "k8s.io/kubernetes/pkg/util/sets"
  35. utiltesting "k8s.io/kubernetes/pkg/util/testing"
  36. "k8s.io/kubernetes/pkg/util/uuid"
  37. "k8s.io/kubernetes/pkg/util/wait"
  38. "k8s.io/kubernetes/pkg/watch"
  39. )
  40. var alwaysReady = func() bool { return true }
  41. func getKey(rs *extensions.ReplicaSet, t *testing.T) string {
  42. if key, err := controller.KeyFunc(rs); err != nil {
  43. t.Errorf("Unexpected error getting key for ReplicaSet %v: %v", rs.Name, err)
  44. return ""
  45. } else {
  46. return key
  47. }
  48. }
  49. func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.ReplicaSet {
  50. rs := &extensions.ReplicaSet{
  51. TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
  52. ObjectMeta: api.ObjectMeta{
  53. UID: uuid.NewUUID(),
  54. Name: "foobar",
  55. Namespace: api.NamespaceDefault,
  56. ResourceVersion: "18",
  57. },
  58. Spec: extensions.ReplicaSetSpec{
  59. Replicas: int32(replicas),
  60. Selector: &unversioned.LabelSelector{MatchLabels: selectorMap},
  61. Template: api.PodTemplateSpec{
  62. ObjectMeta: api.ObjectMeta{
  63. Labels: map[string]string{
  64. "name": "foo",
  65. "type": "production",
  66. },
  67. },
  68. Spec: api.PodSpec{
  69. Containers: []api.Container{
  70. {
  71. Image: "foo/bar",
  72. TerminationMessagePath: api.TerminationMessagePathDefault,
  73. ImagePullPolicy: api.PullIfNotPresent,
  74. SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
  75. },
  76. },
  77. RestartPolicy: api.RestartPolicyAlways,
  78. DNSPolicy: api.DNSDefault,
  79. NodeSelector: map[string]string{
  80. "baz": "blah",
  81. },
  82. },
  83. },
  84. },
  85. }
  86. return rs
  87. }
  88. // create a pod with the given phase for the given rs (same selectors and namespace)
  89. func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase) *api.Pod {
  90. var conditions []api.PodCondition
  91. if status == api.PodRunning {
  92. conditions = append(conditions, api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue})
  93. }
  94. return &api.Pod{
  95. ObjectMeta: api.ObjectMeta{
  96. Name: name,
  97. Namespace: rs.Namespace,
  98. Labels: rs.Spec.Selector.MatchLabels,
  99. },
  100. Status: api.PodStatus{Phase: status, Conditions: conditions},
  101. }
  102. }
  103. // create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store.
  104. func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet, name string) *api.PodList {
  105. pods := []api.Pod{}
  106. var trueVar = true
  107. controllerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
  108. for i := 0; i < count; i++ {
  109. pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status)
  110. pod.ObjectMeta.Labels = labelMap
  111. pod.OwnerReferences = []api.OwnerReference{controllerReference}
  112. if store != nil {
  113. store.Add(pod)
  114. }
  115. pods = append(pods, *pod)
  116. }
  117. return &api.PodList{
  118. Items: pods,
  119. }
  120. }
  121. func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) {
  122. if e, a := expectedCreates, len(fakePodControl.Templates); e != a {
  123. t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a)
  124. }
  125. if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a {
  126. t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a)
  127. }
  128. if e, a := expectedPatches, len(fakePodControl.Patches); e != a {
  129. t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a)
  130. }
  131. }
  132. func replicaSetResourceName() string {
  133. return "replicasets"
  134. }
  135. type serverResponse struct {
  136. statusCode int
  137. obj interface{}
  138. }
  139. func TestSyncReplicaSetDoesNothing(t *testing.T) {
  140. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  141. fakePodControl := controller.FakePodControl{}
  142. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  143. manager.podStoreSynced = alwaysReady
  144. // 2 running pods, a controller with 2 replicas, sync is a no-op
  145. labelMap := map[string]string{"foo": "bar"}
  146. rsSpec := newReplicaSet(2, labelMap)
  147. manager.rsStore.Store.Add(rsSpec)
  148. newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
  149. manager.podControl = &fakePodControl
  150. manager.syncReplicaSet(getKey(rsSpec, t))
  151. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  152. }
  153. func TestSyncReplicaSetDeletes(t *testing.T) {
  154. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  155. fakePodControl := controller.FakePodControl{}
  156. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  157. manager.podStoreSynced = alwaysReady
  158. manager.podControl = &fakePodControl
  159. // 2 running pods and a controller with 1 replica, one pod delete expected
  160. labelMap := map[string]string{"foo": "bar"}
  161. rsSpec := newReplicaSet(1, labelMap)
  162. manager.rsStore.Store.Add(rsSpec)
  163. newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
  164. manager.syncReplicaSet(getKey(rsSpec, t))
  165. validateSyncReplicaSet(t, &fakePodControl, 0, 1, 0)
  166. }
  167. func TestDeleteFinalStateUnknown(t *testing.T) {
  168. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  169. fakePodControl := controller.FakePodControl{}
  170. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  171. manager.podStoreSynced = alwaysReady
  172. manager.podControl = &fakePodControl
  173. received := make(chan string)
  174. manager.syncHandler = func(key string) error {
  175. received <- key
  176. return nil
  177. }
  178. // The DeletedFinalStateUnknown object should cause the ReplicaSet manager to insert
  179. // the controller matching the selectors of the deleted pod into the work queue.
  180. labelMap := map[string]string{"foo": "bar"}
  181. rsSpec := newReplicaSet(1, labelMap)
  182. manager.rsStore.Store.Add(rsSpec)
  183. pods := newPodList(nil, 1, api.PodRunning, labelMap, rsSpec, "pod")
  184. manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
  185. go manager.worker()
  186. expected := getKey(rsSpec, t)
  187. select {
  188. case key := <-received:
  189. if key != expected {
  190. t.Errorf("Unexpected sync all for ReplicaSet %v, expected %v", key, expected)
  191. }
  192. case <-time.After(wait.ForeverTestTimeout):
  193. t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
  194. }
  195. }
  196. func TestSyncReplicaSetCreates(t *testing.T) {
  197. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  198. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  199. manager.podStoreSynced = alwaysReady
  200. // A controller with 2 replicas and no pods in the store, 2 creates expected
  201. labelMap := map[string]string{"foo": "bar"}
  202. rs := newReplicaSet(2, labelMap)
  203. manager.rsStore.Store.Add(rs)
  204. fakePodControl := controller.FakePodControl{}
  205. manager.podControl = &fakePodControl
  206. manager.syncReplicaSet(getKey(rs, t))
  207. validateSyncReplicaSet(t, &fakePodControl, 2, 0, 0)
  208. }
  209. func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
  210. // Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state
  211. fakeHandler := utiltesting.FakeHandler{
  212. StatusCode: 200,
  213. ResponseBody: "{}",
  214. }
  215. testServer := httptest.NewServer(&fakeHandler)
  216. defer testServer.Close()
  217. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  218. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  219. manager.podStoreSynced = alwaysReady
  220. // Steady state for the ReplicaSet, no Status.Replicas updates expected
  221. activePods := 5
  222. labelMap := map[string]string{"foo": "bar"}
  223. rs := newReplicaSet(activePods, labelMap)
  224. manager.rsStore.Store.Add(rs)
  225. rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods)}
  226. newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
  227. fakePodControl := controller.FakePodControl{}
  228. manager.podControl = &fakePodControl
  229. manager.syncReplicaSet(getKey(rs, t))
  230. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  231. if fakeHandler.RequestReceived != nil {
  232. t.Errorf("Unexpected update when pods and ReplicaSets are in a steady state")
  233. }
  234. // This response body is just so we don't err out decoding the http response, all
  235. // we care about is the request body sent below.
  236. response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
  237. fakeHandler.ResponseBody = response
  238. rs.Generation = rs.Generation + 1
  239. manager.syncReplicaSet(getKey(rs, t))
  240. rs.Status.ObservedGeneration = rs.Generation
  241. updatedRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
  242. fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &updatedRc)
  243. }
  244. func TestControllerUpdateReplicas(t *testing.T) {
  245. // This is a happy server just to record the PUT request we expect for status.Replicas
  246. fakeHandler := utiltesting.FakeHandler{
  247. StatusCode: 200,
  248. ResponseBody: "{}",
  249. }
  250. testServer := httptest.NewServer(&fakeHandler)
  251. defer testServer.Close()
  252. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  253. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  254. manager.podStoreSynced = alwaysReady
  255. // Insufficient number of pods in the system, and Status.Replicas is wrong;
  256. // Status.Replica should update to match number of pods in system, 1 new pod should be created.
  257. labelMap := map[string]string{"foo": "bar"}
  258. extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"}
  259. rs := newReplicaSet(5, labelMap)
  260. rs.Spec.Template.Labels = extraLabelMap
  261. manager.rsStore.Store.Add(rs)
  262. rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, ObservedGeneration: 0}
  263. rs.Generation = 1
  264. newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
  265. newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
  266. // This response body is just so we don't err out decoding the http response
  267. response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
  268. fakeHandler.ResponseBody = response
  269. fakePodControl := controller.FakePodControl{}
  270. manager.podControl = &fakePodControl
  271. manager.syncReplicaSet(getKey(rs, t))
  272. // 1. Status.Replicas should go up from 2->4 even though we created 5-4=1 pod.
  273. // 2. Status.FullyLabeledReplicas should equal to the number of pods that
  274. // has the extra labels, i.e., 2.
  275. // 3. Every update to the status should include the Generation of the spec.
  276. rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ReadyReplicas: 4, ObservedGeneration: 1}
  277. decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
  278. fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc)
  279. validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
  280. }
  281. func TestSyncReplicaSetDormancy(t *testing.T) {
  282. // Setup a test server so we can lie about the current state of pods
  283. fakeHandler := utiltesting.FakeHandler{
  284. StatusCode: 200,
  285. ResponseBody: "{}",
  286. }
  287. testServer := httptest.NewServer(&fakeHandler)
  288. defer testServer.Close()
  289. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  290. fakePodControl := controller.FakePodControl{}
  291. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  292. manager.podStoreSynced = alwaysReady
  293. manager.podControl = &fakePodControl
  294. labelMap := map[string]string{"foo": "bar"}
  295. rsSpec := newReplicaSet(2, labelMap)
  296. manager.rsStore.Store.Add(rsSpec)
  297. newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rsSpec, "pod")
  298. // Creates a replica and sets expectations
  299. rsSpec.Status.Replicas = 1
  300. rsSpec.Status.ReadyReplicas = 1
  301. manager.syncReplicaSet(getKey(rsSpec, t))
  302. validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
  303. // Expectations prevents replicas but not an update on status
  304. rsSpec.Status.Replicas = 0
  305. rsSpec.Status.ReadyReplicas = 0
  306. fakePodControl.Clear()
  307. manager.syncReplicaSet(getKey(rsSpec, t))
  308. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  309. // Get the key for the controller
  310. rsKey, err := controller.KeyFunc(rsSpec)
  311. if err != nil {
  312. t.Errorf("Couldn't get key for object %#v: %v", rsSpec, err)
  313. }
  314. // Lowering expectations should lead to a sync that creates a replica, however the
  315. // fakePodControl error will prevent this, leaving expectations at 0, 0
  316. manager.expectations.CreationObserved(rsKey)
  317. rsSpec.Status.Replicas = 1
  318. rsSpec.Status.ReadyReplicas = 1
  319. fakePodControl.Clear()
  320. fakePodControl.Err = fmt.Errorf("Fake Error")
  321. manager.syncReplicaSet(getKey(rsSpec, t))
  322. validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
  323. // This replica should not need a Lowering of expectations, since the previous create failed
  324. fakePodControl.Clear()
  325. fakePodControl.Err = nil
  326. manager.syncReplicaSet(getKey(rsSpec, t))
  327. validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
  328. // 1 PUT for the ReplicaSet status during dormancy window.
  329. // Note that the pod creates go through pod control so they're not recorded.
  330. fakeHandler.ValidateRequestCount(t, 1)
  331. }
  332. func TestPodControllerLookup(t *testing.T) {
  333. manager := NewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}), controller.NoResyncPeriodFunc, BurstReplicas, 0)
  334. manager.podStoreSynced = alwaysReady
  335. testCases := []struct {
  336. inRSs []*extensions.ReplicaSet
  337. pod *api.Pod
  338. outRSName string
  339. }{
  340. // pods without labels don't match any ReplicaSets
  341. {
  342. inRSs: []*extensions.ReplicaSet{
  343. {ObjectMeta: api.ObjectMeta{Name: "basic"}}},
  344. pod: &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}},
  345. outRSName: "",
  346. },
  347. // Matching labels, not namespace
  348. {
  349. inRSs: []*extensions.ReplicaSet{
  350. {
  351. ObjectMeta: api.ObjectMeta{Name: "foo"},
  352. Spec: extensions.ReplicaSetSpec{
  353. Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
  354. },
  355. },
  356. },
  357. pod: &api.Pod{
  358. ObjectMeta: api.ObjectMeta{
  359. Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
  360. outRSName: "",
  361. },
  362. // Matching ns and labels returns the key to the ReplicaSet, not the ReplicaSet name
  363. {
  364. inRSs: []*extensions.ReplicaSet{
  365. {
  366. ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
  367. Spec: extensions.ReplicaSetSpec{
  368. Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
  369. },
  370. },
  371. },
  372. pod: &api.Pod{
  373. ObjectMeta: api.ObjectMeta{
  374. Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
  375. outRSName: "bar",
  376. },
  377. }
  378. for _, c := range testCases {
  379. for _, r := range c.inRSs {
  380. manager.rsStore.Add(r)
  381. }
  382. if rs := manager.getPodReplicaSet(c.pod); rs != nil {
  383. if c.outRSName != rs.Name {
  384. t.Errorf("Got replica set %+v expected %+v", rs.Name, c.outRSName)
  385. }
  386. } else if c.outRSName != "" {
  387. t.Errorf("Expected a replica set %v pod %v, found none", c.outRSName, c.pod.Name)
  388. }
  389. }
  390. }
  391. type FakeWatcher struct {
  392. w *watch.FakeWatcher
  393. *fake.Clientset
  394. }
  395. func TestWatchControllers(t *testing.T) {
  396. fakeWatch := watch.NewFake()
  397. client := &fake.Clientset{}
  398. client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
  399. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  400. manager.podStoreSynced = alwaysReady
  401. var testRSSpec extensions.ReplicaSet
  402. received := make(chan string)
  403. // The update sent through the fakeWatcher should make its way into the workqueue,
  404. // and eventually into the syncHandler. The handler validates the received controller
  405. // and closes the received channel to indicate that the test can finish.
  406. manager.syncHandler = func(key string) error {
  407. obj, exists, err := manager.rsStore.Store.GetByKey(key)
  408. if !exists || err != nil {
  409. t.Errorf("Expected to find replica set under key %v", key)
  410. }
  411. rsSpec := *obj.(*extensions.ReplicaSet)
  412. if !api.Semantic.DeepDerivative(rsSpec, testRSSpec) {
  413. t.Errorf("Expected %#v, but got %#v", testRSSpec, rsSpec)
  414. }
  415. close(received)
  416. return nil
  417. }
  418. // Start only the ReplicaSet watcher and the workqueue, send a watch event,
  419. // and make sure it hits the sync method.
  420. stopCh := make(chan struct{})
  421. defer close(stopCh)
  422. go manager.rsController.Run(stopCh)
  423. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  424. testRSSpec.Name = "foo"
  425. fakeWatch.Add(&testRSSpec)
  426. select {
  427. case <-received:
  428. case <-time.After(wait.ForeverTestTimeout):
  429. t.Errorf("unexpected timeout from result channel")
  430. }
  431. }
  432. func TestWatchPods(t *testing.T) {
  433. fakeWatch := watch.NewFake()
  434. client := &fake.Clientset{}
  435. client.AddWatchReactor("*", core.DefaultWatchReactor(fakeWatch, nil))
  436. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  437. manager.podStoreSynced = alwaysReady
  438. // Put one ReplicaSet and one pod into the controller's stores
  439. labelMap := map[string]string{"foo": "bar"}
  440. testRSSpec := newReplicaSet(1, labelMap)
  441. manager.rsStore.Store.Add(testRSSpec)
  442. received := make(chan string)
  443. // The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and
  444. // send it into the syncHandler.
  445. manager.syncHandler = func(key string) error {
  446. obj, exists, err := manager.rsStore.Store.GetByKey(key)
  447. if !exists || err != nil {
  448. t.Errorf("Expected to find replica set under key %v", key)
  449. }
  450. rsSpec := obj.(*extensions.ReplicaSet)
  451. if !api.Semantic.DeepDerivative(rsSpec, testRSSpec) {
  452. t.Errorf("\nExpected %#v,\nbut got %#v", testRSSpec, rsSpec)
  453. }
  454. close(received)
  455. return nil
  456. }
  457. // Start only the pod watcher and the workqueue, send a watch event,
  458. // and make sure it hits the sync method for the right ReplicaSet.
  459. stopCh := make(chan struct{})
  460. defer close(stopCh)
  461. go manager.podController.Run(stopCh)
  462. go manager.internalPodInformer.Run(stopCh)
  463. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  464. pods := newPodList(nil, 1, api.PodRunning, labelMap, testRSSpec, "pod")
  465. testPod := pods.Items[0]
  466. testPod.Status.Phase = api.PodFailed
  467. fakeWatch.Add(&testPod)
  468. select {
  469. case <-received:
  470. case <-time.After(wait.ForeverTestTimeout):
  471. t.Errorf("unexpected timeout from result channel")
  472. }
  473. }
  474. func TestUpdatePods(t *testing.T) {
  475. manager := NewReplicaSetControllerFromClient(fake.NewSimpleClientset(), controller.NoResyncPeriodFunc, BurstReplicas, 0)
  476. manager.podStoreSynced = alwaysReady
  477. received := make(chan string)
  478. manager.syncHandler = func(key string) error {
  479. obj, exists, err := manager.rsStore.Store.GetByKey(key)
  480. if !exists || err != nil {
  481. t.Errorf("Expected to find replica set under key %v", key)
  482. }
  483. received <- obj.(*extensions.ReplicaSet).Name
  484. return nil
  485. }
  486. stopCh := make(chan struct{})
  487. defer close(stopCh)
  488. go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
  489. // Put 2 ReplicaSets and one pod into the controller's stores
  490. labelMap1 := map[string]string{"foo": "bar"}
  491. testRSSpec1 := newReplicaSet(1, labelMap1)
  492. manager.rsStore.Store.Add(testRSSpec1)
  493. testRSSpec2 := *testRSSpec1
  494. labelMap2 := map[string]string{"bar": "foo"}
  495. testRSSpec2.Spec.Selector = &unversioned.LabelSelector{MatchLabels: labelMap2}
  496. testRSSpec2.Name = "barfoo"
  497. manager.rsStore.Store.Add(&testRSSpec2)
  498. // case 1: We put in the podStore a pod with labels matching testRSSpec1,
  499. // then update its labels to match testRSSpec2. We expect to receive a sync
  500. // request for both replica sets.
  501. pod1 := newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap1, testRSSpec1, "pod").Items[0]
  502. pod1.ResourceVersion = "1"
  503. pod2 := pod1
  504. pod2.Labels = labelMap2
  505. pod2.ResourceVersion = "2"
  506. manager.updatePod(&pod1, &pod2)
  507. expected := sets.NewString(testRSSpec1.Name, testRSSpec2.Name)
  508. for _, name := range expected.List() {
  509. t.Logf("Expecting update for %+v", name)
  510. select {
  511. case got := <-received:
  512. if !expected.Has(got) {
  513. t.Errorf("Expected keys %#v got %v", expected, got)
  514. }
  515. case <-time.After(wait.ForeverTestTimeout):
  516. t.Errorf("Expected update notifications for replica sets within 100ms each")
  517. }
  518. }
  519. // case 2: pod1 in the podStore has labels matching testRSSpec1. We update
  520. // its labels to match no replica set. We expect to receive a sync request
  521. // for testRSSpec1.
  522. pod2.Labels = make(map[string]string)
  523. pod2.ResourceVersion = "2"
  524. manager.updatePod(&pod1, &pod2)
  525. expected = sets.NewString(testRSSpec1.Name)
  526. for _, name := range expected.List() {
  527. t.Logf("Expecting update for %+v", name)
  528. select {
  529. case got := <-received:
  530. if !expected.Has(got) {
  531. t.Errorf("Expected keys %#v got %v", expected, got)
  532. }
  533. case <-time.After(wait.ForeverTestTimeout):
  534. t.Errorf("Expected update notifications for replica sets within 100ms each")
  535. }
  536. }
  537. }
  538. func TestControllerUpdateRequeue(t *testing.T) {
  539. // This server should force a requeue of the controller because it fails to update status.Replicas.
  540. fakeHandler := utiltesting.FakeHandler{
  541. StatusCode: 500,
  542. ResponseBody: "{}",
  543. }
  544. testServer := httptest.NewServer(&fakeHandler)
  545. defer testServer.Close()
  546. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  547. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  548. manager.podStoreSynced = alwaysReady
  549. labelMap := map[string]string{"foo": "bar"}
  550. rs := newReplicaSet(1, labelMap)
  551. manager.rsStore.Store.Add(rs)
  552. rs.Status = extensions.ReplicaSetStatus{Replicas: 2}
  553. newPodList(manager.podStore.Indexer, 1, api.PodRunning, labelMap, rs, "pod")
  554. fakePodControl := controller.FakePodControl{}
  555. manager.podControl = &fakePodControl
  556. manager.syncReplicaSet(getKey(rs, t))
  557. ch := make(chan interface{})
  558. go func() {
  559. item, _ := manager.queue.Get()
  560. ch <- item
  561. }()
  562. select {
  563. case key := <-ch:
  564. expectedKey := getKey(rs, t)
  565. if key != expectedKey {
  566. t.Errorf("Expected requeue of replica set with key %s got %s", expectedKey, key)
  567. }
  568. case <-time.After(wait.ForeverTestTimeout):
  569. manager.queue.ShutDown()
  570. t.Errorf("Expected to find a ReplicaSet in the queue, found none.")
  571. }
  572. // 1 Update and 1 GET, both of which fail
  573. fakeHandler.ValidateRequestCount(t, 2)
  574. }
  575. func TestControllerUpdateStatusWithFailure(t *testing.T) {
  576. rs := newReplicaSet(1, map[string]string{"foo": "bar"})
  577. fakeClient := &fake.Clientset{}
  578. fakeClient.AddReactor("get", "replicasets", func(action core.Action) (bool, runtime.Object, error) { return true, rs, nil })
  579. fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  580. return true, &extensions.ReplicaSet{}, fmt.Errorf("Fake error")
  581. })
  582. fakeRSClient := fakeClient.Extensions().ReplicaSets("default")
  583. numReplicas := 10
  584. updateReplicaCount(fakeRSClient, *rs, numReplicas, 0, 0)
  585. updates, gets := 0, 0
  586. for _, a := range fakeClient.Actions() {
  587. if a.GetResource().Resource != "replicasets" {
  588. t.Errorf("Unexpected action %+v", a)
  589. continue
  590. }
  591. switch action := a.(type) {
  592. case core.GetAction:
  593. gets++
  594. // Make sure the get is for the right ReplicaSet even though the update failed.
  595. if action.GetName() != rs.Name {
  596. t.Errorf("Expected get for ReplicaSet %v, got %+v instead", rs.Name, action.GetName())
  597. }
  598. case core.UpdateAction:
  599. updates++
  600. // Confirm that the update has the right status.Replicas even though the Get
  601. // returned a ReplicaSet with replicas=1.
  602. if c, ok := action.GetObject().(*extensions.ReplicaSet); !ok {
  603. t.Errorf("Expected a ReplicaSet as the argument to update, got %T", c)
  604. } else if int(c.Status.Replicas) != numReplicas {
  605. t.Errorf("Expected update for ReplicaSet to contain replicas %v, got %v instead",
  606. numReplicas, c.Status.Replicas)
  607. }
  608. default:
  609. t.Errorf("Unexpected action %+v", a)
  610. break
  611. }
  612. }
  613. if gets != 1 || updates != 2 {
  614. t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
  615. }
  616. }
  617. // TODO: This test is too hairy for a unittest. It should be moved to an E2E suite.
  618. func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
  619. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  620. fakePodControl := controller.FakePodControl{}
  621. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, burstReplicas, 0)
  622. manager.podStoreSynced = alwaysReady
  623. manager.podControl = &fakePodControl
  624. labelMap := map[string]string{"foo": "bar"}
  625. rsSpec := newReplicaSet(numReplicas, labelMap)
  626. manager.rsStore.Store.Add(rsSpec)
  627. expectedPods := int32(0)
  628. pods := newPodList(nil, numReplicas, api.PodPending, labelMap, rsSpec, "pod")
  629. rsKey, err := controller.KeyFunc(rsSpec)
  630. if err != nil {
  631. t.Errorf("Couldn't get key for object %#v: %v", rsSpec, err)
  632. }
  633. // Size up the controller, then size it down, and confirm the expected create/delete pattern
  634. for _, replicas := range []int32{int32(numReplicas), 0} {
  635. rsSpec.Spec.Replicas = replicas
  636. manager.rsStore.Store.Add(rsSpec)
  637. for i := 0; i < numReplicas; i += burstReplicas {
  638. manager.syncReplicaSet(getKey(rsSpec, t))
  639. // The store accrues active pods. It's also used by the ReplicaSet to determine how many
  640. // replicas to create.
  641. activePods := int32(len(manager.podStore.Indexer.List()))
  642. if replicas != 0 {
  643. // This is the number of pods currently "in flight". They were created by the
  644. // ReplicaSet controller above, which then puts the ReplicaSet to sleep till
  645. // all of them have been observed.
  646. expectedPods = replicas - activePods
  647. if expectedPods > int32(burstReplicas) {
  648. expectedPods = int32(burstReplicas)
  649. }
  650. // This validates the ReplicaSet manager sync actually created pods
  651. validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0, 0)
  652. // This simulates the watch events for all but 1 of the expected pods.
  653. // None of these should wake the controller because it has expectations==BurstReplicas.
  654. for i := int32(0); i < expectedPods-1; i++ {
  655. manager.podStore.Indexer.Add(&pods.Items[i])
  656. manager.addPod(&pods.Items[i])
  657. }
  658. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  659. if !exists || err != nil {
  660. t.Fatalf("Did not find expectations for rs.")
  661. }
  662. if add, _ := podExp.GetExpectations(); add != 1 {
  663. t.Fatalf("Expectations are wrong %v", podExp)
  664. }
  665. } else {
  666. expectedPods = (replicas - activePods) * -1
  667. if expectedPods > int32(burstReplicas) {
  668. expectedPods = int32(burstReplicas)
  669. }
  670. validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods), 0)
  671. // To accurately simulate a watch we must delete the exact pods
  672. // the rs is waiting for.
  673. expectedDels := manager.expectations.GetUIDs(getKey(rsSpec, t))
  674. podsToDelete := []*api.Pod{}
  675. for _, key := range expectedDels.List() {
  676. nsName := strings.Split(key, "/")
  677. podsToDelete = append(podsToDelete, &api.Pod{
  678. ObjectMeta: api.ObjectMeta{
  679. Name: nsName[1],
  680. Namespace: nsName[0],
  681. Labels: rsSpec.Spec.Selector.MatchLabels,
  682. },
  683. })
  684. }
  685. // Don't delete all pods because we confirm that the last pod
  686. // has exactly one expectation at the end, to verify that we
  687. // don't double delete.
  688. for i := range podsToDelete[1:] {
  689. manager.podStore.Delete(podsToDelete[i])
  690. manager.deletePod(podsToDelete[i])
  691. }
  692. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  693. if !exists || err != nil {
  694. t.Fatalf("Did not find expectations for ReplicaSet.")
  695. }
  696. if _, del := podExp.GetExpectations(); del != 1 {
  697. t.Fatalf("Expectations are wrong %v", podExp)
  698. }
  699. }
  700. // Check that the ReplicaSet didn't take any action for all the above pods
  701. fakePodControl.Clear()
  702. manager.syncReplicaSet(getKey(rsSpec, t))
  703. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  704. // Create/Delete the last pod
  705. // The last add pod will decrease the expectation of the ReplicaSet to 0,
  706. // which will cause it to create/delete the remaining replicas up to burstReplicas.
  707. if replicas != 0 {
  708. manager.podStore.Indexer.Add(&pods.Items[expectedPods-1])
  709. manager.addPod(&pods.Items[expectedPods-1])
  710. } else {
  711. expectedDel := manager.expectations.GetUIDs(getKey(rsSpec, t))
  712. if expectedDel.Len() != 1 {
  713. t.Fatalf("Waiting on unexpected number of deletes.")
  714. }
  715. nsName := strings.Split(expectedDel.List()[0], "/")
  716. lastPod := &api.Pod{
  717. ObjectMeta: api.ObjectMeta{
  718. Name: nsName[1],
  719. Namespace: nsName[0],
  720. Labels: rsSpec.Spec.Selector.MatchLabels,
  721. },
  722. }
  723. manager.podStore.Indexer.Delete(lastPod)
  724. manager.deletePod(lastPod)
  725. }
  726. pods.Items = pods.Items[expectedPods:]
  727. }
  728. // Confirm that we've created the right number of replicas
  729. activePods := int32(len(manager.podStore.Indexer.List()))
  730. if activePods != rsSpec.Spec.Replicas {
  731. t.Fatalf("Unexpected number of active pods, expected %d, got %d", rsSpec.Spec.Replicas, activePods)
  732. }
  733. // Replenish the pod list, since we cut it down sizing up
  734. pods = newPodList(nil, int(replicas), api.PodRunning, labelMap, rsSpec, "pod")
  735. }
  736. }
  737. func TestControllerBurstReplicas(t *testing.T) {
  738. doTestControllerBurstReplicas(t, 5, 30)
  739. doTestControllerBurstReplicas(t, 5, 12)
  740. doTestControllerBurstReplicas(t, 3, 2)
  741. }
  742. type FakeRSExpectations struct {
  743. *controller.ControllerExpectations
  744. satisfied bool
  745. expSatisfied func()
  746. }
  747. func (fe FakeRSExpectations) SatisfiedExpectations(controllerKey string) bool {
  748. fe.expSatisfied()
  749. return fe.satisfied
  750. }
  751. // TestRSSyncExpectations tests that a pod cannot sneak in between counting active pods
  752. // and checking expectations.
  753. func TestRSSyncExpectations(t *testing.T) {
  754. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  755. fakePodControl := controller.FakePodControl{}
  756. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0)
  757. manager.podStoreSynced = alwaysReady
  758. manager.podControl = &fakePodControl
  759. labelMap := map[string]string{"foo": "bar"}
  760. rsSpec := newReplicaSet(2, labelMap)
  761. manager.rsStore.Store.Add(rsSpec)
  762. pods := newPodList(nil, 2, api.PodPending, labelMap, rsSpec, "pod")
  763. manager.podStore.Indexer.Add(&pods.Items[0])
  764. postExpectationsPod := pods.Items[1]
  765. manager.expectations = controller.NewUIDTrackingControllerExpectations(FakeRSExpectations{
  766. controller.NewControllerExpectations(), true, func() {
  767. // If we check active pods before checking expectataions, the
  768. // ReplicaSet will create a new replica because it doesn't see
  769. // this pod, but has fulfilled its expectations.
  770. manager.podStore.Indexer.Add(&postExpectationsPod)
  771. },
  772. })
  773. manager.syncReplicaSet(getKey(rsSpec, t))
  774. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  775. }
  776. func TestDeleteControllerAndExpectations(t *testing.T) {
  777. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  778. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0)
  779. manager.podStoreSynced = alwaysReady
  780. rs := newReplicaSet(1, map[string]string{"foo": "bar"})
  781. manager.rsStore.Store.Add(rs)
  782. fakePodControl := controller.FakePodControl{}
  783. manager.podControl = &fakePodControl
  784. // This should set expectations for the ReplicaSet
  785. manager.syncReplicaSet(getKey(rs, t))
  786. validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
  787. fakePodControl.Clear()
  788. // Get the ReplicaSet key
  789. rsKey, err := controller.KeyFunc(rs)
  790. if err != nil {
  791. t.Errorf("Couldn't get key for object %#v: %v", rs, err)
  792. }
  793. // This is to simulate a concurrent addPod, that has a handle on the expectations
  794. // as the controller deletes it.
  795. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  796. if !exists || err != nil {
  797. t.Errorf("No expectations found for ReplicaSet")
  798. }
  799. manager.rsStore.Delete(rs)
  800. manager.syncReplicaSet(getKey(rs, t))
  801. if _, exists, err = manager.expectations.GetExpectations(rsKey); exists {
  802. t.Errorf("Found expectaions, expected none since the ReplicaSet has been deleted.")
  803. }
  804. // This should have no effect, since we've deleted the ReplicaSet.
  805. podExp.Add(-1, 0)
  806. manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
  807. manager.syncReplicaSet(getKey(rs, t))
  808. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  809. }
  810. func TestRSManagerNotReady(t *testing.T) {
  811. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  812. fakePodControl := controller.FakePodControl{}
  813. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 2, 0)
  814. manager.podControl = &fakePodControl
  815. manager.podStoreSynced = func() bool { return false }
  816. // Simulates the ReplicaSet reflector running before the pod reflector. We don't
  817. // want to end up creating replicas in this case until the pod reflector
  818. // has synced, so the ReplicaSet controller should just requeue the ReplicaSet.
  819. rsSpec := newReplicaSet(1, map[string]string{"foo": "bar"})
  820. manager.rsStore.Store.Add(rsSpec)
  821. rsKey := getKey(rsSpec, t)
  822. manager.syncReplicaSet(rsKey)
  823. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  824. queueRS, _ := manager.queue.Get()
  825. if queueRS != rsKey {
  826. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  827. }
  828. manager.podStoreSynced = alwaysReady
  829. manager.syncReplicaSet(rsKey)
  830. validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
  831. }
  832. // shuffle returns a new shuffled list of container controllers.
  833. func shuffle(controllers []*extensions.ReplicaSet) []*extensions.ReplicaSet {
  834. numControllers := len(controllers)
  835. randIndexes := rand.Perm(numControllers)
  836. shuffled := make([]*extensions.ReplicaSet, numControllers)
  837. for i := 0; i < numControllers; i++ {
  838. shuffled[i] = controllers[randIndexes[i]]
  839. }
  840. return shuffled
  841. }
  842. func TestOverlappingRSs(t *testing.T) {
  843. client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  844. labelMap := map[string]string{"foo": "bar"}
  845. for i := 0; i < 5; i++ {
  846. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, 10, 0)
  847. manager.podStoreSynced = alwaysReady
  848. // Create 10 ReplicaSets, shuffled them randomly and insert them into the ReplicaSet controller's store
  849. var controllers []*extensions.ReplicaSet
  850. for j := 1; j < 10; j++ {
  851. rsSpec := newReplicaSet(1, labelMap)
  852. rsSpec.CreationTimestamp = unversioned.Date(2014, time.December, j, 0, 0, 0, 0, time.Local)
  853. rsSpec.Name = string(uuid.NewUUID())
  854. controllers = append(controllers, rsSpec)
  855. }
  856. shuffledControllers := shuffle(controllers)
  857. for j := range shuffledControllers {
  858. manager.rsStore.Store.Add(shuffledControllers[j])
  859. }
  860. // Add a pod and make sure only the oldest ReplicaSet is synced
  861. pods := newPodList(nil, 1, api.PodPending, labelMap, controllers[0], "pod")
  862. rsKey := getKey(controllers[0], t)
  863. manager.addPod(&pods.Items[0])
  864. queueRS, _ := manager.queue.Get()
  865. if queueRS != rsKey {
  866. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  867. }
  868. }
  869. }
  870. func TestDeletionTimestamp(t *testing.T) {
  871. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  872. labelMap := map[string]string{"foo": "bar"}
  873. manager := NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, 10, 0)
  874. manager.podStoreSynced = alwaysReady
  875. rs := newReplicaSet(1, labelMap)
  876. manager.rsStore.Store.Add(rs)
  877. rsKey, err := controller.KeyFunc(rs)
  878. if err != nil {
  879. t.Errorf("Couldn't get key for object %#v: %v", rs, err)
  880. }
  881. pod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
  882. pod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
  883. pod.ResourceVersion = "1"
  884. manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
  885. // A pod added with a deletion timestamp should decrement deletions, not creations.
  886. manager.addPod(&pod)
  887. queueRS, _ := manager.queue.Get()
  888. if queueRS != rsKey {
  889. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  890. }
  891. manager.queue.Done(rsKey)
  892. podExp, exists, err := manager.expectations.GetExpectations(rsKey)
  893. if !exists || err != nil || !podExp.Fulfilled() {
  894. t.Fatalf("Wrong expectations %#v", podExp)
  895. }
  896. // An update from no deletion timestamp to having one should be treated
  897. // as a deletion.
  898. oldPod := newPodList(nil, 1, api.PodPending, labelMap, rs, "pod").Items[0]
  899. oldPod.ResourceVersion = "2"
  900. manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
  901. manager.updatePod(&oldPod, &pod)
  902. queueRS, _ = manager.queue.Get()
  903. if queueRS != rsKey {
  904. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  905. }
  906. manager.queue.Done(rsKey)
  907. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  908. if !exists || err != nil || !podExp.Fulfilled() {
  909. t.Fatalf("Wrong expectations %#v", podExp)
  910. }
  911. // An update to the pod (including an update to the deletion timestamp)
  912. // should not be counted as a second delete.
  913. secondPod := &api.Pod{
  914. ObjectMeta: api.ObjectMeta{
  915. Namespace: pod.Namespace,
  916. Name: "secondPod",
  917. Labels: pod.Labels,
  918. },
  919. }
  920. manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(secondPod)})
  921. oldPod.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
  922. oldPod.ResourceVersion = "2"
  923. manager.updatePod(&oldPod, &pod)
  924. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  925. if !exists || err != nil || podExp.Fulfilled() {
  926. t.Fatalf("Wrong expectations %#v", podExp)
  927. }
  928. // A pod with a non-nil deletion timestamp should also be ignored by the
  929. // delete handler, because it's already been counted in the update.
  930. manager.deletePod(&pod)
  931. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  932. if !exists || err != nil || podExp.Fulfilled() {
  933. t.Fatalf("Wrong expectations %#v", podExp)
  934. }
  935. // Deleting the second pod should clear expectations.
  936. manager.deletePod(secondPod)
  937. queueRS, _ = manager.queue.Get()
  938. if queueRS != rsKey {
  939. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  940. }
  941. manager.queue.Done(rsKey)
  942. podExp, exists, err = manager.expectations.GetExpectations(rsKey)
  943. if !exists || err != nil || !podExp.Fulfilled() {
  944. t.Fatalf("Wrong expectations %#v", podExp)
  945. }
  946. }
  947. // setupManagerWithGCEnabled creates a RS manager with a fakePodControl
  948. // and with garbageCollectorEnabled set to true
  949. func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) {
  950. c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  951. fakePodControl = &controller.FakePodControl{}
  952. manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  953. manager.garbageCollectorEnabled = true
  954. manager.podStoreSynced = alwaysReady
  955. manager.podControl = fakePodControl
  956. return manager, fakePodControl
  957. }
  958. func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
  959. manager, fakePodControl := setupManagerWithGCEnabled()
  960. labelMap := map[string]string{"foo": "bar"}
  961. rs := newReplicaSet(2, labelMap)
  962. manager.rsStore.Store.Add(rs)
  963. var trueVar = true
  964. otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
  965. // add to podStore a matching Pod controlled by another controller. Expect no patch.
  966. pod := newPod("pod", rs, api.PodRunning)
  967. pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
  968. manager.podStore.Indexer.Add(pod)
  969. err := manager.syncReplicaSet(getKey(rs, t))
  970. if err != nil {
  971. t.Fatal(err)
  972. }
  973. // because the matching pod already has a controller, so 2 pods should be created.
  974. validateSyncReplicaSet(t, fakePodControl, 2, 0, 0)
  975. }
  976. func TestPatchPodWithOtherOwnerRef(t *testing.T) {
  977. manager, fakePodControl := setupManagerWithGCEnabled()
  978. labelMap := map[string]string{"foo": "bar"}
  979. rs := newReplicaSet(2, labelMap)
  980. manager.rsStore.Store.Add(rs)
  981. // add to podStore one more matching pod that doesn't have a controller
  982. // ref, but has an owner ref pointing to other object. Expect a patch to
  983. // take control of it.
  984. unrelatedOwnerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
  985. pod := newPod("pod", rs, api.PodRunning)
  986. pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
  987. manager.podStore.Indexer.Add(pod)
  988. err := manager.syncReplicaSet(getKey(rs, t))
  989. if err != nil {
  990. t.Fatal(err)
  991. }
  992. // 1 patch to take control of pod, and 1 create of new pod.
  993. validateSyncReplicaSet(t, fakePodControl, 1, 0, 1)
  994. }
  995. func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
  996. manager, fakePodControl := setupManagerWithGCEnabled()
  997. labelMap := map[string]string{"foo": "bar"}
  998. rs := newReplicaSet(2, labelMap)
  999. manager.rsStore.Store.Add(rs)
  1000. // add to podStore a matching pod that has an ownerRef pointing to the rs,
  1001. // but ownerRef.Controller is false. Expect a patch to take control it.
  1002. rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
  1003. pod := newPod("pod", rs, api.PodRunning)
  1004. pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
  1005. manager.podStore.Indexer.Add(pod)
  1006. err := manager.syncReplicaSet(getKey(rs, t))
  1007. if err != nil {
  1008. t.Fatal(err)
  1009. }
  1010. // 1 patch to take control of pod, and 1 create of new pod.
  1011. validateSyncReplicaSet(t, fakePodControl, 1, 0, 1)
  1012. }
  1013. func TestPatchPodFails(t *testing.T) {
  1014. manager, fakePodControl := setupManagerWithGCEnabled()
  1015. labelMap := map[string]string{"foo": "bar"}
  1016. rs := newReplicaSet(2, labelMap)
  1017. manager.rsStore.Store.Add(rs)
  1018. // add to podStore two matching pods. Expect two patches to take control
  1019. // them.
  1020. manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning))
  1021. manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning))
  1022. // let both patches fail. The rs controller will assume it fails to take
  1023. // control of the pods and create new ones.
  1024. fakePodControl.Err = fmt.Errorf("Fake Error")
  1025. err := manager.syncReplicaSet(getKey(rs, t))
  1026. if err != nil {
  1027. t.Fatal(err)
  1028. }
  1029. // 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
  1030. validateSyncReplicaSet(t, fakePodControl, 2, 0, 2)
  1031. }
  1032. func TestPatchExtraPodsThenDelete(t *testing.T) {
  1033. manager, fakePodControl := setupManagerWithGCEnabled()
  1034. labelMap := map[string]string{"foo": "bar"}
  1035. rs := newReplicaSet(2, labelMap)
  1036. manager.rsStore.Store.Add(rs)
  1037. // add to podStore three matching pods. Expect three patches to take control
  1038. // them, and later delete one of them.
  1039. manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning))
  1040. manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning))
  1041. manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning))
  1042. err := manager.syncReplicaSet(getKey(rs, t))
  1043. if err != nil {
  1044. t.Fatal(err)
  1045. }
  1046. // 3 patches to take control of the pods, and 1 deletion because there is an extra pod.
  1047. validateSyncReplicaSet(t, fakePodControl, 0, 1, 3)
  1048. }
  1049. func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
  1050. manager, fakePodControl := setupManagerWithGCEnabled()
  1051. labelMap := map[string]string{"foo": "bar"}
  1052. rs := newReplicaSet(2, labelMap)
  1053. manager.rsStore.Store.Add(rs)
  1054. // put one pod in the podStore
  1055. pod := newPod("pod", rs, api.PodRunning)
  1056. pod.ResourceVersion = "1"
  1057. var trueVar = true
  1058. rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
  1059. pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
  1060. updatedPod := *pod
  1061. // reset the labels
  1062. updatedPod.Labels = make(map[string]string)
  1063. updatedPod.ResourceVersion = "2"
  1064. // add the updatedPod to the store. This is consistent with the behavior of
  1065. // the Informer: Informer updates the store before call the handler
  1066. // (updatePod() in this case).
  1067. manager.podStore.Indexer.Add(&updatedPod)
  1068. // send a update of the same pod with modified labels
  1069. manager.updatePod(pod, &updatedPod)
  1070. // verifies that rs is added to the queue
  1071. rsKey := getKey(rs, t)
  1072. queueRS, _ := manager.queue.Get()
  1073. if queueRS != rsKey {
  1074. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1075. }
  1076. manager.queue.Done(queueRS)
  1077. err := manager.syncReplicaSet(rsKey)
  1078. if err != nil {
  1079. t.Fatal(err)
  1080. }
  1081. // expect 1 patch to be sent to remove the controllerRef for the pod.
  1082. // expect 2 creates because the rs.Spec.Replicas=2 and there exists no
  1083. // matching pod.
  1084. validateSyncReplicaSet(t, fakePodControl, 2, 0, 1)
  1085. fakePodControl.Clear()
  1086. }
  1087. func TestUpdateSelectorControllerRef(t *testing.T) {
  1088. manager, fakePodControl := setupManagerWithGCEnabled()
  1089. labelMap := map[string]string{"foo": "bar"}
  1090. rs := newReplicaSet(2, labelMap)
  1091. // put 2 pods in the podStore
  1092. newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
  1093. // update the RS so that its selector no longer matches the pods
  1094. updatedRS := *rs
  1095. updatedRS.Spec.Selector.MatchLabels = map[string]string{"foo": "baz"}
  1096. // put the updatedRS into the store. This is consistent with the behavior of
  1097. // the Informer: Informer updates the store before call the handler
  1098. // (updateRS() in this case).
  1099. manager.rsStore.Store.Add(&updatedRS)
  1100. manager.updateRS(rs, &updatedRS)
  1101. // verifies that the rs is added to the queue
  1102. rsKey := getKey(rs, t)
  1103. queueRS, _ := manager.queue.Get()
  1104. if queueRS != rsKey {
  1105. t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
  1106. }
  1107. manager.queue.Done(queueRS)
  1108. err := manager.syncReplicaSet(rsKey)
  1109. if err != nil {
  1110. t.Fatal(err)
  1111. }
  1112. // expect 2 patches to be sent to remove the controllerRef for the pods.
  1113. // expect 2 creates because the rc.Spec.Replicas=2 and there exists no
  1114. // matching pod.
  1115. validateSyncReplicaSet(t, fakePodControl, 2, 0, 2)
  1116. fakePodControl.Clear()
  1117. }
  1118. // RS controller shouldn't adopt or create more pods if the rc is about to be
  1119. // deleted.
  1120. func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
  1121. manager, fakePodControl := setupManagerWithGCEnabled()
  1122. labelMap := map[string]string{"foo": "bar"}
  1123. rs := newReplicaSet(2, labelMap)
  1124. now := unversioned.Now()
  1125. rs.DeletionTimestamp = &now
  1126. manager.rsStore.Store.Add(rs)
  1127. pod1 := newPod("pod1", rs, api.PodRunning)
  1128. manager.podStore.Indexer.Add(pod1)
  1129. // no patch, no create
  1130. err := manager.syncReplicaSet(getKey(rs, t))
  1131. if err != nil {
  1132. t.Fatal(err)
  1133. }
  1134. validateSyncReplicaSet(t, fakePodControl, 0, 0, 0)
  1135. }
  1136. func TestReadyReplicas(t *testing.T) {
  1137. // This is a happy server just to record the PUT request we expect for status.Replicas
  1138. fakeHandler := utiltesting.FakeHandler{
  1139. StatusCode: 200,
  1140. ResponseBody: "{}",
  1141. }
  1142. testServer := httptest.NewServer(&fakeHandler)
  1143. defer testServer.Close()
  1144. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  1145. manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
  1146. manager.podStoreSynced = alwaysReady
  1147. // Status.Replica should update to match number of pods in system, 1 new pod should be created.
  1148. labelMap := map[string]string{"foo": "bar"}
  1149. rs := newReplicaSet(2, labelMap)
  1150. rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
  1151. rs.Generation = 1
  1152. manager.rsStore.Store.Add(rs)
  1153. newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod")
  1154. newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
  1155. // This response body is just so we don't err out decoding the http response
  1156. response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
  1157. fakeHandler.ResponseBody = response
  1158. fakePodControl := controller.FakePodControl{}
  1159. manager.podControl = &fakePodControl
  1160. manager.syncReplicaSet(getKey(rs, t))
  1161. // ReadyReplicas should go from 0 to 2.
  1162. rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 2, ObservedGeneration: 1}
  1163. decRs := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
  1164. fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRs)
  1165. validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
  1166. }