endpoints_controller_test.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package endpoint
  14. import (
  15. "fmt"
  16. "net/http"
  17. "net/http/httptest"
  18. "testing"
  19. "k8s.io/kubernetes/pkg/api"
  20. endptspkg "k8s.io/kubernetes/pkg/api/endpoints"
  21. "k8s.io/kubernetes/pkg/api/testapi"
  22. "k8s.io/kubernetes/pkg/api/unversioned"
  23. _ "k8s.io/kubernetes/pkg/apimachinery/registered"
  24. "k8s.io/kubernetes/pkg/client/cache"
  25. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  26. "k8s.io/kubernetes/pkg/client/restclient"
  27. "k8s.io/kubernetes/pkg/controller"
  28. "k8s.io/kubernetes/pkg/runtime"
  29. "k8s.io/kubernetes/pkg/util/intstr"
  30. utiltesting "k8s.io/kubernetes/pkg/util/testing"
  31. )
  32. var alwaysReady = func() bool { return true }
  33. var emptyNodeName string
  34. func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
  35. for i := 0; i < nPods+nNotReady; i++ {
  36. p := &api.Pod{
  37. TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
  38. ObjectMeta: api.ObjectMeta{
  39. Namespace: namespace,
  40. Name: fmt.Sprintf("pod%d", i),
  41. Labels: map[string]string{"foo": "bar"},
  42. },
  43. Spec: api.PodSpec{
  44. Containers: []api.Container{{Ports: []api.ContainerPort{}}},
  45. },
  46. Status: api.PodStatus{
  47. PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
  48. Conditions: []api.PodCondition{
  49. {
  50. Type: api.PodReady,
  51. Status: api.ConditionTrue,
  52. },
  53. },
  54. },
  55. }
  56. if i >= nPods {
  57. p.Status.Conditions[0].Status = api.ConditionFalse
  58. }
  59. for j := 0; j < nPorts; j++ {
  60. p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
  61. api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
  62. }
  63. store.Add(p)
  64. }
  65. }
  66. type serverResponse struct {
  67. statusCode int
  68. obj interface{}
  69. }
  70. func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *utiltesting.FakeHandler) {
  71. fakeEndpointsHandler := utiltesting.FakeHandler{
  72. StatusCode: endpointsResponse.statusCode,
  73. ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), endpointsResponse.obj.(runtime.Object)),
  74. }
  75. mux := http.NewServeMux()
  76. mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
  77. mux.Handle(testapi.Default.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
  78. mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
  79. t.Errorf("unexpected request: %v", req.RequestURI)
  80. res.WriteHeader(http.StatusNotFound)
  81. })
  82. return httptest.NewServer(mux), &fakeEndpointsHandler
  83. }
  84. func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
  85. ns := api.NamespaceDefault
  86. testServer, endpointsHandler := makeTestServer(t, ns,
  87. serverResponse{http.StatusOK, &api.Endpoints{
  88. ObjectMeta: api.ObjectMeta{
  89. Name: "foo",
  90. Namespace: ns,
  91. ResourceVersion: "1",
  92. },
  93. Subsets: []api.EndpointSubset{{
  94. Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  95. Ports: []api.EndpointPort{{Port: 1000}},
  96. }},
  97. }})
  98. defer testServer.Close()
  99. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  100. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  101. endpoints.podStoreSynced = alwaysReady
  102. endpoints.serviceStore.Store.Add(&api.Service{
  103. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  104. Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
  105. })
  106. endpoints.syncService(ns + "/foo")
  107. endpointsHandler.ValidateRequestCount(t, 0)
  108. }
  109. func TestCheckLeftoverEndpoints(t *testing.T) {
  110. ns := api.NamespaceDefault
  111. // Note that this requests *all* endpoints, therefore the NamespaceAll
  112. // below.
  113. testServer, _ := makeTestServer(t, api.NamespaceAll,
  114. serverResponse{http.StatusOK, &api.EndpointsList{
  115. ListMeta: unversioned.ListMeta{
  116. ResourceVersion: "1",
  117. },
  118. Items: []api.Endpoints{{
  119. ObjectMeta: api.ObjectMeta{
  120. Name: "foo",
  121. Namespace: ns,
  122. ResourceVersion: "1",
  123. },
  124. Subsets: []api.EndpointSubset{{
  125. Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  126. Ports: []api.EndpointPort{{Port: 1000}},
  127. }},
  128. }},
  129. }})
  130. defer testServer.Close()
  131. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  132. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  133. endpoints.podStoreSynced = alwaysReady
  134. endpoints.checkLeftoverEndpoints()
  135. if e, a := 1, endpoints.queue.Len(); e != a {
  136. t.Fatalf("Expected %v, got %v", e, a)
  137. }
  138. got, _ := endpoints.queue.Get()
  139. if e, a := ns+"/foo", got; e != a {
  140. t.Errorf("Expected %v, got %v", e, a)
  141. }
  142. }
  143. func TestSyncEndpointsProtocolTCP(t *testing.T) {
  144. ns := "other"
  145. testServer, endpointsHandler := makeTestServer(t, ns,
  146. serverResponse{http.StatusOK, &api.Endpoints{
  147. ObjectMeta: api.ObjectMeta{
  148. Name: "foo",
  149. Namespace: ns,
  150. ResourceVersion: "1",
  151. },
  152. Subsets: []api.EndpointSubset{{
  153. Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  154. Ports: []api.EndpointPort{{Port: 1000, Protocol: "TCP"}},
  155. }},
  156. }})
  157. defer testServer.Close()
  158. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  159. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  160. endpoints.podStoreSynced = alwaysReady
  161. addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
  162. endpoints.serviceStore.Store.Add(&api.Service{
  163. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  164. Spec: api.ServiceSpec{
  165. Selector: map[string]string{},
  166. Ports: []api.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
  167. },
  168. })
  169. endpoints.syncService(ns + "/foo")
  170. endpointsHandler.ValidateRequestCount(t, 2)
  171. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  172. ObjectMeta: api.ObjectMeta{
  173. Name: "foo",
  174. Namespace: ns,
  175. ResourceVersion: "1",
  176. },
  177. Subsets: []api.EndpointSubset{{
  178. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  179. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  180. }},
  181. })
  182. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  183. }
  184. func TestSyncEndpointsProtocolUDP(t *testing.T) {
  185. ns := "other"
  186. testServer, endpointsHandler := makeTestServer(t, ns,
  187. serverResponse{http.StatusOK, &api.Endpoints{
  188. ObjectMeta: api.ObjectMeta{
  189. Name: "foo",
  190. Namespace: ns,
  191. ResourceVersion: "1",
  192. },
  193. Subsets: []api.EndpointSubset{{
  194. Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  195. Ports: []api.EndpointPort{{Port: 1000, Protocol: "UDP"}},
  196. }},
  197. }})
  198. defer testServer.Close()
  199. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  200. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  201. endpoints.podStoreSynced = alwaysReady
  202. addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
  203. endpoints.serviceStore.Store.Add(&api.Service{
  204. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  205. Spec: api.ServiceSpec{
  206. Selector: map[string]string{},
  207. Ports: []api.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
  208. },
  209. })
  210. endpoints.syncService(ns + "/foo")
  211. endpointsHandler.ValidateRequestCount(t, 2)
  212. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  213. ObjectMeta: api.ObjectMeta{
  214. Name: "foo",
  215. Namespace: ns,
  216. ResourceVersion: "1",
  217. },
  218. Subsets: []api.EndpointSubset{{
  219. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  220. Ports: []api.EndpointPort{{Port: 8080, Protocol: "UDP"}},
  221. }},
  222. })
  223. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  224. }
  225. func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
  226. ns := "other"
  227. testServer, endpointsHandler := makeTestServer(t, ns,
  228. serverResponse{http.StatusOK, &api.Endpoints{
  229. ObjectMeta: api.ObjectMeta{
  230. Name: "foo",
  231. Namespace: ns,
  232. ResourceVersion: "1",
  233. },
  234. Subsets: []api.EndpointSubset{},
  235. }})
  236. defer testServer.Close()
  237. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  238. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  239. endpoints.podStoreSynced = alwaysReady
  240. addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
  241. endpoints.serviceStore.Store.Add(&api.Service{
  242. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  243. Spec: api.ServiceSpec{
  244. Selector: map[string]string{},
  245. Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  246. },
  247. })
  248. endpoints.syncService(ns + "/foo")
  249. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  250. ObjectMeta: api.ObjectMeta{
  251. Name: "foo",
  252. Namespace: ns,
  253. ResourceVersion: "1",
  254. },
  255. Subsets: []api.EndpointSubset{{
  256. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  257. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  258. }},
  259. })
  260. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  261. }
  262. func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
  263. ns := "other"
  264. testServer, endpointsHandler := makeTestServer(t, ns,
  265. serverResponse{http.StatusOK, &api.Endpoints{
  266. ObjectMeta: api.ObjectMeta{
  267. Name: "foo",
  268. Namespace: ns,
  269. ResourceVersion: "1",
  270. },
  271. Subsets: []api.EndpointSubset{},
  272. }})
  273. defer testServer.Close()
  274. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  275. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  276. endpoints.podStoreSynced = alwaysReady
  277. addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
  278. endpoints.serviceStore.Store.Add(&api.Service{
  279. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  280. Spec: api.ServiceSpec{
  281. Selector: map[string]string{},
  282. Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  283. },
  284. })
  285. endpoints.syncService(ns + "/foo")
  286. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  287. ObjectMeta: api.ObjectMeta{
  288. Name: "foo",
  289. Namespace: ns,
  290. ResourceVersion: "1",
  291. },
  292. Subsets: []api.EndpointSubset{{
  293. NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  294. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  295. }},
  296. })
  297. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  298. }
  299. func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
  300. ns := "other"
  301. testServer, endpointsHandler := makeTestServer(t, ns,
  302. serverResponse{http.StatusOK, &api.Endpoints{
  303. ObjectMeta: api.ObjectMeta{
  304. Name: "foo",
  305. Namespace: ns,
  306. ResourceVersion: "1",
  307. },
  308. Subsets: []api.EndpointSubset{},
  309. }})
  310. defer testServer.Close()
  311. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  312. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  313. endpoints.podStoreSynced = alwaysReady
  314. addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
  315. endpoints.serviceStore.Store.Add(&api.Service{
  316. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  317. Spec: api.ServiceSpec{
  318. Selector: map[string]string{},
  319. Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  320. },
  321. })
  322. endpoints.syncService(ns + "/foo")
  323. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  324. ObjectMeta: api.ObjectMeta{
  325. Name: "foo",
  326. Namespace: ns,
  327. ResourceVersion: "1",
  328. },
  329. Subsets: []api.EndpointSubset{{
  330. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  331. NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
  332. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  333. }},
  334. })
  335. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  336. }
  337. func TestSyncEndpointsItemsPreexisting(t *testing.T) {
  338. ns := "bar"
  339. testServer, endpointsHandler := makeTestServer(t, ns,
  340. serverResponse{http.StatusOK, &api.Endpoints{
  341. ObjectMeta: api.ObjectMeta{
  342. Name: "foo",
  343. Namespace: ns,
  344. ResourceVersion: "1",
  345. },
  346. Subsets: []api.EndpointSubset{{
  347. Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  348. Ports: []api.EndpointPort{{Port: 1000}},
  349. }},
  350. }})
  351. defer testServer.Close()
  352. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  353. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  354. endpoints.podStoreSynced = alwaysReady
  355. addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
  356. endpoints.serviceStore.Store.Add(&api.Service{
  357. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  358. Spec: api.ServiceSpec{
  359. Selector: map[string]string{"foo": "bar"},
  360. Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  361. },
  362. })
  363. endpoints.syncService(ns + "/foo")
  364. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  365. ObjectMeta: api.ObjectMeta{
  366. Name: "foo",
  367. Namespace: ns,
  368. ResourceVersion: "1",
  369. },
  370. Subsets: []api.EndpointSubset{{
  371. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  372. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  373. }},
  374. })
  375. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  376. }
  377. func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
  378. ns := api.NamespaceDefault
  379. testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault,
  380. serverResponse{http.StatusOK, &api.Endpoints{
  381. ObjectMeta: api.ObjectMeta{
  382. ResourceVersion: "1",
  383. Name: "foo",
  384. Namespace: ns,
  385. },
  386. Subsets: []api.EndpointSubset{{
  387. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  388. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  389. }},
  390. }})
  391. defer testServer.Close()
  392. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  393. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  394. endpoints.podStoreSynced = alwaysReady
  395. addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0)
  396. endpoints.serviceStore.Store.Add(&api.Service{
  397. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
  398. Spec: api.ServiceSpec{
  399. Selector: map[string]string{"foo": "bar"},
  400. Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  401. },
  402. })
  403. endpoints.syncService(ns + "/foo")
  404. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", api.NamespaceDefault, "foo"), "GET", nil)
  405. }
  406. func TestSyncEndpointsItems(t *testing.T) {
  407. ns := "other"
  408. testServer, endpointsHandler := makeTestServer(t, ns,
  409. serverResponse{http.StatusOK, &api.Endpoints{}})
  410. defer testServer.Close()
  411. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  412. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  413. endpoints.podStoreSynced = alwaysReady
  414. addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
  415. addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
  416. endpoints.serviceStore.Store.Add(&api.Service{
  417. ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
  418. Spec: api.ServiceSpec{
  419. Selector: map[string]string{"foo": "bar"},
  420. Ports: []api.ServicePort{
  421. {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
  422. {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
  423. },
  424. },
  425. })
  426. endpoints.syncService("other/foo")
  427. expectedSubsets := []api.EndpointSubset{{
  428. Addresses: []api.EndpointAddress{
  429. {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  430. {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
  431. {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
  432. },
  433. Ports: []api.EndpointPort{
  434. {Name: "port0", Port: 8080, Protocol: "TCP"},
  435. {Name: "port1", Port: 8088, Protocol: "TCP"},
  436. },
  437. }}
  438. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  439. ObjectMeta: api.ObjectMeta{
  440. ResourceVersion: "",
  441. },
  442. Subsets: endptspkg.SortSubsets(expectedSubsets),
  443. })
  444. // endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
  445. endpointsHandler.ValidateRequestCount(t, 2)
  446. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
  447. }
  448. func TestSyncEndpointsItemsWithLabels(t *testing.T) {
  449. ns := "other"
  450. testServer, endpointsHandler := makeTestServer(t, ns,
  451. serverResponse{http.StatusOK, &api.Endpoints{}})
  452. defer testServer.Close()
  453. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  454. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  455. endpoints.podStoreSynced = alwaysReady
  456. addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
  457. serviceLabels := map[string]string{"foo": "bar"}
  458. endpoints.serviceStore.Store.Add(&api.Service{
  459. ObjectMeta: api.ObjectMeta{
  460. Name: "foo",
  461. Namespace: ns,
  462. Labels: serviceLabels,
  463. },
  464. Spec: api.ServiceSpec{
  465. Selector: map[string]string{"foo": "bar"},
  466. Ports: []api.ServicePort{
  467. {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
  468. {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
  469. },
  470. },
  471. })
  472. endpoints.syncService(ns + "/foo")
  473. expectedSubsets := []api.EndpointSubset{{
  474. Addresses: []api.EndpointAddress{
  475. {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
  476. {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
  477. {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
  478. },
  479. Ports: []api.EndpointPort{
  480. {Name: "port0", Port: 8080, Protocol: "TCP"},
  481. {Name: "port1", Port: 8088, Protocol: "TCP"},
  482. },
  483. }}
  484. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  485. ObjectMeta: api.ObjectMeta{
  486. ResourceVersion: "",
  487. Labels: serviceLabels,
  488. },
  489. Subsets: endptspkg.SortSubsets(expectedSubsets),
  490. })
  491. // endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
  492. endpointsHandler.ValidateRequestCount(t, 2)
  493. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
  494. }
  495. func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
  496. ns := "bar"
  497. testServer, endpointsHandler := makeTestServer(t, ns,
  498. serverResponse{http.StatusOK, &api.Endpoints{
  499. ObjectMeta: api.ObjectMeta{
  500. Name: "foo",
  501. Namespace: ns,
  502. ResourceVersion: "1",
  503. Labels: map[string]string{
  504. "foo": "bar",
  505. },
  506. },
  507. Subsets: []api.EndpointSubset{{
  508. Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
  509. Ports: []api.EndpointPort{{Port: 1000}},
  510. }},
  511. }})
  512. defer testServer.Close()
  513. client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
  514. endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
  515. endpoints.podStoreSynced = alwaysReady
  516. addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
  517. serviceLabels := map[string]string{"baz": "blah"}
  518. endpoints.serviceStore.Store.Add(&api.Service{
  519. ObjectMeta: api.ObjectMeta{
  520. Name: "foo",
  521. Namespace: ns,
  522. Labels: serviceLabels,
  523. },
  524. Spec: api.ServiceSpec{
  525. Selector: map[string]string{"foo": "bar"},
  526. Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
  527. },
  528. })
  529. endpoints.syncService(ns + "/foo")
  530. data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
  531. ObjectMeta: api.ObjectMeta{
  532. Name: "foo",
  533. Namespace: ns,
  534. ResourceVersion: "1",
  535. Labels: serviceLabels,
  536. },
  537. Subsets: []api.EndpointSubset{{
  538. Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
  539. Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
  540. }},
  541. })
  542. endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
  543. }