123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package endpoint
- import (
- "fmt"
- "net/http"
- "net/http/httptest"
- "testing"
- "k8s.io/kubernetes/pkg/api"
- endptspkg "k8s.io/kubernetes/pkg/api/endpoints"
- "k8s.io/kubernetes/pkg/api/testapi"
- "k8s.io/kubernetes/pkg/api/unversioned"
- _ "k8s.io/kubernetes/pkg/apimachinery/registered"
- "k8s.io/kubernetes/pkg/client/cache"
- clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
- "k8s.io/kubernetes/pkg/client/restclient"
- "k8s.io/kubernetes/pkg/controller"
- "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/util/intstr"
- utiltesting "k8s.io/kubernetes/pkg/util/testing"
- )
- var alwaysReady = func() bool { return true }
- var emptyNodeName string
- func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
- for i := 0; i < nPods+nNotReady; i++ {
- p := &api.Pod{
- TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
- ObjectMeta: api.ObjectMeta{
- Namespace: namespace,
- Name: fmt.Sprintf("pod%d", i),
- Labels: map[string]string{"foo": "bar"},
- },
- Spec: api.PodSpec{
- Containers: []api.Container{{Ports: []api.ContainerPort{}}},
- },
- Status: api.PodStatus{
- PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
- Conditions: []api.PodCondition{
- {
- Type: api.PodReady,
- Status: api.ConditionTrue,
- },
- },
- },
- }
- if i >= nPods {
- p.Status.Conditions[0].Status = api.ConditionFalse
- }
- for j := 0; j < nPorts; j++ {
- p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
- api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
- }
- store.Add(p)
- }
- }
- type serverResponse struct {
- statusCode int
- obj interface{}
- }
- func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *utiltesting.FakeHandler) {
- fakeEndpointsHandler := utiltesting.FakeHandler{
- StatusCode: endpointsResponse.statusCode,
- ResponseBody: runtime.EncodeOrDie(testapi.Default.Codec(), endpointsResponse.obj.(runtime.Object)),
- }
- mux := http.NewServeMux()
- mux.Handle(testapi.Default.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
- mux.Handle(testapi.Default.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
- mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
- t.Errorf("unexpected request: %v", req.RequestURI)
- res.WriteHeader(http.StatusNotFound)
- })
- return httptest.NewServer(mux), &fakeEndpointsHandler
- }
- func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
- ns := api.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []api.EndpointPort{{Port: 1000}},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 0)
- }
- func TestCheckLeftoverEndpoints(t *testing.T) {
- ns := api.NamespaceDefault
- // Note that this requests *all* endpoints, therefore the NamespaceAll
- // below.
- testServer, _ := makeTestServer(t, api.NamespaceAll,
- serverResponse{http.StatusOK, &api.EndpointsList{
- ListMeta: unversioned.ListMeta{
- ResourceVersion: "1",
- },
- Items: []api.Endpoints{{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []api.EndpointPort{{Port: 1000}},
- }},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- endpoints.checkLeftoverEndpoints()
- if e, a := 1, endpoints.queue.Len(); e != a {
- t.Fatalf("Expected %v, got %v", e, a)
- }
- got, _ := endpoints.queue.Get()
- if e, a := ns+"/foo", got; e != a {
- t.Errorf("Expected %v, got %v", e, a)
- }
- }
- func TestSyncEndpointsProtocolTCP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []api.EndpointPort{{Port: 1000, Protocol: "TCP"}},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{},
- Ports: []api.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 2)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsProtocolUDP(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []api.EndpointPort{{Port: 1000, Protocol: "UDP"}},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{},
- Ports: []api.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "UDP"}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequestCount(t, 2)
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "UDP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{},
- Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{},
- Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{},
- Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsPreexisting(t *testing.T) {
- ns := "bar"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []api.EndpointPort{{Port: 1000}},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
- func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
- ns := api.NamespaceDefault
- testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- ResourceVersion: "1",
- Name: "foo",
- Namespace: ns,
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, api.NamespaceDefault, 1, 1, 0)
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
- Spec: api.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", api.NamespaceDefault, "foo"), "GET", nil)
- }
- func TestSyncEndpointsItems(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{}})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
- addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
- Spec: api.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []api.ServicePort{
- {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
- {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
- },
- },
- })
- endpoints.syncService("other/foo")
- expectedSubsets := []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{
- {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
- {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
- {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
- },
- Ports: []api.EndpointPort{
- {Name: "port0", Port: 8080, Protocol: "TCP"},
- {Name: "port1", Port: 8088, Protocol: "TCP"},
- },
- }}
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- ResourceVersion: "",
- },
- Subsets: endptspkg.SortSubsets(expectedSubsets),
- })
- // endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
- endpointsHandler.ValidateRequestCount(t, 2)
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
- }
- func TestSyncEndpointsItemsWithLabels(t *testing.T) {
- ns := "other"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{}})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
- serviceLabels := map[string]string{"foo": "bar"}
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- Labels: serviceLabels,
- },
- Spec: api.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []api.ServicePort{
- {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)},
- {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: intstr.FromInt(8088)},
- },
- },
- })
- endpoints.syncService(ns + "/foo")
- expectedSubsets := []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{
- {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
- {IP: "1.2.3.5", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
- {IP: "1.2.3.6", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
- },
- Ports: []api.EndpointPort{
- {Name: "port0", Port: 8080, Protocol: "TCP"},
- {Name: "port1", Port: 8088, Protocol: "TCP"},
- },
- }}
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- ResourceVersion: "",
- Labels: serviceLabels,
- },
- Subsets: endptspkg.SortSubsets(expectedSubsets),
- })
- // endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
- endpointsHandler.ValidateRequestCount(t, 2)
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
- }
- func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
- ns := "bar"
- testServer, endpointsHandler := makeTestServer(t, ns,
- serverResponse{http.StatusOK, &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: map[string]string{
- "foo": "bar",
- },
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
- Ports: []api.EndpointPort{{Port: 1000}},
- }},
- }})
- defer testServer.Close()
- client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
- endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
- endpoints.podStoreSynced = alwaysReady
- addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
- serviceLabels := map[string]string{"baz": "blah"}
- endpoints.serviceStore.Store.Add(&api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- Labels: serviceLabels,
- },
- Spec: api.ServiceSpec{
- Selector: map[string]string{"foo": "bar"},
- Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
- },
- })
- endpoints.syncService(ns + "/foo")
- data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{
- ObjectMeta: api.ObjectMeta{
- Name: "foo",
- Namespace: ns,
- ResourceVersion: "1",
- Labels: serviceLabels,
- },
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
- Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
- }},
- })
- endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
- }
|