12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package e2e
- import (
- "bytes"
- "fmt"
- "io/ioutil"
- "math/rand"
- "net"
- "net/http"
- "sort"
- "strconv"
- "strings"
- "time"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/errors"
- "k8s.io/kubernetes/pkg/api/service"
- client "k8s.io/kubernetes/pkg/client/unversioned"
- "k8s.io/kubernetes/pkg/controller/endpoint"
- "k8s.io/kubernetes/pkg/labels"
- "k8s.io/kubernetes/pkg/types"
- "k8s.io/kubernetes/pkg/util/intstr"
- utilnet "k8s.io/kubernetes/pkg/util/net"
- "k8s.io/kubernetes/pkg/util/sets"
- "k8s.io/kubernetes/pkg/util/uuid"
- "k8s.io/kubernetes/pkg/util/wait"
- "k8s.io/kubernetes/test/e2e/framework"
- )
- const (
- // Maximum time a kube-proxy daemon on a node is allowed to not
- // notice a Service update, such as type=NodePort.
- // TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
- // liberal. Fix tracked in #20567.
- kubeProxyLagTimeout = 5 * time.Minute
- // Maximum time a load balancer is allowed to not respond after creation.
- loadBalancerLagTimeoutDefault = 2 * time.Minute
- // On AWS there is a delay between ELB creation and serving traffic;
- // a few minutes is typical, so use 10m.
- loadBalancerLagTimeoutAWS = 10 * time.Minute
- // How long to wait for a load balancer to be created/modified.
- //TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
- loadBalancerCreateTimeoutDefault = 20 * time.Minute
- loadBalancerCreateTimeoutLarge = time.Hour
- )
- // This should match whatever the default/configured range is
- var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
- var _ = framework.KubeDescribe("Services", func() {
- f := framework.NewDefaultFramework("services")
- var c *client.Client
- BeforeEach(func() {
- c = f.Client
- })
- // TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here.
- It("should provide secure master service [Conformance]", func() {
- _, err := c.Services(api.NamespaceDefault).Get("kubernetes")
- Expect(err).NotTo(HaveOccurred())
- })
- It("should serve a basic endpoint from pods [Conformance]", func() {
- // TODO: use the ServiceTestJig here
- serviceName := "endpoint-test2"
- ns := f.Namespace.Name
- labels := map[string]string{
- "foo": "bar",
- "baz": "blah",
- }
- By("creating service " + serviceName + " in namespace " + ns)
- defer func() {
- err := c.Services(ns).Delete(serviceName)
- Expect(err).NotTo(HaveOccurred())
- }()
- service := &api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: serviceName,
- },
- Spec: api.ServiceSpec{
- Selector: labels,
- Ports: []api.ServicePort{{
- Port: 80,
- TargetPort: intstr.FromInt(80),
- }},
- },
- }
- _, err := c.Services(ns).Create(service)
- Expect(err).NotTo(HaveOccurred())
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
- names := map[string]bool{}
- defer func() {
- for name := range names {
- err := c.Pods(ns).Delete(name, nil)
- Expect(err).NotTo(HaveOccurred())
- }
- }()
- name1 := "pod1"
- name2 := "pod2"
- createPodOrFail(c, ns, name1, labels, []api.ContainerPort{{ContainerPort: 80}})
- names[name1] = true
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}})
- createPodOrFail(c, ns, name2, labels, []api.ContainerPort{{ContainerPort: 80}})
- names[name2] = true
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}, name2: {80}})
- deletePodOrFail(c, ns, name1)
- delete(names, name1)
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name2: {80}})
- deletePodOrFail(c, ns, name2)
- delete(names, name2)
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
- })
- It("should serve multiport endpoints from pods [Conformance]", func() {
- // TODO: use the ServiceTestJig here
- // repacking functionality is intentionally not tested here - it's better to test it in an integration test.
- serviceName := "multi-endpoint-test"
- ns := f.Namespace.Name
- defer func() {
- err := c.Services(ns).Delete(serviceName)
- Expect(err).NotTo(HaveOccurred())
- }()
- labels := map[string]string{"foo": "bar"}
- svc1port := "svc1"
- svc2port := "svc2"
- By("creating service " + serviceName + " in namespace " + ns)
- service := &api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: serviceName,
- },
- Spec: api.ServiceSpec{
- Selector: labels,
- Ports: []api.ServicePort{
- {
- Name: "portname1",
- Port: 80,
- TargetPort: intstr.FromString(svc1port),
- },
- {
- Name: "portname2",
- Port: 81,
- TargetPort: intstr.FromString(svc2port),
- },
- },
- },
- }
- _, err := c.Services(ns).Create(service)
- Expect(err).NotTo(HaveOccurred())
- port1 := 100
- port2 := 101
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
- names := map[string]bool{}
- defer func() {
- for name := range names {
- err := c.Pods(ns).Delete(name, nil)
- Expect(err).NotTo(HaveOccurred())
- }
- }()
- containerPorts1 := []api.ContainerPort{
- {
- Name: svc1port,
- ContainerPort: int32(port1),
- },
- }
- containerPorts2 := []api.ContainerPort{
- {
- Name: svc2port,
- ContainerPort: int32(port2),
- },
- }
- podname1 := "pod1"
- podname2 := "pod2"
- createPodOrFail(c, ns, podname1, labels, containerPorts1)
- names[podname1] = true
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}})
- createPodOrFail(c, ns, podname2, labels, containerPorts2)
- names[podname2] = true
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}})
- deletePodOrFail(c, ns, podname1)
- delete(names, podname1)
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname2: {port2}})
- deletePodOrFail(c, ns, podname2)
- delete(names, podname2)
- validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
- })
- It("should be able to up and down services", func() {
- // TODO: use the ServiceTestJig here
- // this test uses framework.NodeSSHHosts that does not work if a Node only reports LegacyHostIP
- framework.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
- ns := f.Namespace.Name
- numPods, servicePort := 3, 80
- By("creating service1 in namespace " + ns)
- podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- By("creating service2 in namespace " + ns)
- podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- hosts, err := framework.NodeSSHHosts(c)
- Expect(err).NotTo(HaveOccurred())
- if len(hosts) == 0 {
- framework.Failf("No ssh-able nodes")
- }
- host := hosts[0]
- By("verifying service1 is up")
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- By("verifying service2 is up")
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- // Stop service 1 and make sure it is gone.
- By("stopping service1")
- framework.ExpectNoError(stopServeHostnameService(c, ns, "service1"))
- By("verifying service1 is not up")
- framework.ExpectNoError(verifyServeHostnameServiceDown(c, host, svc1IP, servicePort))
- By("verifying service2 is still up")
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- // Start another service and verify both are up.
- By("creating service3 in namespace " + ns)
- podNames3, svc3IP, err := startServeHostnameService(c, ns, "service3", servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- if svc2IP == svc3IP {
- framework.Failf("service IPs conflict: %v", svc2IP)
- }
- By("verifying service2 is still up")
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- By("verifying service3 is up")
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames3, svc3IP, servicePort))
- })
- It("should work after restarting kube-proxy [Disruptive]", func() {
- // TODO: use the ServiceTestJig here
- framework.SkipUnlessProviderIs("gce", "gke")
- ns := f.Namespace.Name
- numPods, servicePort := 3, 80
- svc1 := "service1"
- svc2 := "service2"
- defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, svc1)) }()
- podNames1, svc1IP, err := startServeHostnameService(c, ns, svc1, servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, svc2)) }()
- podNames2, svc2IP, err := startServeHostnameService(c, ns, svc2, servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- if svc1IP == svc2IP {
- framework.Failf("VIPs conflict: %v", svc1IP)
- }
- hosts, err := framework.NodeSSHHosts(c)
- Expect(err).NotTo(HaveOccurred())
- if len(hosts) == 0 {
- framework.Failf("No ssh-able nodes")
- }
- host := hosts[0]
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- By("Restarting kube-proxy")
- if err := framework.RestartKubeProxy(host); err != nil {
- framework.Failf("error restarting kube-proxy: %v", err)
- }
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- By("Removing iptable rules")
- result, err := framework.SSH(`
- sudo iptables -t nat -F KUBE-SERVICES || true;
- sudo iptables -t nat -F KUBE-PORTALS-HOST || true;
- sudo iptables -t nat -F KUBE-PORTALS-CONTAINER || true`, host, framework.TestContext.Provider)
- if err != nil || result.Code != 0 {
- framework.LogSSHResult(result)
- framework.Failf("couldn't remove iptable rules: %v", err)
- }
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- })
- It("should work after restarting apiserver [Disruptive]", func() {
- // TODO: use the ServiceTestJig here
- framework.SkipUnlessProviderIs("gce", "gke")
- ns := f.Namespace.Name
- numPods, servicePort := 3, 80
- defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, "service1")) }()
- podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- hosts, err := framework.NodeSSHHosts(c)
- Expect(err).NotTo(HaveOccurred())
- if len(hosts) == 0 {
- framework.Failf("No ssh-able nodes")
- }
- host := hosts[0]
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- // Restart apiserver
- if err := framework.RestartApiserver(c); err != nil {
- framework.Failf("error restarting apiserver: %v", err)
- }
- if err := framework.WaitForApiserverUp(c); err != nil {
- framework.Failf("error while waiting for apiserver up: %v", err)
- }
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- // Create a new service and check if it's not reusing IP.
- defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, "service2")) }()
- podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
- Expect(err).NotTo(HaveOccurred())
- if svc1IP == svc2IP {
- framework.Failf("VIPs conflict: %v", svc1IP)
- }
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
- framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
- })
- // TODO: Run this test against the userspace proxy and nodes
- // configured with a default deny firewall to validate that the
- // proxy whitelists NodePort traffic.
- It("should be able to create a functioning NodePort service", func() {
- serviceName := "nodeport-test"
- ns := f.Namespace.Name
- jig := NewServiceTestJig(c, serviceName)
- nodeIP := pickNodeIP(jig.Client) // for later
- By("creating service " + serviceName + " with type=NodePort in namespace " + ns)
- service := jig.CreateTCPServiceOrFail(ns, func(svc *api.Service) {
- svc.Spec.Type = api.ServiceTypeNodePort
- })
- jig.SanityCheckService(service, api.ServiceTypeNodePort)
- nodePort := int(service.Spec.Ports[0].NodePort)
- By("creating pod to be part of service " + serviceName)
- jig.RunOrFail(ns, nil)
- By("hitting the pod through the service's NodePort")
- jig.TestReachableHTTP(nodeIP, nodePort, kubeProxyLagTimeout)
- By("verifying the node port is locked")
- hostExec := framework.LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
- // Even if the node-ip:node-port check above passed, this hostexec pod
- // might fall on a node with a laggy kube-proxy.
- cmd := fmt.Sprintf(`for i in $(seq 1 300); do if ss -ant46 'sport = :%d' | grep ^LISTEN; then exit 0; fi; sleep 1; done; exit 1`, nodePort)
- stdout, err := framework.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
- if err != nil {
- framework.Failf("expected node port %d to be in use, stdout: %v. err: %v", nodePort, stdout, err)
- }
- })
- It("should be able to change the type and ports of a service [Slow]", func() {
- // requires cloud load-balancer support
- framework.SkipUnlessProviderIs("gce", "gke", "aws")
- loadBalancerSupportsUDP := !framework.ProviderIs("aws")
- loadBalancerLagTimeout := loadBalancerLagTimeoutDefault
- if framework.ProviderIs("aws") {
- loadBalancerLagTimeout = loadBalancerLagTimeoutAWS
- }
- loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault
- largeClusterMinNodesNumber := 100
- if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber {
- loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge
- }
- // This test is more monolithic than we'd like because LB turnup can be
- // very slow, so we lumped all the tests into one LB lifecycle.
- serviceName := "mutability-test"
- ns1 := f.Namespace.Name // LB1 in ns1 on TCP
- framework.Logf("namespace for TCP test: %s", ns1)
- By("creating a second namespace")
- namespacePtr, err := f.CreateNamespace("services", nil)
- Expect(err).NotTo(HaveOccurred())
- ns2 := namespacePtr.Name // LB2 in ns2 on UDP
- framework.Logf("namespace for UDP test: %s", ns2)
- jig := NewServiceTestJig(c, serviceName)
- nodeIP := pickNodeIP(jig.Client) // for later
- // Test TCP and UDP Services. Services with the same name in different
- // namespaces should get different node ports and load balancers.
- By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1)
- tcpService := jig.CreateTCPServiceOrFail(ns1, nil)
- jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP)
- By("creating a UDP service " + serviceName + " with type=ClusterIP in namespace " + ns2)
- udpService := jig.CreateUDPServiceOrFail(ns2, nil)
- jig.SanityCheckService(udpService, api.ServiceTypeClusterIP)
- By("verifying that TCP and UDP use the same port")
- if tcpService.Spec.Ports[0].Port != udpService.Spec.Ports[0].Port {
- framework.Failf("expected to use the same port for TCP and UDP")
- }
- svcPort := int(tcpService.Spec.Ports[0].Port)
- framework.Logf("service port (TCP and UDP): %d", svcPort)
- By("creating a pod to be part of the TCP service " + serviceName)
- jig.RunOrFail(ns1, nil)
- By("creating a pod to be part of the UDP service " + serviceName)
- jig.RunOrFail(ns2, nil)
- // Change the services to NodePort.
- By("changing the TCP service to type=NodePort")
- tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
- s.Spec.Type = api.ServiceTypeNodePort
- })
- jig.SanityCheckService(tcpService, api.ServiceTypeNodePort)
- tcpNodePort := int(tcpService.Spec.Ports[0].NodePort)
- framework.Logf("TCP node port: %d", tcpNodePort)
- By("changing the UDP service to type=NodePort")
- udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
- s.Spec.Type = api.ServiceTypeNodePort
- })
- jig.SanityCheckService(udpService, api.ServiceTypeNodePort)
- udpNodePort := int(udpService.Spec.Ports[0].NodePort)
- framework.Logf("UDP node port: %d", udpNodePort)
- By("hitting the TCP service's NodePort")
- jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
- By("hitting the UDP service's NodePort")
- jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
- // Change the services to LoadBalancer.
- // Here we test that LoadBalancers can receive static IP addresses. This isn't
- // necessary, but is an additional feature this monolithic test checks.
- requestedIP := ""
- staticIPName := ""
- if framework.ProviderIs("gce", "gke") {
- By("creating a static load balancer IP")
- staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunId)
- requestedIP, err = createGCEStaticIP(staticIPName)
- Expect(err).NotTo(HaveOccurred())
- defer func() {
- if staticIPName != "" {
- // Release GCE static IP - this is not kube-managed and will not be automatically released.
- if err := deleteGCEStaticIP(staticIPName); err != nil {
- framework.Logf("failed to release static IP %s: %v", staticIPName, err)
- }
- }
- }()
- framework.Logf("Allocated static load balancer IP: %s", requestedIP)
- }
- By("changing the TCP service to type=LoadBalancer")
- tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
- s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable
- s.Spec.Type = api.ServiceTypeLoadBalancer
- })
- if loadBalancerSupportsUDP {
- By("changing the UDP service to type=LoadBalancer")
- udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
- s.Spec.Type = api.ServiceTypeLoadBalancer
- })
- }
- By("waiting for the TCP service to have a load balancer")
- // Wait for the load balancer to be created asynchronously
- tcpService = jig.WaitForLoadBalancerOrFail(ns1, tcpService.Name, loadBalancerCreateTimeout)
- jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
- if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
- framework.Failf("TCP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", tcpNodePort, tcpService.Spec.Ports[0].NodePort)
- }
- if requestedIP != "" && getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP {
- framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
- }
- tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
- framework.Logf("TCP load balancer: %s", tcpIngressIP)
- if framework.ProviderIs("gce", "gke") {
- // Do this as early as possible, which overrides the `defer` above.
- // This is mostly out of fear of leaking the IP in a timeout case
- // (as of this writing we're not 100% sure where the leaks are
- // coming from, so this is first-aid rather than surgery).
- By("demoting the static IP to ephemeral")
- if staticIPName != "" {
- // Deleting it after it is attached "demotes" it to an
- // ephemeral IP, which can be auto-released.
- if err := deleteGCEStaticIP(staticIPName); err != nil {
- framework.Failf("failed to release static IP %s: %v", staticIPName, err)
- }
- staticIPName = ""
- }
- }
- var udpIngressIP string
- if loadBalancerSupportsUDP {
- By("waiting for the UDP service to have a load balancer")
- // 2nd one should be faster since they ran in parallel.
- udpService = jig.WaitForLoadBalancerOrFail(ns2, udpService.Name, loadBalancerCreateTimeout)
- jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
- if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
- framework.Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
- }
- udpIngressIP = getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
- framework.Logf("UDP load balancer: %s", udpIngressIP)
- By("verifying that TCP and UDP use different load balancers")
- if tcpIngressIP == udpIngressIP {
- framework.Failf("Load balancers are not different: %s", getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
- }
- }
- By("hitting the TCP service's NodePort")
- jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
- By("hitting the UDP service's NodePort")
- jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
- By("hitting the TCP service's LoadBalancer")
- jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
- if loadBalancerSupportsUDP {
- By("hitting the UDP service's LoadBalancer")
- jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
- }
- // Change the services' node ports.
- By("changing the TCP service's NodePort")
- tcpService = jig.ChangeServiceNodePortOrFail(ns1, tcpService.Name, tcpNodePort)
- jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
- tcpNodePortOld := tcpNodePort
- tcpNodePort = int(tcpService.Spec.Ports[0].NodePort)
- if tcpNodePort == tcpNodePortOld {
- framework.Failf("TCP Spec.Ports[0].NodePort (%d) did not change", tcpNodePort)
- }
- if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
- framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
- }
- framework.Logf("TCP node port: %d", tcpNodePort)
- By("changing the UDP service's NodePort")
- udpService = jig.ChangeServiceNodePortOrFail(ns2, udpService.Name, udpNodePort)
- if loadBalancerSupportsUDP {
- jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
- } else {
- jig.SanityCheckService(udpService, api.ServiceTypeNodePort)
- }
- udpNodePortOld := udpNodePort
- udpNodePort = int(udpService.Spec.Ports[0].NodePort)
- if udpNodePort == udpNodePortOld {
- framework.Failf("UDP Spec.Ports[0].NodePort (%d) did not change", udpNodePort)
- }
- if loadBalancerSupportsUDP && getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
- framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
- }
- framework.Logf("UDP node port: %d", udpNodePort)
- By("hitting the TCP service's new NodePort")
- jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
- By("hitting the UDP service's new NodePort")
- jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
- By("checking the old TCP NodePort is closed")
- jig.TestNotReachableHTTP(nodeIP, tcpNodePortOld, kubeProxyLagTimeout)
- By("checking the old UDP NodePort is closed")
- jig.TestNotReachableUDP(nodeIP, udpNodePortOld, kubeProxyLagTimeout)
- By("hitting the TCP service's LoadBalancer")
- jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
- if loadBalancerSupportsUDP {
- By("hitting the UDP service's LoadBalancer")
- jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
- }
- // Change the services' main ports.
- By("changing the TCP service's port")
- tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
- s.Spec.Ports[0].Port++
- })
- jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
- svcPortOld := svcPort
- svcPort = int(tcpService.Spec.Ports[0].Port)
- if svcPort == svcPortOld {
- framework.Failf("TCP Spec.Ports[0].Port (%d) did not change", svcPort)
- }
- if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
- framework.Failf("TCP Spec.Ports[0].NodePort (%d) changed", tcpService.Spec.Ports[0].NodePort)
- }
- if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
- framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
- }
- By("changing the UDP service's port")
- udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
- s.Spec.Ports[0].Port++
- })
- if loadBalancerSupportsUDP {
- jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
- } else {
- jig.SanityCheckService(udpService, api.ServiceTypeNodePort)
- }
- if int(udpService.Spec.Ports[0].Port) != svcPort {
- framework.Failf("UDP Spec.Ports[0].Port (%d) did not change", udpService.Spec.Ports[0].Port)
- }
- if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
- framework.Failf("UDP Spec.Ports[0].NodePort (%d) changed", udpService.Spec.Ports[0].NodePort)
- }
- if loadBalancerSupportsUDP && getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
- framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
- }
- framework.Logf("service port (TCP and UDP): %d", svcPort)
- By("hitting the TCP service's NodePort")
- jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
- By("hitting the UDP service's NodePort")
- jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
- By("hitting the TCP service's LoadBalancer")
- jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB
- if loadBalancerSupportsUDP {
- By("hitting the UDP service's LoadBalancer")
- jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB)
- }
- // Change the services back to ClusterIP.
- By("changing TCP service back to type=ClusterIP")
- tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
- s.Spec.Type = api.ServiceTypeClusterIP
- s.Spec.Ports[0].NodePort = 0
- })
- // Wait for the load balancer to be destroyed asynchronously
- tcpService = jig.WaitForLoadBalancerDestroyOrFail(ns1, tcpService.Name, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
- jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP)
- By("changing UDP service back to type=ClusterIP")
- udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
- s.Spec.Type = api.ServiceTypeClusterIP
- s.Spec.Ports[0].NodePort = 0
- })
- if loadBalancerSupportsUDP {
- // Wait for the load balancer to be destroyed asynchronously
- udpService = jig.WaitForLoadBalancerDestroyOrFail(ns2, udpService.Name, udpIngressIP, svcPort, loadBalancerCreateTimeout)
- jig.SanityCheckService(udpService, api.ServiceTypeClusterIP)
- }
- By("checking the TCP NodePort is closed")
- jig.TestNotReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
- By("checking the UDP NodePort is closed")
- jig.TestNotReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
- By("checking the TCP LoadBalancer is closed")
- jig.TestNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
- if loadBalancerSupportsUDP {
- By("checking the UDP LoadBalancer is closed")
- jig.TestNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
- }
- })
- It("should use same NodePort with same port but different protocols", func() {
- serviceName := "nodeports"
- ns := f.Namespace.Name
- t := NewServerTest(c, ns, serviceName)
- defer func() {
- defer GinkgoRecover()
- errs := t.Cleanup()
- if len(errs) != 0 {
- framework.Failf("errors in cleanup: %v", errs)
- }
- }()
- By("creating service " + serviceName + " with same NodePort but different protocols in namespace " + ns)
- service := &api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: t.ServiceName,
- Namespace: t.Namespace,
- },
- Spec: api.ServiceSpec{
- Selector: t.Labels,
- Type: api.ServiceTypeNodePort,
- Ports: []api.ServicePort{
- {
- Name: "tcp-port",
- Port: 53,
- Protocol: api.ProtocolTCP,
- },
- {
- Name: "udp-port",
- Port: 53,
- Protocol: api.ProtocolUDP,
- },
- },
- },
- }
- result, err := t.CreateService(service)
- Expect(err).NotTo(HaveOccurred())
- if len(result.Spec.Ports) != 2 {
- framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
- }
- if result.Spec.Ports[0].NodePort != result.Spec.Ports[1].NodePort {
- framework.Failf("should use same NodePort for new service: %v", result)
- }
- })
- It("should prevent NodePort collisions", func() {
- // TODO: use the ServiceTestJig here
- baseName := "nodeport-collision-"
- serviceName1 := baseName + "1"
- serviceName2 := baseName + "2"
- ns := f.Namespace.Name
- t := NewServerTest(c, ns, serviceName1)
- defer func() {
- defer GinkgoRecover()
- errs := t.Cleanup()
- if len(errs) != 0 {
- framework.Failf("errors in cleanup: %v", errs)
- }
- }()
- By("creating service " + serviceName1 + " with type NodePort in namespace " + ns)
- service := t.BuildServiceSpec()
- service.Spec.Type = api.ServiceTypeNodePort
- result, err := t.CreateService(service)
- Expect(err).NotTo(HaveOccurred())
- if result.Spec.Type != api.ServiceTypeNodePort {
- framework.Failf("got unexpected Spec.Type for new service: %v", result)
- }
- if len(result.Spec.Ports) != 1 {
- framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
- }
- port := result.Spec.Ports[0]
- if port.NodePort == 0 {
- framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", result)
- }
- By("creating service " + serviceName2 + " with conflicting NodePort")
- service2 := t.BuildServiceSpec()
- service2.Name = serviceName2
- service2.Spec.Type = api.ServiceTypeNodePort
- service2.Spec.Ports[0].NodePort = port.NodePort
- result2, err := t.CreateService(service2)
- if err == nil {
- framework.Failf("Created service with conflicting NodePort: %v", result2)
- }
- expectedErr := fmt.Sprintf("%d.*port is already allocated", port.NodePort)
- Expect(fmt.Sprintf("%v", err)).To(MatchRegexp(expectedErr))
- By("deleting service " + serviceName1 + " to release NodePort")
- err = t.DeleteService(serviceName1)
- Expect(err).NotTo(HaveOccurred())
- By("creating service " + serviceName2 + " with no-longer-conflicting NodePort")
- _, err = t.CreateService(service2)
- Expect(err).NotTo(HaveOccurred())
- })
- It("should check NodePort out-of-range", func() {
- // TODO: use the ServiceTestJig here
- serviceName := "nodeport-range-test"
- ns := f.Namespace.Name
- t := NewServerTest(c, ns, serviceName)
- defer func() {
- defer GinkgoRecover()
- errs := t.Cleanup()
- if len(errs) != 0 {
- framework.Failf("errors in cleanup: %v", errs)
- }
- }()
- service := t.BuildServiceSpec()
- service.Spec.Type = api.ServiceTypeNodePort
- By("creating service " + serviceName + " with type NodePort in namespace " + ns)
- service, err := t.CreateService(service)
- Expect(err).NotTo(HaveOccurred())
- if service.Spec.Type != api.ServiceTypeNodePort {
- framework.Failf("got unexpected Spec.Type for new service: %v", service)
- }
- if len(service.Spec.Ports) != 1 {
- framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
- }
- port := service.Spec.Ports[0]
- if port.NodePort == 0 {
- framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", service)
- }
- if !ServiceNodePortRange.Contains(int(port.NodePort)) {
- framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
- }
- outOfRangeNodePort := 0
- rand.Seed(time.Now().UTC().UnixNano())
- for {
- outOfRangeNodePort = 1 + rand.Intn(65535)
- if !ServiceNodePortRange.Contains(outOfRangeNodePort) {
- break
- }
- }
- By(fmt.Sprintf("changing service "+serviceName+" to out-of-range NodePort %d", outOfRangeNodePort))
- result, err := updateService(c, ns, serviceName, func(s *api.Service) {
- s.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
- })
- if err == nil {
- framework.Failf("failed to prevent update of service with out-of-range NodePort: %v", result)
- }
- expectedErr := fmt.Sprintf("%d.*port is not in the valid range", outOfRangeNodePort)
- Expect(fmt.Sprintf("%v", err)).To(MatchRegexp(expectedErr))
- By("deleting original service " + serviceName)
- err = t.DeleteService(serviceName)
- Expect(err).NotTo(HaveOccurred())
- By(fmt.Sprintf("creating service "+serviceName+" with out-of-range NodePort %d", outOfRangeNodePort))
- service = t.BuildServiceSpec()
- service.Spec.Type = api.ServiceTypeNodePort
- service.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
- service, err = t.CreateService(service)
- if err == nil {
- framework.Failf("failed to prevent create of service with out-of-range NodePort (%d): %v", outOfRangeNodePort, service)
- }
- Expect(fmt.Sprintf("%v", err)).To(MatchRegexp(expectedErr))
- })
- It("should release NodePorts on delete", func() {
- // TODO: use the ServiceTestJig here
- serviceName := "nodeport-reuse"
- ns := f.Namespace.Name
- t := NewServerTest(c, ns, serviceName)
- defer func() {
- defer GinkgoRecover()
- errs := t.Cleanup()
- if len(errs) != 0 {
- framework.Failf("errors in cleanup: %v", errs)
- }
- }()
- service := t.BuildServiceSpec()
- service.Spec.Type = api.ServiceTypeNodePort
- By("creating service " + serviceName + " with type NodePort in namespace " + ns)
- service, err := t.CreateService(service)
- Expect(err).NotTo(HaveOccurred())
- if service.Spec.Type != api.ServiceTypeNodePort {
- framework.Failf("got unexpected Spec.Type for new service: %v", service)
- }
- if len(service.Spec.Ports) != 1 {
- framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
- }
- port := service.Spec.Ports[0]
- if port.NodePort == 0 {
- framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", service)
- }
- if !ServiceNodePortRange.Contains(int(port.NodePort)) {
- framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
- }
- nodePort := port.NodePort
- By("deleting original service " + serviceName)
- err = t.DeleteService(serviceName)
- Expect(err).NotTo(HaveOccurred())
- hostExec := framework.LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
- cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
- var stdout string
- if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
- var err error
- stdout, err = framework.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
- if err != nil {
- framework.Logf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
- return false, nil
- }
- return true, nil
- }); pollErr != nil {
- framework.Failf("expected node port (%d) to not be in use in %v, stdout: %v", nodePort, kubeProxyLagTimeout, stdout)
- }
- By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
- service = t.BuildServiceSpec()
- service.Spec.Type = api.ServiceTypeNodePort
- service.Spec.Ports[0].NodePort = nodePort
- service, err = t.CreateService(service)
- Expect(err).NotTo(HaveOccurred())
- })
- It("should create endpoints for unready pods", func() {
- serviceName := "never-ready"
- ns := f.Namespace.Name
- t := NewServerTest(c, ns, serviceName)
- defer func() {
- defer GinkgoRecover()
- errs := t.Cleanup()
- if len(errs) != 0 {
- framework.Failf("errors in cleanup: %v", errs)
- }
- }()
- service := t.BuildServiceSpec()
- service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}
- rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, api.Container{
- Name: t.name,
- Image: t.image,
- Ports: []api.ContainerPort{{ContainerPort: int32(80), Protocol: api.ProtocolTCP}},
- ReadinessProbe: &api.Probe{
- Handler: api.Handler{
- Exec: &api.ExecAction{
- Command: []string{"/bin/false"},
- },
- },
- },
- })
- By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
- _, err := t.createRC(rcSpec)
- ExpectNoError(err)
- By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector))
- _, err = t.CreateService(service)
- ExpectNoError(err)
- By("Verifying pods for RC " + t.name)
- ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))
- svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
- By("waiting for endpoints of Service with DNS name " + svcName)
- execPodName := createExecPodOrFail(f.Client, f.Namespace.Name, "execpod-")
- cmd := fmt.Sprintf("wget -qO- %v", svcName)
- var stdout string
- if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
- var err error
- stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
- if err != nil {
- framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
- return false, nil
- }
- return true, nil
- }); pollErr != nil {
- framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
- }
- })
- It("should be able to create services of type LoadBalancer and externalTraffic=localOnly [Slow][Feature:ExternalTrafficLocalOnly]", func() {
- // requires cloud load-balancer support - this feature currently supported only on GCE/GKE
- framework.SkipUnlessProviderIs("gce", "gke")
- loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault
- largeClusterMinNodesNumber := 100
- if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber {
- loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge
- }
- namespace := f.Namespace.Name
- serviceName := "external-local"
- jig := NewServiceTestJig(c, serviceName)
- By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only")
- svc := jig.CreateTCPServiceOrFail(namespace, func(svc *api.Service) {
- svc.Spec.Type = api.ServiceTypeLoadBalancer
- // We need to turn affinity off for our LB distribution tests
- svc.Spec.SessionAffinity = api.ServiceAffinityNone
- svc.ObjectMeta.Annotations = map[string]string{
- service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
- svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}}
- })
- By("creating a pod to be part of the service " + serviceName)
- // This container is an nginx container listening on port 80
- // See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response
- jig.RunOrFail(namespace, nil)
- By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
- svc = jig.WaitForLoadBalancerOrFail(namespace, serviceName, loadBalancerCreateTimeout)
- jig.SanityCheckService(svc, api.ServiceTypeLoadBalancer)
- svcTcpPort := int(svc.Spec.Ports[0].Port)
- framework.Logf("service port : %d", svcTcpPort)
- tcpNodePort := int(svc.Spec.Ports[0].NodePort)
- framework.Logf("TCP node port: %d", tcpNodePort)
- ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
- framework.Logf("TCP load balancer: %s", ingressIP)
- healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
- By("checking health check node port allocated")
- if healthCheckNodePort == 0 {
- framework.Failf("Service HealthCheck NodePort was not allocated")
- }
- nodeIP := pickNodeIP(jig.Client)
- By("hitting the TCP service's NodePort on " + nodeIP + ":" + fmt.Sprintf("%d", tcpNodePort))
- jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
- By("hitting the TCP service's service port, via its external VIP " + ingressIP + ":" + fmt.Sprintf("%d", svcTcpPort))
- jig.TestReachableHTTP(ingressIP, svcTcpPort, kubeProxyLagTimeout)
- By("reading clientIP using the TCP service's NodePort")
- content := jig.GetHTTPContent(nodeIP, tcpNodePort, kubeProxyLagTimeout, "/clientip")
- clientIP := content.String()
- framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
- By("reading clientIP using the TCP service's service port via its external VIP")
- content = jig.GetHTTPContent(ingressIP, svcTcpPort, kubeProxyLagTimeout, "/clientip")
- clientIP = content.String()
- framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP)
- By("checking if Source IP is preserved")
- if strings.HasPrefix(clientIP, "10.") {
- framework.Failf("Source IP was NOT preserved")
- }
- By("finding nodes for all service endpoints")
- endpoints, err := c.Endpoints(namespace).Get(serviceName)
- if err != nil {
- framework.Failf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
- }
- if len(endpoints.Subsets[0].Addresses) == 0 {
- framework.Failf("Expected Ready endpoints - found none")
- }
- readyHostName := *endpoints.Subsets[0].Addresses[0].NodeName
- framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, readyHostName)
- // HealthCheck responder validation - iterate over all node IPs and check their HC responses
- // Collect all node names and their public IPs - the nodes and ips slices parallel each other
- nodes := framework.GetReadySchedulableNodesOrDie(jig.Client)
- ips := collectAddresses(nodes, api.NodeExternalIP)
- if len(ips) == 0 {
- ips = collectAddresses(nodes, api.NodeLegacyHostIP)
- }
- By("checking kube-proxy health check responses are correct")
- for n, publicIP := range ips {
- framework.Logf("Checking health check response for node %s, public IP %s", nodes.Items[n].Name, publicIP)
- // HealthCheck should pass only on the node where num(endpoints) > 0
- // All other nodes should fail the healthcheck on the service healthCheckNodePort
- expectedSuccess := nodes.Items[n].Name == readyHostName
- jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, "/healthz", expectedSuccess)
- }
- })
- })
- // updateService fetches a service, calls the update function on it,
- // and then attempts to send the updated service. It retries up to 2
- // times in the face of timeouts and conflicts.
- func updateService(c *client.Client, namespace, serviceName string, update func(*api.Service)) (*api.Service, error) {
- var service *api.Service
- var err error
- for i := 0; i < 3; i++ {
- service, err = c.Services(namespace).Get(serviceName)
- if err != nil {
- return service, err
- }
- update(service)
- service, err = c.Services(namespace).Update(service)
- if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
- return service, err
- }
- }
- return service, err
- }
- func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID {
- m := PortsByPodUID{}
- for _, ss := range endpoints.Subsets {
- for _, port := range ss.Ports {
- for _, addr := range ss.Addresses {
- containerPort := port.Port
- hostPort := port.Port
- // use endpoint annotations to recover the container port in a Mesos setup
- // compare contrib/mesos/pkg/service/endpoints_controller.syncService
- key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort)
- mesosContainerPortString := endpoints.Annotations[key]
- if mesosContainerPortString != "" {
- mesosContainerPort, err := strconv.Atoi(mesosContainerPortString)
- if err != nil {
- continue
- }
- containerPort = int32(mesosContainerPort)
- framework.Logf("Mapped mesos host port %d to container port %d via annotation %s=%s", hostPort, containerPort, key, mesosContainerPortString)
- }
- // framework.Logf("Found pod %v, host port %d and container port %d", addr.TargetRef.UID, hostPort, containerPort)
- if _, ok := m[addr.TargetRef.UID]; !ok {
- m[addr.TargetRef.UID] = make([]int, 0)
- }
- m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
- }
- }
- }
- return m
- }
- type PortsByPodName map[string][]int
- type PortsByPodUID map[types.UID][]int
- func translatePodNameToUIDOrFail(c *client.Client, ns string, expectedEndpoints PortsByPodName) PortsByPodUID {
- portsByUID := make(PortsByPodUID)
- for name, portList := range expectedEndpoints {
- pod, err := c.Pods(ns).Get(name)
- if err != nil {
- framework.Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
- }
- portsByUID[pod.ObjectMeta.UID] = portList
- }
- // framework.Logf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)
- return portsByUID
- }
- func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) {
- if len(endpoints) != len(expectedEndpoints) {
- // should not happen because we check this condition before
- framework.Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints)
- }
- for podUID := range expectedEndpoints {
- if _, ok := endpoints[podUID]; !ok {
- framework.Failf("endpoint %v not found", podUID)
- }
- if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) {
- framework.Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
- }
- sort.Ints(endpoints[podUID])
- sort.Ints(expectedEndpoints[podUID])
- for index := range endpoints[podUID] {
- if endpoints[podUID][index] != expectedEndpoints[podUID][index] {
- framework.Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
- }
- }
- }
- }
- func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints PortsByPodName) {
- By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
- i := 1
- for start := time.Now(); time.Since(start) < framework.ServiceStartTimeout; time.Sleep(1 * time.Second) {
- endpoints, err := c.Endpoints(namespace).Get(serviceName)
- if err != nil {
- framework.Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
- continue
- }
- // framework.Logf("Found endpoints %v", endpoints)
- portsByPodUID := getContainerPortsByPodUID(endpoints)
- // framework.Logf("Found port by pod UID %v", portsByPodUID)
- expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints)
- if len(portsByPodUID) == len(expectedEndpoints) {
- validatePortsOrFail(portsByPodUID, expectedPortsByPodUID)
- framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
- serviceName, namespace, expectedEndpoints, time.Since(start))
- return
- }
- if i%5 == 0 {
- framework.Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
- }
- i++
- }
- if pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}); err == nil {
- for _, pod := range pods.Items {
- framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
- }
- } else {
- framework.Logf("Can't list pod debug info: %v", err)
- }
- framework.Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, framework.ServiceStartTimeout)
- }
- // createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
- // vessel for kubectl exec commands.
- // Returns the name of the created pod.
- func createExecPodOrFail(c *client.Client, ns, generateName string) string {
- framework.Logf("Creating new exec pod")
- immediate := int64(0)
- pod := &api.Pod{
- ObjectMeta: api.ObjectMeta{
- GenerateName: generateName,
- Namespace: ns,
- },
- Spec: api.PodSpec{
- TerminationGracePeriodSeconds: &immediate,
- Containers: []api.Container{
- {
- Name: "exec",
- Image: "gcr.io/google_containers/busybox:1.24",
- Command: []string{"sh", "-c", "while true; do sleep 5; done"},
- },
- },
- },
- }
- created, err := c.Pods(ns).Create(pod)
- Expect(err).NotTo(HaveOccurred())
- err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) {
- retrievedPod, err := c.Pods(pod.Namespace).Get(created.Name)
- if err != nil {
- return false, nil
- }
- return retrievedPod.Status.Phase == api.PodRunning, nil
- })
- Expect(err).NotTo(HaveOccurred())
- return created.Name
- }
- func createPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) {
- By(fmt.Sprintf("creating pod %s in namespace %s", name, ns))
- pod := &api.Pod{
- ObjectMeta: api.ObjectMeta{
- Name: name,
- Labels: labels,
- },
- Spec: api.PodSpec{
- Containers: []api.Container{
- {
- Name: "pause",
- Image: framework.GetPauseImageName(c),
- Ports: containerPorts,
- // Add a dummy environment variable to work around a docker issue.
- // https://github.com/docker/docker/issues/14203
- Env: []api.EnvVar{{Name: "FOO", Value: " "}},
- },
- },
- },
- }
- _, err := c.Pods(ns).Create(pod)
- Expect(err).NotTo(HaveOccurred())
- }
- func deletePodOrFail(c *client.Client, ns, name string) {
- By(fmt.Sprintf("deleting pod %s in namespace %s", name, ns))
- err := c.Pods(ns).Delete(name, nil)
- Expect(err).NotTo(HaveOccurred())
- }
- func collectAddresses(nodes *api.NodeList, addressType api.NodeAddressType) []string {
- ips := []string{}
- for i := range nodes.Items {
- item := &nodes.Items[i]
- for j := range item.Status.Addresses {
- nodeAddress := &item.Status.Addresses[j]
- if nodeAddress.Type == addressType {
- ips = append(ips, nodeAddress.Address)
- }
- }
- }
- return ips
- }
- func getNodePublicIps(c *client.Client) ([]string, error) {
- nodes := framework.GetReadySchedulableNodesOrDie(c)
- ips := collectAddresses(nodes, api.NodeExternalIP)
- if len(ips) == 0 {
- ips = collectAddresses(nodes, api.NodeLegacyHostIP)
- }
- return ips, nil
- }
- func pickNodeIP(c *client.Client) string {
- publicIps, err := getNodePublicIps(c)
- Expect(err).NotTo(HaveOccurred())
- if len(publicIps) == 0 {
- framework.Failf("got unexpected number (%d) of public IPs", len(publicIps))
- }
- ip := publicIps[0]
- return ip
- }
- func testReachableHTTP(ip string, port int, request string, expect string) (bool, error) {
- return testReachableHTTPWithContent(ip, port, request, expect, nil)
- }
- func testReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
- url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
- if ip == "" {
- framework.Failf("Got empty IP for reachability check (%s)", url)
- return false, nil
- }
- if port == 0 {
- framework.Failf("Got port==0 for reachability check (%s)", url)
- return false, nil
- }
- framework.Logf("Testing HTTP reachability of %v", url)
- resp, err := httpGetNoConnectionPool(url)
- if err != nil {
- framework.Logf("Got error testing for reachability of %s: %v", url, err)
- return false, nil
- }
- defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- framework.Logf("Got error reading response from %s: %v", url, err)
- return false, nil
- }
- if resp.StatusCode != 200 {
- return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s",
- resp.Status, url, string(body))
- }
- if !strings.Contains(string(body), expect) {
- return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body))
- }
- if content != nil {
- content.Write(body)
- }
- return true, nil
- }
- func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
- url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
- if ip == "" || port == 0 {
- framework.Failf("Got empty IP for reachability check (%s)", url)
- return false, fmt.Errorf("Invalid input ip or port")
- }
- framework.Logf("Testing HTTP health check on %v", url)
- resp, err := httpGetNoConnectionPool(url)
- if err != nil {
- framework.Logf("Got error testing for reachability of %s: %v", url, err)
- return false, err
- }
- defer resp.Body.Close()
- if err != nil {
- framework.Logf("Got error reading response from %s: %v", url, err)
- return false, err
- }
- // HealthCheck responder returns 503 for no local endpoints
- if resp.StatusCode == 503 {
- return false, nil
- }
- // HealthCheck responder returns 200 for non-zero local endpoints
- if resp.StatusCode == 200 {
- return true, nil
- }
- return false, fmt.Errorf("Unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
- }
- func testNotReachableHTTP(ip string, port int) (bool, error) {
- url := fmt.Sprintf("http://%s:%d", ip, port)
- if ip == "" {
- framework.Failf("Got empty IP for non-reachability check (%s)", url)
- return false, nil
- }
- if port == 0 {
- framework.Failf("Got port==0 for non-reachability check (%s)", url)
- return false, nil
- }
- framework.Logf("Testing HTTP non-reachability of %v", url)
- resp, err := httpGetNoConnectionPool(url)
- if err != nil {
- framework.Logf("Confirmed that %s is not reachable", url)
- return true, nil
- }
- resp.Body.Close()
- return false, nil
- }
- func testReachableUDP(ip string, port int, request string, expect string) (bool, error) {
- uri := fmt.Sprintf("udp://%s:%d", ip, port)
- if ip == "" {
- framework.Failf("Got empty IP for reachability check (%s)", uri)
- return false, nil
- }
- if port == 0 {
- framework.Failf("Got port==0 for reachability check (%s)", uri)
- return false, nil
- }
- framework.Logf("Testing UDP reachability of %v", uri)
- con, err := net.Dial("udp", ip+":"+strconv.Itoa(port))
- if err != nil {
- return false, fmt.Errorf("Failed to dial %s:%d: %v", ip, port, err)
- }
- _, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
- if err != nil {
- return false, fmt.Errorf("Failed to send request: %v", err)
- }
- var buf []byte = make([]byte, len(expect)+1)
- err = con.SetDeadline(time.Now().Add(3 * time.Second))
- if err != nil {
- return false, fmt.Errorf("Failed to set deadline: %v", err)
- }
- _, err = con.Read(buf)
- if err != nil {
- return false, nil
- }
- if !strings.Contains(string(buf), expect) {
- return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf))
- }
- framework.Logf("Successfully reached %v", uri)
- return true, nil
- }
- func testNotReachableUDP(ip string, port int, request string) (bool, error) {
- uri := fmt.Sprintf("udp://%s:%d", ip, port)
- if ip == "" {
- framework.Failf("Got empty IP for reachability check (%s)", uri)
- return false, nil
- }
- if port == 0 {
- framework.Failf("Got port==0 for reachability check (%s)", uri)
- return false, nil
- }
- framework.Logf("Testing UDP non-reachability of %v", uri)
- con, err := net.Dial("udp", ip+":"+strconv.Itoa(port))
- if err != nil {
- framework.Logf("Confirmed that %s is not reachable", uri)
- return true, nil
- }
- _, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
- if err != nil {
- framework.Logf("Confirmed that %s is not reachable", uri)
- return true, nil
- }
- var buf []byte = make([]byte, 1)
- err = con.SetDeadline(time.Now().Add(3 * time.Second))
- if err != nil {
- return false, fmt.Errorf("Failed to set deadline: %v", err)
- }
- _, err = con.Read(buf)
- if err != nil {
- framework.Logf("Confirmed that %s is not reachable", uri)
- return true, nil
- }
- return false, nil
- }
- // Creates a replication controller that serves its hostname and a service on top of it.
- func startServeHostnameService(c *client.Client, ns, name string, port, replicas int) ([]string, string, error) {
- podNames := make([]string, replicas)
- By("creating service " + name + " in namespace " + ns)
- _, err := c.Services(ns).Create(&api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: name,
- },
- Spec: api.ServiceSpec{
- Ports: []api.ServicePort{{
- Port: int32(port),
- TargetPort: intstr.FromInt(9376),
- Protocol: "TCP",
- }},
- Selector: map[string]string{
- "name": name,
- },
- },
- })
- if err != nil {
- return podNames, "", err
- }
- var createdPods []*api.Pod
- maxContainerFailures := 0
- config := framework.RCConfig{
- Client: c,
- Image: "gcr.io/google_containers/serve_hostname:v1.4",
- Name: name,
- Namespace: ns,
- PollInterval: 3 * time.Second,
- Timeout: framework.PodReadyBeforeTimeout,
- Replicas: replicas,
- CreatedPods: &createdPods,
- MaxContainerFailures: &maxContainerFailures,
- }
- err = framework.RunRC(config)
- if err != nil {
- return podNames, "", err
- }
- if len(createdPods) != replicas {
- return podNames, "", fmt.Errorf("Incorrect number of running pods: %v", len(createdPods))
- }
- for i := range createdPods {
- podNames[i] = createdPods[i].ObjectMeta.Name
- }
- sort.StringSlice(podNames).Sort()
- service, err := c.Services(ns).Get(name)
- if err != nil {
- return podNames, "", err
- }
- if service.Spec.ClusterIP == "" {
- return podNames, "", fmt.Errorf("Service IP is blank for %v", name)
- }
- serviceIP := service.Spec.ClusterIP
- return podNames, serviceIP, nil
- }
- func stopServeHostnameService(c *client.Client, ns, name string) error {
- if err := framework.DeleteRCAndPods(c, ns, name); err != nil {
- return err
- }
- if err := c.Services(ns).Delete(name); err != nil {
- return err
- }
- return nil
- }
- // verifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
- // given host and from within a pod. The host is expected to be an SSH-able node
- // in the cluster. Each pod in the service is expected to echo its name. These
- // names are compared with the given expectedPods list after a sort | uniq.
- func verifyServeHostnameServiceUp(c *client.Client, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
- execPodName := createExecPodOrFail(c, ns, "execpod-")
- defer func() {
- deletePodOrFail(c, ns, execPodName)
- }()
- // Loop a bunch of times - the proxy is randomized, so we want a good
- // chance of hitting each backend at least once.
- buildCommand := func(wget string) string {
- return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s:%d 2>&1 || true; echo; done",
- 50*len(expectedPods), wget, serviceIP, servicePort)
- }
- commands := []func() string{
- // verify service from node
- func() string {
- cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
- framework.Logf("Executing cmd %q on host %v", cmd, host)
- result, err := framework.SSH(cmd, host, framework.TestContext.Provider)
- if err != nil || result.Code != 0 {
- framework.LogSSHResult(result)
- framework.Logf("error while SSH-ing to node: %v", err)
- }
- return result.Stdout
- },
- // verify service from pod
- func() string {
- cmd := buildCommand("wget -q -T 1 -O -")
- framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
- // TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
- output, err := framework.RunHostCmd(ns, execPodName, cmd)
- if err != nil {
- framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
- }
- return output
- },
- }
- expectedEndpoints := sets.NewString(expectedPods...)
- By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
- for _, cmdFunc := range commands {
- passed := false
- gotEndpoints := sets.NewString()
- // Retry cmdFunc for a while
- for start := time.Now(); time.Since(start) < kubeProxyLagTimeout; time.Sleep(5 * time.Second) {
- for _, endpoint := range strings.Split(cmdFunc(), "\n") {
- trimmedEp := strings.TrimSpace(endpoint)
- if trimmedEp != "" {
- gotEndpoints.Insert(trimmedEp)
- }
- }
- // TODO: simply checking that the retrieved endpoints is a superset
- // of the expected allows us to ignore intermitten network flakes that
- // result in output like "wget timed out", but these should be rare
- // and we need a better way to track how often it occurs.
- if gotEndpoints.IsSuperset(expectedEndpoints) {
- if !gotEndpoints.Equal(expectedEndpoints) {
- framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
- }
- passed = true
- break
- }
- framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
- }
- if !passed {
- // Sort the lists so they're easier to visually diff.
- exp := expectedEndpoints.List()
- got := gotEndpoints.List()
- sort.StringSlice(exp).Sort()
- sort.StringSlice(got).Sort()
- return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
- }
- }
- return nil
- }
- func verifyServeHostnameServiceDown(c *client.Client, host string, serviceIP string, servicePort int) error {
- command := fmt.Sprintf(
- "curl -s --connect-timeout 2 http://%s:%d && exit 99", serviceIP, servicePort)
- for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
- result, err := framework.SSH(command, host, framework.TestContext.Provider)
- if err != nil {
- framework.LogSSHResult(result)
- framework.Logf("error while SSH-ing to node: %v", err)
- }
- if result.Code != 99 {
- return nil
- }
- framework.Logf("service still alive - still waiting")
- }
- return fmt.Errorf("waiting for service to be down timed out")
- }
- // Does an HTTP GET, but does not reuse TCP connections
- // This masks problems where the iptables rule has changed, but we don't see it
- // This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout
- func httpGetNoConnectionPool(url string) (*http.Response, error) {
- tr := utilnet.SetTransportDefaults(&http.Transport{
- DisableKeepAlives: true,
- })
- client := &http.Client{
- Transport: tr,
- Timeout: 5 * time.Second,
- }
- return client.Get(url)
- }
- // A test jig to help testing.
- type ServiceTestJig struct {
- ID string
- Name string
- Client *client.Client
- Labels map[string]string
- }
- // NewServiceTestJig allocates and inits a new ServiceTestJig.
- func NewServiceTestJig(client *client.Client, name string) *ServiceTestJig {
- j := &ServiceTestJig{}
- j.Client = client
- j.Name = name
- j.ID = j.Name + "-" + string(uuid.NewUUID())
- j.Labels = map[string]string{"testid": j.ID}
- return j
- }
- // newServiceTemplate returns the default api.Service template for this jig, but
- // does not actually create the Service. The default Service has the same name
- // as the jig and exposes port 80.
- func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol) *api.Service {
- service := &api.Service{
- ObjectMeta: api.ObjectMeta{
- Namespace: namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: api.ServiceSpec{
- Selector: j.Labels,
- Ports: []api.ServicePort{
- {
- Protocol: proto,
- Port: 80,
- },
- },
- },
- }
- return service
- }
- // CreateTCPServiceOrFail creates a new TCP Service based on the jig's
- // defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
- svc := j.newServiceTemplate(namespace, api.ProtocolTCP)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.Services(namespace).Create(svc)
- if err != nil {
- framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err)
- }
- return result
- }
- // CreateUDPServiceOrFail creates a new UDP Service based on the jig's
- // defaults. Callers can provide a function to tweak the Service object before
- // it is created.
- func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
- svc := j.newServiceTemplate(namespace, api.ProtocolUDP)
- if tweak != nil {
- tweak(svc)
- }
- result, err := j.Client.Services(namespace).Create(svc)
- if err != nil {
- framework.Failf("Failed to create UDP Service %q: %v", svc.Name, err)
- }
- return result
- }
- func (j *ServiceTestJig) SanityCheckService(svc *api.Service, svcType api.ServiceType) {
- if svc.Spec.Type != svcType {
- framework.Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
- }
- expectNodePorts := false
- if svcType != api.ServiceTypeClusterIP {
- expectNodePorts = true
- }
- for i, port := range svc.Spec.Ports {
- hasNodePort := (port.NodePort != 0)
- if hasNodePort != expectNodePorts {
- framework.Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
- }
- if hasNodePort {
- if !ServiceNodePortRange.Contains(int(port.NodePort)) {
- framework.Failf("out-of-range nodePort (%d) for service", port.NodePort)
- }
- }
- }
- expectIngress := false
- if svcType == api.ServiceTypeLoadBalancer {
- expectIngress = true
- }
- hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
- if hasIngress != expectIngress {
- framework.Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
- }
- if hasIngress {
- for i, ing := range svc.Status.LoadBalancer.Ingress {
- if ing.IP == "" && ing.Hostname == "" {
- framework.Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
- }
- }
- }
- }
- // UpdateService fetches a service, calls the update function on it, and
- // then attempts to send the updated service. It tries up to 3 times in the
- // face of timeouts and conflicts.
- func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*api.Service)) (*api.Service, error) {
- for i := 0; i < 3; i++ {
- service, err := j.Client.Services(namespace).Get(name)
- if err != nil {
- return nil, fmt.Errorf("Failed to get Service %q: %v", name, err)
- }
- update(service)
- service, err = j.Client.Services(namespace).Update(service)
- if err == nil {
- return service, nil
- }
- if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
- return nil, fmt.Errorf("Failed to update Service %q: %v", name, err)
- }
- }
- return nil, fmt.Errorf("Too many retries updating Service %q", name)
- }
- // UpdateServiceOrFail fetches a service, calls the update function on it, and
- // then attempts to send the updated service. It tries up to 3 times in the
- // face of timeouts and conflicts.
- func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*api.Service)) *api.Service {
- svc, err := j.UpdateService(namespace, name, update)
- if err != nil {
- framework.Failf(err.Error())
- }
- return svc
- }
- func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *api.Service {
- var err error
- var service *api.Service
- for i := 1; i < ServiceNodePortRange.Size; i++ {
- offs1 := initial - ServiceNodePortRange.Base
- offs2 := (offs1 + i) % ServiceNodePortRange.Size
- newPort := ServiceNodePortRange.Base + offs2
- service, err = j.UpdateService(namespace, name, func(s *api.Service) {
- s.Spec.Ports[0].NodePort = int32(newPort)
- })
- if err != nil && strings.Contains(err.Error(), "provided port is already allocated") {
- framework.Logf("tried nodePort %d, but it is in use, will try another", newPort)
- continue
- }
- // Otherwise err was nil or err was a real error
- break
- }
- if err != nil {
- framework.Failf("Could not change the nodePort: %v", err)
- }
- return service
- }
- func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *api.Service {
- var service *api.Service
- framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
- pollFunc := func() (bool, error) {
- svc, err := j.Client.Services(namespace).Get(name)
- if err != nil {
- return false, err
- }
- if len(svc.Status.LoadBalancer.Ingress) > 0 {
- service = svc
- return true, nil
- }
- return false, nil
- }
- if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
- framework.Failf("Timeout waiting for service %q to have a load balancer", name)
- }
- return service
- }
- func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *api.Service {
- // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
- defer func() {
- if err := framework.EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
- framework.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
- }
- }()
- var service *api.Service
- framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
- pollFunc := func() (bool, error) {
- svc, err := j.Client.Services(namespace).Get(name)
- if err != nil {
- return false, err
- }
- if len(svc.Status.LoadBalancer.Ingress) == 0 {
- service = svc
- return true, nil
- }
- return false, nil
- }
- if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
- framework.Failf("Timeout waiting for service %q to have no load balancer", name)
- }
- return service
- }
- func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
- if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTP(host, port, "/echo?msg=hello", "hello") }); err != nil {
- framework.Failf("Could not reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
- }
- }
- func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
- if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testNotReachableHTTP(host, port) }); err != nil {
- framework.Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
- }
- }
- func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
- if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableUDP(host, port, "echo hello", "hello") }); err != nil {
- framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
- }
- }
- func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
- if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testNotReachableUDP(host, port, "echo hello") }); err != nil {
- framework.Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
- }
- }
- func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
- var body bytes.Buffer
- if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTPWithContent(host, port, url, "", &body) }); err != nil {
- framework.Failf("Could not reach HTTP service through %v:%v/%v after %v: %v", host, port, url, timeout, err)
- return body
- }
- return body
- }
- func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, expectedSuccess bool) {
- success, err := testHTTPHealthCheckNodePort(host, port, request)
- if expectedSuccess && success {
- framework.Logf("HealthCheck successful for node %v:%v, as expected", host, port)
- return
- } else if !expectedSuccess && (!success || err != nil) {
- framework.Logf("HealthCheck failed for node %v:%v, as expected", host, port)
- return
- } else if expectedSuccess {
- framework.Failf("HealthCheck NodePort incorrectly reporting unhealthy on %v:%v: %v", host, port, err)
- }
- framework.Failf("Unexpected HealthCheck NodePort still reporting healthy %v:%v: %v", host, port, err)
- }
- func getIngressPoint(ing *api.LoadBalancerIngress) string {
- host := ing.IP
- if host == "" {
- host = ing.Hostname
- }
- return host
- }
- // newRCTemplate returns the default api.ReplicationController object for
- // this jig, but does not actually create the RC. The default RC has the same
- // name as the jig and runs the "netexec" container.
- func (j *ServiceTestJig) newRCTemplate(namespace string) *api.ReplicationController {
- rc := &api.ReplicationController{
- ObjectMeta: api.ObjectMeta{
- Namespace: namespace,
- Name: j.Name,
- Labels: j.Labels,
- },
- Spec: api.ReplicationControllerSpec{
- Replicas: 1,
- Selector: j.Labels,
- Template: &api.PodTemplateSpec{
- ObjectMeta: api.ObjectMeta{
- Labels: j.Labels,
- },
- Spec: api.PodSpec{
- Containers: []api.Container{
- {
- Name: "netexec",
- Image: "gcr.io/google_containers/netexec:1.6",
- Args: []string{"--http-port=80", "--udp-port=80"},
- ReadinessProbe: &api.Probe{
- PeriodSeconds: 3,
- Handler: api.Handler{
- HTTPGet: &api.HTTPGetAction{
- Port: intstr.FromInt(80),
- Path: "/hostName",
- },
- },
- },
- },
- },
- TerminationGracePeriodSeconds: new(int64),
- },
- },
- },
- }
- return rc
- }
- // RunOrFail creates a ReplicationController and Pod(s) and waits for the
- // Pod(s) to be running. Callers can provide a function to tweak the RC object
- // before it is created.
- func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *api.ReplicationController)) *api.ReplicationController {
- rc := j.newRCTemplate(namespace)
- if tweak != nil {
- tweak(rc)
- }
- result, err := j.Client.ReplicationControllers(namespace).Create(rc)
- if err != nil {
- framework.Failf("Failed to created RC %q: %v", rc.Name, err)
- }
- pods, err := j.waitForPodsCreated(namespace, int(rc.Spec.Replicas))
- if err != nil {
- framework.Failf("Failed to create pods: %v", err)
- }
- if err := j.waitForPodsReady(namespace, pods); err != nil {
- framework.Failf("Failed waiting for pods to be running: %v", err)
- }
- return result
- }
- func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
- timeout := 2 * time.Minute
- // List the pods, making sure we observe all the replicas.
- label := labels.SelectorFromSet(labels.Set(j.Labels))
- framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
- options := api.ListOptions{LabelSelector: label}
- pods, err := j.Client.Pods(namespace).List(options)
- if err != nil {
- return nil, err
- }
- found := []string{}
- for _, pod := range pods.Items {
- if pod.DeletionTimestamp != nil {
- continue
- }
- found = append(found, pod.Name)
- }
- if len(found) == replicas {
- framework.Logf("Found all %d pods", replicas)
- return found, nil
- }
- framework.Logf("Found %d/%d pods - will retry", len(found), replicas)
- }
- return nil, fmt.Errorf("Timeout waiting for %d pods to be created", replicas)
- }
- func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
- timeout := 2 * time.Minute
- if !framework.CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
- return fmt.Errorf("Timeout waiting for %d pods to be ready", len(pods))
- }
- return nil
- }
- // Simple helper class to avoid too much boilerplate in tests
- type ServiceTestFixture struct {
- ServiceName string
- Namespace string
- Client *client.Client
- TestId string
- Labels map[string]string
- rcs map[string]bool
- services map[string]bool
- name string
- image string
- }
- func NewServerTest(client *client.Client, namespace string, serviceName string) *ServiceTestFixture {
- t := &ServiceTestFixture{}
- t.Client = client
- t.Namespace = namespace
- t.ServiceName = serviceName
- t.TestId = t.ServiceName + "-" + string(uuid.NewUUID())
- t.Labels = map[string]string{
- "testid": t.TestId,
- }
- t.rcs = make(map[string]bool)
- t.services = make(map[string]bool)
- t.name = "webserver"
- t.image = "gcr.io/google_containers/test-webserver:e2e"
- return t
- }
- // Build default config for a service (which can then be changed)
- func (t *ServiceTestFixture) BuildServiceSpec() *api.Service {
- service := &api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: t.ServiceName,
- Namespace: t.Namespace,
- },
- Spec: api.ServiceSpec{
- Selector: t.Labels,
- Ports: []api.ServicePort{{
- Port: 80,
- TargetPort: intstr.FromInt(80),
- }},
- },
- }
- return service
- }
- // CreateWebserverRC creates rc-backed pods with the well-known webserver
- // configuration and records it for cleanup.
- func (t *ServiceTestFixture) CreateWebserverRC(replicas int32) *api.ReplicationController {
- rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels)
- rcAct, err := t.createRC(rcSpec)
- if err != nil {
- framework.Failf("Failed to create rc %s: %v", rcSpec.Name, err)
- }
- if err := framework.VerifyPods(t.Client, t.Namespace, t.name, false, replicas); err != nil {
- framework.Failf("Failed to create %d pods with name %s: %v", replicas, t.name, err)
- }
- return rcAct
- }
- // createRC creates a replication controller and records it for cleanup.
- func (t *ServiceTestFixture) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) {
- rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc)
- if err == nil {
- t.rcs[rc.Name] = true
- }
- return rc, err
- }
- // Create a service, and record it for cleanup
- func (t *ServiceTestFixture) CreateService(service *api.Service) (*api.Service, error) {
- result, err := t.Client.Services(t.Namespace).Create(service)
- if err == nil {
- t.services[service.Name] = true
- }
- return result, err
- }
- // Delete a service, and remove it from the cleanup list
- func (t *ServiceTestFixture) DeleteService(serviceName string) error {
- err := t.Client.Services(t.Namespace).Delete(serviceName)
- if err == nil {
- delete(t.services, serviceName)
- }
- return err
- }
- func (t *ServiceTestFixture) Cleanup() []error {
- var errs []error
- for rcName := range t.rcs {
- By("stopping RC " + rcName + " in namespace " + t.Namespace)
- // First, resize the RC to 0.
- old, err := t.Client.ReplicationControllers(t.Namespace).Get(rcName)
- if err != nil {
- errs = append(errs, err)
- }
- old.Spec.Replicas = 0
- if _, err := t.Client.ReplicationControllers(t.Namespace).Update(old); err != nil {
- errs = append(errs, err)
- }
- // TODO(mikedanese): Wait.
- // Then, delete the RC altogether.
- if err := t.Client.ReplicationControllers(t.Namespace).Delete(rcName, nil); err != nil {
- errs = append(errs, err)
- }
- }
- for serviceName := range t.services {
- By("deleting service " + serviceName + " in namespace " + t.Namespace)
- err := t.Client.Services(t.Namespace).Delete(serviceName)
- if err != nil {
- errs = append(errs, err)
- }
- }
- return errs
- }
|