  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package e2e
  14. import (
  15. "bytes"
  16. "fmt"
  17. "io/ioutil"
  18. "math/rand"
  19. "net"
  20. "net/http"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "time"
  25. . "github.com/onsi/ginkgo"
  26. . "github.com/onsi/gomega"
  27. "k8s.io/kubernetes/pkg/api"
  28. "k8s.io/kubernetes/pkg/api/errors"
  29. "k8s.io/kubernetes/pkg/api/service"
  30. client "k8s.io/kubernetes/pkg/client/unversioned"
  31. "k8s.io/kubernetes/pkg/controller/endpoint"
  32. "k8s.io/kubernetes/pkg/labels"
  33. "k8s.io/kubernetes/pkg/types"
  34. "k8s.io/kubernetes/pkg/util/intstr"
  35. utilnet "k8s.io/kubernetes/pkg/util/net"
  36. "k8s.io/kubernetes/pkg/util/sets"
  37. "k8s.io/kubernetes/pkg/util/uuid"
  38. "k8s.io/kubernetes/pkg/util/wait"
  39. "k8s.io/kubernetes/test/e2e/framework"
  40. )
  41. const (
  42. // Maximum time a kube-proxy daemon on a node is allowed to not
  43. // notice a Service update, such as type=NodePort.
  44. // TODO: This timeout should be O(10s), observed values are O(1m), 5m is very
  45. // liberal. Fix tracked in #20567.
  46. kubeProxyLagTimeout = 5 * time.Minute
  47. // Maximum time a load balancer is allowed to not respond after creation.
  48. loadBalancerLagTimeoutDefault = 2 * time.Minute
  49. // On AWS there is a delay between ELB creation and serving traffic;
  50. // a few minutes is typical, so use 10m.
  51. loadBalancerLagTimeoutAWS = 10 * time.Minute
  52. // How long to wait for a load balancer to be created/modified.
  53. //TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
  54. loadBalancerCreateTimeoutDefault = 20 * time.Minute
  55. loadBalancerCreateTimeoutLarge = time.Hour
  56. )
// ServiceNodePortRange should match whatever the default/configured range is.
// Base 30000 with Size 2768 covers ports 30000-32767, the apiserver's default
// --service-node-port-range. NOTE(review): if the cluster under test uses a
// non-default node-port range, this must be kept in sync with it — confirm.
var ServiceNodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
  59. var _ = framework.KubeDescribe("Services", func() {
  60. f := framework.NewDefaultFramework("services")
  61. var c *client.Client
  62. BeforeEach(func() {
  63. c = f.Client
  64. })
  65. // TODO: We get coverage of TCP/UDP and multi-port services through the DNS test. We should have a simpler test for multi-port TCP here.
  66. It("should provide secure master service [Conformance]", func() {
  67. _, err := c.Services(api.NamespaceDefault).Get("kubernetes")
  68. Expect(err).NotTo(HaveOccurred())
  69. })
  70. It("should serve a basic endpoint from pods [Conformance]", func() {
  71. // TODO: use the ServiceTestJig here
  72. serviceName := "endpoint-test2"
  73. ns := f.Namespace.Name
  74. labels := map[string]string{
  75. "foo": "bar",
  76. "baz": "blah",
  77. }
  78. By("creating service " + serviceName + " in namespace " + ns)
  79. defer func() {
  80. err := c.Services(ns).Delete(serviceName)
  81. Expect(err).NotTo(HaveOccurred())
  82. }()
  83. service := &api.Service{
  84. ObjectMeta: api.ObjectMeta{
  85. Name: serviceName,
  86. },
  87. Spec: api.ServiceSpec{
  88. Selector: labels,
  89. Ports: []api.ServicePort{{
  90. Port: 80,
  91. TargetPort: intstr.FromInt(80),
  92. }},
  93. },
  94. }
  95. _, err := c.Services(ns).Create(service)
  96. Expect(err).NotTo(HaveOccurred())
  97. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
  98. names := map[string]bool{}
  99. defer func() {
  100. for name := range names {
  101. err := c.Pods(ns).Delete(name, nil)
  102. Expect(err).NotTo(HaveOccurred())
  103. }
  104. }()
  105. name1 := "pod1"
  106. name2 := "pod2"
  107. createPodOrFail(c, ns, name1, labels, []api.ContainerPort{{ContainerPort: 80}})
  108. names[name1] = true
  109. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}})
  110. createPodOrFail(c, ns, name2, labels, []api.ContainerPort{{ContainerPort: 80}})
  111. names[name2] = true
  112. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}, name2: {80}})
  113. deletePodOrFail(c, ns, name1)
  114. delete(names, name1)
  115. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name2: {80}})
  116. deletePodOrFail(c, ns, name2)
  117. delete(names, name2)
  118. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
  119. })
  120. It("should serve multiport endpoints from pods [Conformance]", func() {
  121. // TODO: use the ServiceTestJig here
  122. // repacking functionality is intentionally not tested here - it's better to test it in an integration test.
  123. serviceName := "multi-endpoint-test"
  124. ns := f.Namespace.Name
  125. defer func() {
  126. err := c.Services(ns).Delete(serviceName)
  127. Expect(err).NotTo(HaveOccurred())
  128. }()
  129. labels := map[string]string{"foo": "bar"}
  130. svc1port := "svc1"
  131. svc2port := "svc2"
  132. By("creating service " + serviceName + " in namespace " + ns)
  133. service := &api.Service{
  134. ObjectMeta: api.ObjectMeta{
  135. Name: serviceName,
  136. },
  137. Spec: api.ServiceSpec{
  138. Selector: labels,
  139. Ports: []api.ServicePort{
  140. {
  141. Name: "portname1",
  142. Port: 80,
  143. TargetPort: intstr.FromString(svc1port),
  144. },
  145. {
  146. Name: "portname2",
  147. Port: 81,
  148. TargetPort: intstr.FromString(svc2port),
  149. },
  150. },
  151. },
  152. }
  153. _, err := c.Services(ns).Create(service)
  154. Expect(err).NotTo(HaveOccurred())
  155. port1 := 100
  156. port2 := 101
  157. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
  158. names := map[string]bool{}
  159. defer func() {
  160. for name := range names {
  161. err := c.Pods(ns).Delete(name, nil)
  162. Expect(err).NotTo(HaveOccurred())
  163. }
  164. }()
  165. containerPorts1 := []api.ContainerPort{
  166. {
  167. Name: svc1port,
  168. ContainerPort: int32(port1),
  169. },
  170. }
  171. containerPorts2 := []api.ContainerPort{
  172. {
  173. Name: svc2port,
  174. ContainerPort: int32(port2),
  175. },
  176. }
  177. podname1 := "pod1"
  178. podname2 := "pod2"
  179. createPodOrFail(c, ns, podname1, labels, containerPorts1)
  180. names[podname1] = true
  181. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}})
  182. createPodOrFail(c, ns, podname2, labels, containerPorts2)
  183. names[podname2] = true
  184. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}})
  185. deletePodOrFail(c, ns, podname1)
  186. delete(names, podname1)
  187. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname2: {port2}})
  188. deletePodOrFail(c, ns, podname2)
  189. delete(names, podname2)
  190. validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
  191. })
  192. It("should be able to up and down services", func() {
  193. // TODO: use the ServiceTestJig here
  194. // this test uses framework.NodeSSHHosts that does not work if a Node only reports LegacyHostIP
  195. framework.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
  196. ns := f.Namespace.Name
  197. numPods, servicePort := 3, 80
  198. By("creating service1 in namespace " + ns)
  199. podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
  200. Expect(err).NotTo(HaveOccurred())
  201. By("creating service2 in namespace " + ns)
  202. podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
  203. Expect(err).NotTo(HaveOccurred())
  204. hosts, err := framework.NodeSSHHosts(c)
  205. Expect(err).NotTo(HaveOccurred())
  206. if len(hosts) == 0 {
  207. framework.Failf("No ssh-able nodes")
  208. }
  209. host := hosts[0]
  210. By("verifying service1 is up")
  211. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  212. By("verifying service2 is up")
  213. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  214. // Stop service 1 and make sure it is gone.
  215. By("stopping service1")
  216. framework.ExpectNoError(stopServeHostnameService(c, ns, "service1"))
  217. By("verifying service1 is not up")
  218. framework.ExpectNoError(verifyServeHostnameServiceDown(c, host, svc1IP, servicePort))
  219. By("verifying service2 is still up")
  220. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  221. // Start another service and verify both are up.
  222. By("creating service3 in namespace " + ns)
  223. podNames3, svc3IP, err := startServeHostnameService(c, ns, "service3", servicePort, numPods)
  224. Expect(err).NotTo(HaveOccurred())
  225. if svc2IP == svc3IP {
  226. framework.Failf("service IPs conflict: %v", svc2IP)
  227. }
  228. By("verifying service2 is still up")
  229. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  230. By("verifying service3 is up")
  231. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames3, svc3IP, servicePort))
  232. })
  233. It("should work after restarting kube-proxy [Disruptive]", func() {
  234. // TODO: use the ServiceTestJig here
  235. framework.SkipUnlessProviderIs("gce", "gke")
  236. ns := f.Namespace.Name
  237. numPods, servicePort := 3, 80
  238. svc1 := "service1"
  239. svc2 := "service2"
  240. defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, svc1)) }()
  241. podNames1, svc1IP, err := startServeHostnameService(c, ns, svc1, servicePort, numPods)
  242. Expect(err).NotTo(HaveOccurred())
  243. defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, svc2)) }()
  244. podNames2, svc2IP, err := startServeHostnameService(c, ns, svc2, servicePort, numPods)
  245. Expect(err).NotTo(HaveOccurred())
  246. if svc1IP == svc2IP {
  247. framework.Failf("VIPs conflict: %v", svc1IP)
  248. }
  249. hosts, err := framework.NodeSSHHosts(c)
  250. Expect(err).NotTo(HaveOccurred())
  251. if len(hosts) == 0 {
  252. framework.Failf("No ssh-able nodes")
  253. }
  254. host := hosts[0]
  255. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  256. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  257. By("Restarting kube-proxy")
  258. if err := framework.RestartKubeProxy(host); err != nil {
  259. framework.Failf("error restarting kube-proxy: %v", err)
  260. }
  261. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  262. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  263. By("Removing iptable rules")
  264. result, err := framework.SSH(`
  265. sudo iptables -t nat -F KUBE-SERVICES || true;
  266. sudo iptables -t nat -F KUBE-PORTALS-HOST || true;
  267. sudo iptables -t nat -F KUBE-PORTALS-CONTAINER || true`, host, framework.TestContext.Provider)
  268. if err != nil || result.Code != 0 {
  269. framework.LogSSHResult(result)
  270. framework.Failf("couldn't remove iptable rules: %v", err)
  271. }
  272. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  273. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  274. })
  275. It("should work after restarting apiserver [Disruptive]", func() {
  276. // TODO: use the ServiceTestJig here
  277. framework.SkipUnlessProviderIs("gce", "gke")
  278. ns := f.Namespace.Name
  279. numPods, servicePort := 3, 80
  280. defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, "service1")) }()
  281. podNames1, svc1IP, err := startServeHostnameService(c, ns, "service1", servicePort, numPods)
  282. Expect(err).NotTo(HaveOccurred())
  283. hosts, err := framework.NodeSSHHosts(c)
  284. Expect(err).NotTo(HaveOccurred())
  285. if len(hosts) == 0 {
  286. framework.Failf("No ssh-able nodes")
  287. }
  288. host := hosts[0]
  289. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  290. // Restart apiserver
  291. if err := framework.RestartApiserver(c); err != nil {
  292. framework.Failf("error restarting apiserver: %v", err)
  293. }
  294. if err := framework.WaitForApiserverUp(c); err != nil {
  295. framework.Failf("error while waiting for apiserver up: %v", err)
  296. }
  297. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  298. // Create a new service and check if it's not reusing IP.
  299. defer func() { framework.ExpectNoError(stopServeHostnameService(c, ns, "service2")) }()
  300. podNames2, svc2IP, err := startServeHostnameService(c, ns, "service2", servicePort, numPods)
  301. Expect(err).NotTo(HaveOccurred())
  302. if svc1IP == svc2IP {
  303. framework.Failf("VIPs conflict: %v", svc1IP)
  304. }
  305. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames1, svc1IP, servicePort))
  306. framework.ExpectNoError(verifyServeHostnameServiceUp(c, ns, host, podNames2, svc2IP, servicePort))
  307. })
  308. // TODO: Run this test against the userspace proxy and nodes
  309. // configured with a default deny firewall to validate that the
  310. // proxy whitelists NodePort traffic.
  311. It("should be able to create a functioning NodePort service", func() {
  312. serviceName := "nodeport-test"
  313. ns := f.Namespace.Name
  314. jig := NewServiceTestJig(c, serviceName)
  315. nodeIP := pickNodeIP(jig.Client) // for later
  316. By("creating service " + serviceName + " with type=NodePort in namespace " + ns)
  317. service := jig.CreateTCPServiceOrFail(ns, func(svc *api.Service) {
  318. svc.Spec.Type = api.ServiceTypeNodePort
  319. })
  320. jig.SanityCheckService(service, api.ServiceTypeNodePort)
  321. nodePort := int(service.Spec.Ports[0].NodePort)
  322. By("creating pod to be part of service " + serviceName)
  323. jig.RunOrFail(ns, nil)
  324. By("hitting the pod through the service's NodePort")
  325. jig.TestReachableHTTP(nodeIP, nodePort, kubeProxyLagTimeout)
  326. By("verifying the node port is locked")
  327. hostExec := framework.LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
  328. // Even if the node-ip:node-port check above passed, this hostexec pod
  329. // might fall on a node with a laggy kube-proxy.
  330. cmd := fmt.Sprintf(`for i in $(seq 1 300); do if ss -ant46 'sport = :%d' | grep ^LISTEN; then exit 0; fi; sleep 1; done; exit 1`, nodePort)
  331. stdout, err := framework.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
  332. if err != nil {
  333. framework.Failf("expected node port %d to be in use, stdout: %v. err: %v", nodePort, stdout, err)
  334. }
  335. })
  336. It("should be able to change the type and ports of a service [Slow]", func() {
  337. // requires cloud load-balancer support
  338. framework.SkipUnlessProviderIs("gce", "gke", "aws")
  339. loadBalancerSupportsUDP := !framework.ProviderIs("aws")
  340. loadBalancerLagTimeout := loadBalancerLagTimeoutDefault
  341. if framework.ProviderIs("aws") {
  342. loadBalancerLagTimeout = loadBalancerLagTimeoutAWS
  343. }
  344. loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault
  345. largeClusterMinNodesNumber := 100
  346. if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber {
  347. loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge
  348. }
  349. // This test is more monolithic than we'd like because LB turnup can be
  350. // very slow, so we lumped all the tests into one LB lifecycle.
  351. serviceName := "mutability-test"
  352. ns1 := f.Namespace.Name // LB1 in ns1 on TCP
  353. framework.Logf("namespace for TCP test: %s", ns1)
  354. By("creating a second namespace")
  355. namespacePtr, err := f.CreateNamespace("services", nil)
  356. Expect(err).NotTo(HaveOccurred())
  357. ns2 := namespacePtr.Name // LB2 in ns2 on UDP
  358. framework.Logf("namespace for UDP test: %s", ns2)
  359. jig := NewServiceTestJig(c, serviceName)
  360. nodeIP := pickNodeIP(jig.Client) // for later
  361. // Test TCP and UDP Services. Services with the same name in different
  362. // namespaces should get different node ports and load balancers.
  363. By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns1)
  364. tcpService := jig.CreateTCPServiceOrFail(ns1, nil)
  365. jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP)
  366. By("creating a UDP service " + serviceName + " with type=ClusterIP in namespace " + ns2)
  367. udpService := jig.CreateUDPServiceOrFail(ns2, nil)
  368. jig.SanityCheckService(udpService, api.ServiceTypeClusterIP)
  369. By("verifying that TCP and UDP use the same port")
  370. if tcpService.Spec.Ports[0].Port != udpService.Spec.Ports[0].Port {
  371. framework.Failf("expected to use the same port for TCP and UDP")
  372. }
  373. svcPort := int(tcpService.Spec.Ports[0].Port)
  374. framework.Logf("service port (TCP and UDP): %d", svcPort)
  375. By("creating a pod to be part of the TCP service " + serviceName)
  376. jig.RunOrFail(ns1, nil)
  377. By("creating a pod to be part of the UDP service " + serviceName)
  378. jig.RunOrFail(ns2, nil)
  379. // Change the services to NodePort.
  380. By("changing the TCP service to type=NodePort")
  381. tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
  382. s.Spec.Type = api.ServiceTypeNodePort
  383. })
  384. jig.SanityCheckService(tcpService, api.ServiceTypeNodePort)
  385. tcpNodePort := int(tcpService.Spec.Ports[0].NodePort)
  386. framework.Logf("TCP node port: %d", tcpNodePort)
  387. By("changing the UDP service to type=NodePort")
  388. udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
  389. s.Spec.Type = api.ServiceTypeNodePort
  390. })
  391. jig.SanityCheckService(udpService, api.ServiceTypeNodePort)
  392. udpNodePort := int(udpService.Spec.Ports[0].NodePort)
  393. framework.Logf("UDP node port: %d", udpNodePort)
  394. By("hitting the TCP service's NodePort")
  395. jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
  396. By("hitting the UDP service's NodePort")
  397. jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
  398. // Change the services to LoadBalancer.
  399. // Here we test that LoadBalancers can receive static IP addresses. This isn't
  400. // necessary, but is an additional feature this monolithic test checks.
  401. requestedIP := ""
  402. staticIPName := ""
  403. if framework.ProviderIs("gce", "gke") {
  404. By("creating a static load balancer IP")
  405. staticIPName = fmt.Sprintf("e2e-external-lb-test-%s", framework.RunId)
  406. requestedIP, err = createGCEStaticIP(staticIPName)
  407. Expect(err).NotTo(HaveOccurred())
  408. defer func() {
  409. if staticIPName != "" {
  410. // Release GCE static IP - this is not kube-managed and will not be automatically released.
  411. if err := deleteGCEStaticIP(staticIPName); err != nil {
  412. framework.Logf("failed to release static IP %s: %v", staticIPName, err)
  413. }
  414. }
  415. }()
  416. framework.Logf("Allocated static load balancer IP: %s", requestedIP)
  417. }
  418. By("changing the TCP service to type=LoadBalancer")
  419. tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
  420. s.Spec.LoadBalancerIP = requestedIP // will be "" if not applicable
  421. s.Spec.Type = api.ServiceTypeLoadBalancer
  422. })
  423. if loadBalancerSupportsUDP {
  424. By("changing the UDP service to type=LoadBalancer")
  425. udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
  426. s.Spec.Type = api.ServiceTypeLoadBalancer
  427. })
  428. }
  429. By("waiting for the TCP service to have a load balancer")
  430. // Wait for the load balancer to be created asynchronously
  431. tcpService = jig.WaitForLoadBalancerOrFail(ns1, tcpService.Name, loadBalancerCreateTimeout)
  432. jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
  433. if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
  434. framework.Failf("TCP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", tcpNodePort, tcpService.Spec.Ports[0].NodePort)
  435. }
  436. if requestedIP != "" && getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != requestedIP {
  437. framework.Failf("unexpected TCP Status.LoadBalancer.Ingress (expected %s, got %s)", requestedIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
  438. }
  439. tcpIngressIP := getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0])
  440. framework.Logf("TCP load balancer: %s", tcpIngressIP)
  441. if framework.ProviderIs("gce", "gke") {
  442. // Do this as early as possible, which overrides the `defer` above.
  443. // This is mostly out of fear of leaking the IP in a timeout case
  444. // (as of this writing we're not 100% sure where the leaks are
  445. // coming from, so this is first-aid rather than surgery).
  446. By("demoting the static IP to ephemeral")
  447. if staticIPName != "" {
  448. // Deleting it after it is attached "demotes" it to an
  449. // ephemeral IP, which can be auto-released.
  450. if err := deleteGCEStaticIP(staticIPName); err != nil {
  451. framework.Failf("failed to release static IP %s: %v", staticIPName, err)
  452. }
  453. staticIPName = ""
  454. }
  455. }
  456. var udpIngressIP string
  457. if loadBalancerSupportsUDP {
  458. By("waiting for the UDP service to have a load balancer")
  459. // 2nd one should be faster since they ran in parallel.
  460. udpService = jig.WaitForLoadBalancerOrFail(ns2, udpService.Name, loadBalancerCreateTimeout)
  461. jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
  462. if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
  463. framework.Failf("UDP Spec.Ports[0].NodePort changed (%d -> %d) when not expected", udpNodePort, udpService.Spec.Ports[0].NodePort)
  464. }
  465. udpIngressIP = getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0])
  466. framework.Logf("UDP load balancer: %s", udpIngressIP)
  467. By("verifying that TCP and UDP use different load balancers")
  468. if tcpIngressIP == udpIngressIP {
  469. framework.Failf("Load balancers are not different: %s", getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
  470. }
  471. }
  472. By("hitting the TCP service's NodePort")
  473. jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
  474. By("hitting the UDP service's NodePort")
  475. jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
  476. By("hitting the TCP service's LoadBalancer")
  477. jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
  478. if loadBalancerSupportsUDP {
  479. By("hitting the UDP service's LoadBalancer")
  480. jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
  481. }
  482. // Change the services' node ports.
  483. By("changing the TCP service's NodePort")
  484. tcpService = jig.ChangeServiceNodePortOrFail(ns1, tcpService.Name, tcpNodePort)
  485. jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
  486. tcpNodePortOld := tcpNodePort
  487. tcpNodePort = int(tcpService.Spec.Ports[0].NodePort)
  488. if tcpNodePort == tcpNodePortOld {
  489. framework.Failf("TCP Spec.Ports[0].NodePort (%d) did not change", tcpNodePort)
  490. }
  491. if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
  492. framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
  493. }
  494. framework.Logf("TCP node port: %d", tcpNodePort)
  495. By("changing the UDP service's NodePort")
  496. udpService = jig.ChangeServiceNodePortOrFail(ns2, udpService.Name, udpNodePort)
  497. if loadBalancerSupportsUDP {
  498. jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
  499. } else {
  500. jig.SanityCheckService(udpService, api.ServiceTypeNodePort)
  501. }
  502. udpNodePortOld := udpNodePort
  503. udpNodePort = int(udpService.Spec.Ports[0].NodePort)
  504. if udpNodePort == udpNodePortOld {
  505. framework.Failf("UDP Spec.Ports[0].NodePort (%d) did not change", udpNodePort)
  506. }
  507. if loadBalancerSupportsUDP && getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
  508. framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
  509. }
  510. framework.Logf("UDP node port: %d", udpNodePort)
  511. By("hitting the TCP service's new NodePort")
  512. jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
  513. By("hitting the UDP service's new NodePort")
  514. jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
  515. By("checking the old TCP NodePort is closed")
  516. jig.TestNotReachableHTTP(nodeIP, tcpNodePortOld, kubeProxyLagTimeout)
  517. By("checking the old UDP NodePort is closed")
  518. jig.TestNotReachableUDP(nodeIP, udpNodePortOld, kubeProxyLagTimeout)
  519. By("hitting the TCP service's LoadBalancer")
  520. jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
  521. if loadBalancerSupportsUDP {
  522. By("hitting the UDP service's LoadBalancer")
  523. jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
  524. }
  525. // Change the services' main ports.
  526. By("changing the TCP service's port")
  527. tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
  528. s.Spec.Ports[0].Port++
  529. })
  530. jig.SanityCheckService(tcpService, api.ServiceTypeLoadBalancer)
  531. svcPortOld := svcPort
  532. svcPort = int(tcpService.Spec.Ports[0].Port)
  533. if svcPort == svcPortOld {
  534. framework.Failf("TCP Spec.Ports[0].Port (%d) did not change", svcPort)
  535. }
  536. if int(tcpService.Spec.Ports[0].NodePort) != tcpNodePort {
  537. framework.Failf("TCP Spec.Ports[0].NodePort (%d) changed", tcpService.Spec.Ports[0].NodePort)
  538. }
  539. if getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]) != tcpIngressIP {
  540. framework.Failf("TCP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", tcpIngressIP, getIngressPoint(&tcpService.Status.LoadBalancer.Ingress[0]))
  541. }
  542. By("changing the UDP service's port")
  543. udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
  544. s.Spec.Ports[0].Port++
  545. })
  546. if loadBalancerSupportsUDP {
  547. jig.SanityCheckService(udpService, api.ServiceTypeLoadBalancer)
  548. } else {
  549. jig.SanityCheckService(udpService, api.ServiceTypeNodePort)
  550. }
  551. if int(udpService.Spec.Ports[0].Port) != svcPort {
  552. framework.Failf("UDP Spec.Ports[0].Port (%d) did not change", udpService.Spec.Ports[0].Port)
  553. }
  554. if int(udpService.Spec.Ports[0].NodePort) != udpNodePort {
  555. framework.Failf("UDP Spec.Ports[0].NodePort (%d) changed", udpService.Spec.Ports[0].NodePort)
  556. }
  557. if loadBalancerSupportsUDP && getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]) != udpIngressIP {
  558. framework.Failf("UDP Status.LoadBalancer.Ingress changed (%s -> %s) when not expected", udpIngressIP, getIngressPoint(&udpService.Status.LoadBalancer.Ingress[0]))
  559. }
  560. framework.Logf("service port (TCP and UDP): %d", svcPort)
  561. By("hitting the TCP service's NodePort")
  562. jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
  563. By("hitting the UDP service's NodePort")
  564. jig.TestReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
  565. By("hitting the TCP service's LoadBalancer")
  566. jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB
  567. if loadBalancerSupportsUDP {
  568. By("hitting the UDP service's LoadBalancer")
  569. jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB)
  570. }
  571. // Change the services back to ClusterIP.
  572. By("changing TCP service back to type=ClusterIP")
  573. tcpService = jig.UpdateServiceOrFail(ns1, tcpService.Name, func(s *api.Service) {
  574. s.Spec.Type = api.ServiceTypeClusterIP
  575. s.Spec.Ports[0].NodePort = 0
  576. })
  577. // Wait for the load balancer to be destroyed asynchronously
  578. tcpService = jig.WaitForLoadBalancerDestroyOrFail(ns1, tcpService.Name, tcpIngressIP, svcPort, loadBalancerCreateTimeout)
  579. jig.SanityCheckService(tcpService, api.ServiceTypeClusterIP)
  580. By("changing UDP service back to type=ClusterIP")
  581. udpService = jig.UpdateServiceOrFail(ns2, udpService.Name, func(s *api.Service) {
  582. s.Spec.Type = api.ServiceTypeClusterIP
  583. s.Spec.Ports[0].NodePort = 0
  584. })
  585. if loadBalancerSupportsUDP {
  586. // Wait for the load balancer to be destroyed asynchronously
  587. udpService = jig.WaitForLoadBalancerDestroyOrFail(ns2, udpService.Name, udpIngressIP, svcPort, loadBalancerCreateTimeout)
  588. jig.SanityCheckService(udpService, api.ServiceTypeClusterIP)
  589. }
  590. By("checking the TCP NodePort is closed")
  591. jig.TestNotReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
  592. By("checking the UDP NodePort is closed")
  593. jig.TestNotReachableUDP(nodeIP, udpNodePort, kubeProxyLagTimeout)
  594. By("checking the TCP LoadBalancer is closed")
  595. jig.TestNotReachableHTTP(tcpIngressIP, svcPort, loadBalancerLagTimeout)
  596. if loadBalancerSupportsUDP {
  597. By("checking the UDP LoadBalancer is closed")
  598. jig.TestNotReachableUDP(udpIngressIP, svcPort, loadBalancerLagTimeout)
  599. }
  600. })
// Verifies that two ServicePorts sharing the same port number but using
// different protocols (TCP and UDP) on one NodePort service are assigned
// the same node port by the allocator.
It("should use same NodePort with same port but different protocols", func() {
	serviceName := "nodeports"
	ns := f.Namespace.Name

	t := NewServerTest(c, ns, serviceName)
	defer func() {
		defer GinkgoRecover()
		errs := t.Cleanup()
		if len(errs) != 0 {
			framework.Failf("errors in cleanup: %v", errs)
		}
	}()

	By("creating service " + serviceName + " with same NodePort but different protocols in namespace " + ns)
	// Two ports, identical number (53), differing only in protocol.
	service := &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name:      t.ServiceName,
			Namespace: t.Namespace,
		},
		Spec: api.ServiceSpec{
			Selector: t.Labels,
			Type:     api.ServiceTypeNodePort,
			Ports: []api.ServicePort{
				{
					Name:     "tcp-port",
					Port:     53,
					Protocol: api.ProtocolTCP,
				},
				{
					Name:     "udp-port",
					Port:     53,
					Protocol: api.ProtocolUDP,
				},
			},
		},
	}
	result, err := t.CreateService(service)
	Expect(err).NotTo(HaveOccurred())

	if len(result.Spec.Ports) != 2 {
		framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
	}
	// The allocator must hand both protocol variants the same node port.
	if result.Spec.Ports[0].NodePort != result.Spec.Ports[1].NodePort {
		framework.Failf("should use same NodePort for new service: %v", result)
	}
})
// Verifies that the apiserver rejects a second NodePort service requesting a
// node port already held by an existing service, and that deleting the
// holder releases the port so the second service can then be created.
It("should prevent NodePort collisions", func() {
	// TODO: use the ServiceTestJig here
	baseName := "nodeport-collision-"
	serviceName1 := baseName + "1"
	serviceName2 := baseName + "2"
	ns := f.Namespace.Name

	t := NewServerTest(c, ns, serviceName1)
	defer func() {
		defer GinkgoRecover()
		errs := t.Cleanup()
		if len(errs) != 0 {
			framework.Failf("errors in cleanup: %v", errs)
		}
	}()

	By("creating service " + serviceName1 + " with type NodePort in namespace " + ns)
	service := t.BuildServiceSpec()
	service.Spec.Type = api.ServiceTypeNodePort
	result, err := t.CreateService(service)
	Expect(err).NotTo(HaveOccurred())

	// Sanity-check the first service: correct type, one port, a node port
	// actually allocated.
	if result.Spec.Type != api.ServiceTypeNodePort {
		framework.Failf("got unexpected Spec.Type for new service: %v", result)
	}
	if len(result.Spec.Ports) != 1 {
		framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
	}
	port := result.Spec.Ports[0]
	if port.NodePort == 0 {
		framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", result)
	}

	By("creating service " + serviceName2 + " with conflicting NodePort")
	// Deliberately request the node port the first service already owns.
	service2 := t.BuildServiceSpec()
	service2.Name = serviceName2
	service2.Spec.Type = api.ServiceTypeNodePort
	service2.Spec.Ports[0].NodePort = port.NodePort
	result2, err := t.CreateService(service2)
	if err == nil {
		framework.Failf("Created service with conflicting NodePort: %v", result2)
	}
	expectedErr := fmt.Sprintf("%d.*port is already allocated", port.NodePort)
	Expect(fmt.Sprintf("%v", err)).To(MatchRegexp(expectedErr))

	By("deleting service " + serviceName1 + " to release NodePort")
	err = t.DeleteService(serviceName1)
	Expect(err).NotTo(HaveOccurred())

	By("creating service " + serviceName2 + " with no-longer-conflicting NodePort")
	// Now that the port has been released, the same spec must succeed.
	_, err = t.CreateService(service2)
	Expect(err).NotTo(HaveOccurred())
})
// Verifies that both updating an existing NodePort service to, and creating
// a fresh service with, a node port outside ServiceNodePortRange is rejected
// by the apiserver with a "port is not in the valid range" error.
It("should check NodePort out-of-range", func() {
	// TODO: use the ServiceTestJig here
	serviceName := "nodeport-range-test"
	ns := f.Namespace.Name

	t := NewServerTest(c, ns, serviceName)
	defer func() {
		defer GinkgoRecover()
		errs := t.Cleanup()
		if len(errs) != 0 {
			framework.Failf("errors in cleanup: %v", errs)
		}
	}()

	service := t.BuildServiceSpec()
	service.Spec.Type = api.ServiceTypeNodePort

	By("creating service " + serviceName + " with type NodePort in namespace " + ns)
	service, err := t.CreateService(service)
	Expect(err).NotTo(HaveOccurred())

	// Sanity-check: the allocated node port must be non-zero and in range.
	if service.Spec.Type != api.ServiceTypeNodePort {
		framework.Failf("got unexpected Spec.Type for new service: %v", service)
	}
	if len(service.Spec.Ports) != 1 {
		framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
	}
	port := service.Spec.Ports[0]
	if port.NodePort == 0 {
		framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", service)
	}
	if !ServiceNodePortRange.Contains(int(port.NodePort)) {
		framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
	}

	// Pick a random port guaranteed to fall outside the allocatable range.
	outOfRangeNodePort := 0
	rand.Seed(time.Now().UTC().UnixNano())
	for {
		outOfRangeNodePort = 1 + rand.Intn(65535)
		if !ServiceNodePortRange.Contains(outOfRangeNodePort) {
			break
		}
	}

	By(fmt.Sprintf("changing service "+serviceName+" to out-of-range NodePort %d", outOfRangeNodePort))
	result, err := updateService(c, ns, serviceName, func(s *api.Service) {
		s.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
	})
	if err == nil {
		framework.Failf("failed to prevent update of service with out-of-range NodePort: %v", result)
	}
	expectedErr := fmt.Sprintf("%d.*port is not in the valid range", outOfRangeNodePort)
	Expect(fmt.Sprintf("%v", err)).To(MatchRegexp(expectedErr))

	By("deleting original service " + serviceName)
	err = t.DeleteService(serviceName)
	Expect(err).NotTo(HaveOccurred())

	By(fmt.Sprintf("creating service "+serviceName+" with out-of-range NodePort %d", outOfRangeNodePort))
	// Creating a brand-new service with the bad port must fail the same way.
	service = t.BuildServiceSpec()
	service.Spec.Type = api.ServiceTypeNodePort
	service.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
	service, err = t.CreateService(service)
	if err == nil {
		framework.Failf("failed to prevent create of service with out-of-range NodePort (%d): %v", outOfRangeNodePort, service)
	}
	Expect(fmt.Sprintf("%v", err)).To(MatchRegexp(expectedErr))
})
// Verifies that deleting a NodePort service releases its node port: the port
// stops being listened on (checked with ss from a host-exec pod) and can be
// explicitly claimed by a newly created service.
It("should release NodePorts on delete", func() {
	// TODO: use the ServiceTestJig here
	serviceName := "nodeport-reuse"
	ns := f.Namespace.Name

	t := NewServerTest(c, ns, serviceName)
	defer func() {
		defer GinkgoRecover()
		errs := t.Cleanup()
		if len(errs) != 0 {
			framework.Failf("errors in cleanup: %v", errs)
		}
	}()

	service := t.BuildServiceSpec()
	service.Spec.Type = api.ServiceTypeNodePort

	By("creating service " + serviceName + " with type NodePort in namespace " + ns)
	service, err := t.CreateService(service)
	Expect(err).NotTo(HaveOccurred())

	// Sanity-check: one port, node port allocated and within range.
	if service.Spec.Type != api.ServiceTypeNodePort {
		framework.Failf("got unexpected Spec.Type for new service: %v", service)
	}
	if len(service.Spec.Ports) != 1 {
		framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
	}
	port := service.Spec.Ports[0]
	if port.NodePort == 0 {
		framework.Failf("got unexpected Spec.Ports[0].nodePort for new service: %v", service)
	}
	if !ServiceNodePortRange.Contains(int(port.NodePort)) {
		framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
	}
	nodePort := port.NodePort

	By("deleting original service " + serviceName)
	err = t.DeleteService(serviceName)
	Expect(err).NotTo(HaveOccurred())

	// Poll from a host-network pod until nothing listens on the old node
	// port; the leading `!` inverts grep, so the command succeeds only when
	// no LISTEN socket remains.
	hostExec := framework.LaunchHostExecPod(f.Client, f.Namespace.Name, "hostexec")
	cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
	var stdout string
	if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
		var err error
		stdout, err = framework.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
		if err != nil {
			framework.Logf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
			return false, nil
		}
		return true, nil
	}); pollErr != nil {
		framework.Failf("expected node port (%d) to not be in use in %v, stdout: %v", nodePort, kubeProxyLagTimeout, stdout)
	}

	By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
	// Re-requesting the released port must now succeed.
	service = t.BuildServiceSpec()
	service.Spec.Type = api.ServiceTypeNodePort
	service.Spec.Ports[0].NodePort = nodePort
	service, err = t.CreateService(service)
	Expect(err).NotTo(HaveOccurred())
})
  806. It("should create endpoints for unready pods", func() {
  807. serviceName := "never-ready"
  808. ns := f.Namespace.Name
  809. t := NewServerTest(c, ns, serviceName)
  810. defer func() {
  811. defer GinkgoRecover()
  812. errs := t.Cleanup()
  813. if len(errs) != 0 {
  814. framework.Failf("errors in cleanup: %v", errs)
  815. }
  816. }()
  817. service := t.BuildServiceSpec()
  818. service.Annotations = map[string]string{endpoint.TolerateUnreadyEndpointsAnnotation: "true"}
  819. rcSpec := rcByNameContainer(t.name, 1, t.image, t.Labels, api.Container{
  820. Name: t.name,
  821. Image: t.image,
  822. Ports: []api.ContainerPort{{ContainerPort: int32(80), Protocol: api.ProtocolTCP}},
  823. ReadinessProbe: &api.Probe{
  824. Handler: api.Handler{
  825. Exec: &api.ExecAction{
  826. Command: []string{"/bin/false"},
  827. },
  828. },
  829. },
  830. })
  831. By(fmt.Sprintf("createing RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
  832. _, err := t.createRC(rcSpec)
  833. ExpectNoError(err)
  834. By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector))
  835. _, err = t.CreateService(service)
  836. ExpectNoError(err)
  837. By("Verifying pods for RC " + t.name)
  838. ExpectNoError(framework.VerifyPods(t.Client, t.Namespace, t.name, false, 1))
  839. svcName := fmt.Sprintf("%v.%v", serviceName, f.Namespace.Name)
  840. By("waiting for endpoints of Service with DNS name " + svcName)
  841. execPodName := createExecPodOrFail(f.Client, f.Namespace.Name, "execpod-")
  842. cmd := fmt.Sprintf("wget -qO- %v", svcName)
  843. var stdout string
  844. if pollErr := wait.PollImmediate(framework.Poll, kubeProxyLagTimeout, func() (bool, error) {
  845. var err error
  846. stdout, err = framework.RunHostCmd(f.Namespace.Name, execPodName, cmd)
  847. if err != nil {
  848. framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.name, stdout, err)
  849. return false, nil
  850. }
  851. return true, nil
  852. }); pollErr != nil {
  853. framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.name, kubeProxyLagTimeout, stdout)
  854. }
  855. })
  856. It("should be able to create services of type LoadBalancer and externalTraffic=localOnly [Slow][Feature:ExternalTrafficLocalOnly]", func() {
  857. // requires cloud load-balancer support - this feature currently supported only on GCE/GKE
  858. framework.SkipUnlessProviderIs("gce", "gke")
  859. loadBalancerCreateTimeout := loadBalancerCreateTimeoutDefault
  860. largeClusterMinNodesNumber := 100
  861. if nodes := framework.GetReadySchedulableNodesOrDie(c); len(nodes.Items) > largeClusterMinNodesNumber {
  862. loadBalancerCreateTimeout = loadBalancerCreateTimeoutLarge
  863. }
  864. namespace := f.Namespace.Name
  865. serviceName := "external-local"
  866. jig := NewServiceTestJig(c, serviceName)
  867. By("creating a service " + namespace + "/" + namespace + " with type=LoadBalancer and annotation for local-traffic-only")
  868. svc := jig.CreateTCPServiceOrFail(namespace, func(svc *api.Service) {
  869. svc.Spec.Type = api.ServiceTypeLoadBalancer
  870. // We need to turn affinity off for our LB distribution tests
  871. svc.Spec.SessionAffinity = api.ServiceAffinityNone
  872. svc.ObjectMeta.Annotations = map[string]string{
  873. service.AnnotationExternalTraffic: service.AnnotationValueExternalTrafficLocal}
  874. svc.Spec.Ports = []api.ServicePort{{Protocol: "TCP", Port: 80}}
  875. })
  876. By("creating a pod to be part of the service " + serviceName)
  877. // This container is an nginx container listening on port 80
  878. // See kubernetes/contrib/ingress/echoheaders/nginx.conf for content of response
  879. jig.RunOrFail(namespace, nil)
  880. By("waiting for loadbalancer for service " + namespace + "/" + serviceName)
  881. svc = jig.WaitForLoadBalancerOrFail(namespace, serviceName, loadBalancerCreateTimeout)
  882. jig.SanityCheckService(svc, api.ServiceTypeLoadBalancer)
  883. svcTcpPort := int(svc.Spec.Ports[0].Port)
  884. framework.Logf("service port : %d", svcTcpPort)
  885. tcpNodePort := int(svc.Spec.Ports[0].NodePort)
  886. framework.Logf("TCP node port: %d", tcpNodePort)
  887. ingressIP := getIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
  888. framework.Logf("TCP load balancer: %s", ingressIP)
  889. healthCheckNodePort := int(service.GetServiceHealthCheckNodePort(svc))
  890. By("checking health check node port allocated")
  891. if healthCheckNodePort == 0 {
  892. framework.Failf("Service HealthCheck NodePort was not allocated")
  893. }
  894. nodeIP := pickNodeIP(jig.Client)
  895. By("hitting the TCP service's NodePort on " + nodeIP + ":" + fmt.Sprintf("%d", tcpNodePort))
  896. jig.TestReachableHTTP(nodeIP, tcpNodePort, kubeProxyLagTimeout)
  897. By("hitting the TCP service's service port, via its external VIP " + ingressIP + ":" + fmt.Sprintf("%d", svcTcpPort))
  898. jig.TestReachableHTTP(ingressIP, svcTcpPort, kubeProxyLagTimeout)
  899. By("reading clientIP using the TCP service's NodePort")
  900. content := jig.GetHTTPContent(nodeIP, tcpNodePort, kubeProxyLagTimeout, "/clientip")
  901. clientIP := content.String()
  902. framework.Logf("ClientIP detected by target pod using NodePort is %s", clientIP)
  903. By("reading clientIP using the TCP service's service port via its external VIP")
  904. content = jig.GetHTTPContent(ingressIP, svcTcpPort, kubeProxyLagTimeout, "/clientip")
  905. clientIP = content.String()
  906. framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP)
  907. By("checking if Source IP is preserved")
  908. if strings.HasPrefix(clientIP, "10.") {
  909. framework.Failf("Source IP was NOT preserved")
  910. }
  911. By("finding nodes for all service endpoints")
  912. endpoints, err := c.Endpoints(namespace).Get(serviceName)
  913. if err != nil {
  914. framework.Failf("Get endpoints for service %s/%s failed (%s)", namespace, serviceName, err)
  915. }
  916. if len(endpoints.Subsets[0].Addresses) == 0 {
  917. framework.Failf("Expected Ready endpoints - found none")
  918. }
  919. readyHostName := *endpoints.Subsets[0].Addresses[0].NodeName
  920. framework.Logf("Pod for service %s/%s is on node %s", namespace, serviceName, readyHostName)
  921. // HealthCheck responder validation - iterate over all node IPs and check their HC responses
  922. // Collect all node names and their public IPs - the nodes and ips slices parallel each other
  923. nodes := framework.GetReadySchedulableNodesOrDie(jig.Client)
  924. ips := collectAddresses(nodes, api.NodeExternalIP)
  925. if len(ips) == 0 {
  926. ips = collectAddresses(nodes, api.NodeLegacyHostIP)
  927. }
  928. By("checking kube-proxy health check responses are correct")
  929. for n, publicIP := range ips {
  930. framework.Logf("Checking health check response for node %s, public IP %s", nodes.Items[n].Name, publicIP)
  931. // HealthCheck should pass only on the node where num(endpoints) > 0
  932. // All other nodes should fail the healthcheck on the service healthCheckNodePort
  933. expectedSuccess := nodes.Items[n].Name == readyHostName
  934. jig.TestHTTPHealthCheckNodePort(publicIP, healthCheckNodePort, "/healthz", expectedSuccess)
  935. }
  936. })
  937. })
  938. // updateService fetches a service, calls the update function on it,
  939. // and then attempts to send the updated service. It retries up to 2
  940. // times in the face of timeouts and conflicts.
  941. func updateService(c *client.Client, namespace, serviceName string, update func(*api.Service)) (*api.Service, error) {
  942. var service *api.Service
  943. var err error
  944. for i := 0; i < 3; i++ {
  945. service, err = c.Services(namespace).Get(serviceName)
  946. if err != nil {
  947. return service, err
  948. }
  949. update(service)
  950. service, err = c.Services(namespace).Update(service)
  951. if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
  952. return service, err
  953. }
  954. }
  955. return service, err
  956. }
  957. func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID {
  958. m := PortsByPodUID{}
  959. for _, ss := range endpoints.Subsets {
  960. for _, port := range ss.Ports {
  961. for _, addr := range ss.Addresses {
  962. containerPort := port.Port
  963. hostPort := port.Port
  964. // use endpoint annotations to recover the container port in a Mesos setup
  965. // compare contrib/mesos/pkg/service/endpoints_controller.syncService
  966. key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort)
  967. mesosContainerPortString := endpoints.Annotations[key]
  968. if mesosContainerPortString != "" {
  969. mesosContainerPort, err := strconv.Atoi(mesosContainerPortString)
  970. if err != nil {
  971. continue
  972. }
  973. containerPort = int32(mesosContainerPort)
  974. framework.Logf("Mapped mesos host port %d to container port %d via annotation %s=%s", hostPort, containerPort, key, mesosContainerPortString)
  975. }
  976. // framework.Logf("Found pod %v, host port %d and container port %d", addr.TargetRef.UID, hostPort, containerPort)
  977. if _, ok := m[addr.TargetRef.UID]; !ok {
  978. m[addr.TargetRef.UID] = make([]int, 0)
  979. }
  980. m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort))
  981. }
  982. }
  983. }
  984. return m
  985. }
// PortsByPodName maps a pod name to the list of container ports recorded for it.
type PortsByPodName map[string][]int

// PortsByPodUID maps a pod UID to the list of container ports recorded for it.
type PortsByPodUID map[types.UID][]int
  988. func translatePodNameToUIDOrFail(c *client.Client, ns string, expectedEndpoints PortsByPodName) PortsByPodUID {
  989. portsByUID := make(PortsByPodUID)
  990. for name, portList := range expectedEndpoints {
  991. pod, err := c.Pods(ns).Get(name)
  992. if err != nil {
  993. framework.Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
  994. }
  995. portsByUID[pod.ObjectMeta.UID] = portList
  996. }
  997. // framework.Logf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)
  998. return portsByUID
  999. }
  1000. func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) {
  1001. if len(endpoints) != len(expectedEndpoints) {
  1002. // should not happen because we check this condition before
  1003. framework.Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints)
  1004. }
  1005. for podUID := range expectedEndpoints {
  1006. if _, ok := endpoints[podUID]; !ok {
  1007. framework.Failf("endpoint %v not found", podUID)
  1008. }
  1009. if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) {
  1010. framework.Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
  1011. }
  1012. sort.Ints(endpoints[podUID])
  1013. sort.Ints(expectedEndpoints[podUID])
  1014. for index := range endpoints[podUID] {
  1015. if endpoints[podUID][index] != expectedEndpoints[podUID][index] {
  1016. framework.Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
  1017. }
  1018. }
  1019. }
  1020. }
// validateEndpointsOrFail polls until the named service exposes exactly the
// expected pod-name->ports endpoints, failing the test if that does not
// happen within framework.ServiceStartTimeout. On timeout it dumps all pods
// in the cluster for debugging before failing.
func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints PortsByPodName) {
	By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
	i := 1
	// Note: `continue` still executes the post statement, so every retry
	// waits one second.
	for start := time.Now(); time.Since(start) < framework.ServiceStartTimeout; time.Sleep(1 * time.Second) {
		endpoints, err := c.Endpoints(namespace).Get(serviceName)
		if err != nil {
			framework.Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err)
			continue
		}
		// framework.Logf("Found endpoints %v", endpoints)

		portsByPodUID := getContainerPortsByPodUID(endpoints)
		// framework.Logf("Found port by pod UID %v", portsByPodUID)

		expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints)
		// Only validate ports once the endpoint count matches; otherwise
		// keep polling until the endpoints controller catches up.
		if len(portsByPodUID) == len(expectedEndpoints) {
			validatePortsOrFail(portsByPodUID, expectedPortsByPodUID)
			framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)",
				serviceName, namespace, expectedEndpoints, time.Since(start))
			return
		}

		// Log progress only every 5th iteration to keep output readable.
		if i%5 == 0 {
			framework.Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start))
		}
		i++
	}

	// Timed out: dump pod state across all namespaces to aid debugging.
	if pods, err := c.Pods(api.NamespaceAll).List(api.ListOptions{}); err == nil {
		for _, pod := range pods.Items {
			framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
		}
	} else {
		framework.Logf("Can't list pod debug info: %v", err)
	}
	framework.Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, framework.ServiceStartTimeout)
}
  1054. // createExecPodOrFail creates a simple busybox pod in a sleep loop used as a
  1055. // vessel for kubectl exec commands.
  1056. // Returns the name of the created pod.
  1057. func createExecPodOrFail(c *client.Client, ns, generateName string) string {
  1058. framework.Logf("Creating new exec pod")
  1059. immediate := int64(0)
  1060. pod := &api.Pod{
  1061. ObjectMeta: api.ObjectMeta{
  1062. GenerateName: generateName,
  1063. Namespace: ns,
  1064. },
  1065. Spec: api.PodSpec{
  1066. TerminationGracePeriodSeconds: &immediate,
  1067. Containers: []api.Container{
  1068. {
  1069. Name: "exec",
  1070. Image: "gcr.io/google_containers/busybox:1.24",
  1071. Command: []string{"sh", "-c", "while true; do sleep 5; done"},
  1072. },
  1073. },
  1074. },
  1075. }
  1076. created, err := c.Pods(ns).Create(pod)
  1077. Expect(err).NotTo(HaveOccurred())
  1078. err = wait.PollImmediate(framework.Poll, 5*time.Minute, func() (bool, error) {
  1079. retrievedPod, err := c.Pods(pod.Namespace).Get(created.Name)
  1080. if err != nil {
  1081. return false, nil
  1082. }
  1083. return retrievedPod.Status.Phase == api.PodRunning, nil
  1084. })
  1085. Expect(err).NotTo(HaveOccurred())
  1086. return created.Name
  1087. }
  1088. func createPodOrFail(c *client.Client, ns, name string, labels map[string]string, containerPorts []api.ContainerPort) {
  1089. By(fmt.Sprintf("creating pod %s in namespace %s", name, ns))
  1090. pod := &api.Pod{
  1091. ObjectMeta: api.ObjectMeta{
  1092. Name: name,
  1093. Labels: labels,
  1094. },
  1095. Spec: api.PodSpec{
  1096. Containers: []api.Container{
  1097. {
  1098. Name: "pause",
  1099. Image: framework.GetPauseImageName(c),
  1100. Ports: containerPorts,
  1101. // Add a dummy environment variable to work around a docker issue.
  1102. // https://github.com/docker/docker/issues/14203
  1103. Env: []api.EnvVar{{Name: "FOO", Value: " "}},
  1104. },
  1105. },
  1106. },
  1107. }
  1108. _, err := c.Pods(ns).Create(pod)
  1109. Expect(err).NotTo(HaveOccurred())
  1110. }
  1111. func deletePodOrFail(c *client.Client, ns, name string) {
  1112. By(fmt.Sprintf("deleting pod %s in namespace %s", name, ns))
  1113. err := c.Pods(ns).Delete(name, nil)
  1114. Expect(err).NotTo(HaveOccurred())
  1115. }
  1116. func collectAddresses(nodes *api.NodeList, addressType api.NodeAddressType) []string {
  1117. ips := []string{}
  1118. for i := range nodes.Items {
  1119. item := &nodes.Items[i]
  1120. for j := range item.Status.Addresses {
  1121. nodeAddress := &item.Status.Addresses[j]
  1122. if nodeAddress.Type == addressType {
  1123. ips = append(ips, nodeAddress.Address)
  1124. }
  1125. }
  1126. }
  1127. return ips
  1128. }
  1129. func getNodePublicIps(c *client.Client) ([]string, error) {
  1130. nodes := framework.GetReadySchedulableNodesOrDie(c)
  1131. ips := collectAddresses(nodes, api.NodeExternalIP)
  1132. if len(ips) == 0 {
  1133. ips = collectAddresses(nodes, api.NodeLegacyHostIP)
  1134. }
  1135. return ips, nil
  1136. }
  1137. func pickNodeIP(c *client.Client) string {
  1138. publicIps, err := getNodePublicIps(c)
  1139. Expect(err).NotTo(HaveOccurred())
  1140. if len(publicIps) == 0 {
  1141. framework.Failf("got unexpected number (%d) of public IPs", len(publicIps))
  1142. }
  1143. ip := publicIps[0]
  1144. return ip
  1145. }
// testReachableHTTP checks that an HTTP GET to ip:port+request succeeds with
// status 200 and a body containing expect; the body itself is discarded.
// Thin wrapper around testReachableHTTPWithContent with a nil content buffer.
func testReachableHTTP(ip string, port int, request string, expect string) (bool, error) {
	return testReachableHTTPWithContent(ip, port, request, expect, nil)
}
  1149. func testReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) {
  1150. url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
  1151. if ip == "" {
  1152. framework.Failf("Got empty IP for reachability check (%s)", url)
  1153. return false, nil
  1154. }
  1155. if port == 0 {
  1156. framework.Failf("Got port==0 for reachability check (%s)", url)
  1157. return false, nil
  1158. }
  1159. framework.Logf("Testing HTTP reachability of %v", url)
  1160. resp, err := httpGetNoConnectionPool(url)
  1161. if err != nil {
  1162. framework.Logf("Got error testing for reachability of %s: %v", url, err)
  1163. return false, nil
  1164. }
  1165. defer resp.Body.Close()
  1166. body, err := ioutil.ReadAll(resp.Body)
  1167. if err != nil {
  1168. framework.Logf("Got error reading response from %s: %v", url, err)
  1169. return false, nil
  1170. }
  1171. if resp.StatusCode != 200 {
  1172. return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s",
  1173. resp.Status, url, string(body))
  1174. }
  1175. if !strings.Contains(string(body), expect) {
  1176. return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body))
  1177. }
  1178. if content != nil {
  1179. content.Write(body)
  1180. }
  1181. return true, nil
  1182. }
  1183. func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
  1184. url := fmt.Sprintf("http://%s:%d%s", ip, port, request)
  1185. if ip == "" || port == 0 {
  1186. framework.Failf("Got empty IP for reachability check (%s)", url)
  1187. return false, fmt.Errorf("Invalid input ip or port")
  1188. }
  1189. framework.Logf("Testing HTTP health check on %v", url)
  1190. resp, err := httpGetNoConnectionPool(url)
  1191. if err != nil {
  1192. framework.Logf("Got error testing for reachability of %s: %v", url, err)
  1193. return false, err
  1194. }
  1195. defer resp.Body.Close()
  1196. if err != nil {
  1197. framework.Logf("Got error reading response from %s: %v", url, err)
  1198. return false, err
  1199. }
  1200. // HealthCheck responder returns 503 for no local endpoints
  1201. if resp.StatusCode == 503 {
  1202. return false, nil
  1203. }
  1204. // HealthCheck responder returns 200 for non-zero local endpoints
  1205. if resp.StatusCode == 200 {
  1206. return true, nil
  1207. }
  1208. return false, fmt.Errorf("Unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
  1209. }
  1210. func testNotReachableHTTP(ip string, port int) (bool, error) {
  1211. url := fmt.Sprintf("http://%s:%d", ip, port)
  1212. if ip == "" {
  1213. framework.Failf("Got empty IP for non-reachability check (%s)", url)
  1214. return false, nil
  1215. }
  1216. if port == 0 {
  1217. framework.Failf("Got port==0 for non-reachability check (%s)", url)
  1218. return false, nil
  1219. }
  1220. framework.Logf("Testing HTTP non-reachability of %v", url)
  1221. resp, err := httpGetNoConnectionPool(url)
  1222. if err != nil {
  1223. framework.Logf("Confirmed that %s is not reachable", url)
  1224. return true, nil
  1225. }
  1226. resp.Body.Close()
  1227. return false, nil
  1228. }
  1229. func testReachableUDP(ip string, port int, request string, expect string) (bool, error) {
  1230. uri := fmt.Sprintf("udp://%s:%d", ip, port)
  1231. if ip == "" {
  1232. framework.Failf("Got empty IP for reachability check (%s)", uri)
  1233. return false, nil
  1234. }
  1235. if port == 0 {
  1236. framework.Failf("Got port==0 for reachability check (%s)", uri)
  1237. return false, nil
  1238. }
  1239. framework.Logf("Testing UDP reachability of %v", uri)
  1240. con, err := net.Dial("udp", ip+":"+strconv.Itoa(port))
  1241. if err != nil {
  1242. return false, fmt.Errorf("Failed to dial %s:%d: %v", ip, port, err)
  1243. }
  1244. _, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
  1245. if err != nil {
  1246. return false, fmt.Errorf("Failed to send request: %v", err)
  1247. }
  1248. var buf []byte = make([]byte, len(expect)+1)
  1249. err = con.SetDeadline(time.Now().Add(3 * time.Second))
  1250. if err != nil {
  1251. return false, fmt.Errorf("Failed to set deadline: %v", err)
  1252. }
  1253. _, err = con.Read(buf)
  1254. if err != nil {
  1255. return false, nil
  1256. }
  1257. if !strings.Contains(string(buf), expect) {
  1258. return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf))
  1259. }
  1260. framework.Logf("Successfully reached %v", uri)
  1261. return true, nil
  1262. }
  1263. func testNotReachableUDP(ip string, port int, request string) (bool, error) {
  1264. uri := fmt.Sprintf("udp://%s:%d", ip, port)
  1265. if ip == "" {
  1266. framework.Failf("Got empty IP for reachability check (%s)", uri)
  1267. return false, nil
  1268. }
  1269. if port == 0 {
  1270. framework.Failf("Got port==0 for reachability check (%s)", uri)
  1271. return false, nil
  1272. }
  1273. framework.Logf("Testing UDP non-reachability of %v", uri)
  1274. con, err := net.Dial("udp", ip+":"+strconv.Itoa(port))
  1275. if err != nil {
  1276. framework.Logf("Confirmed that %s is not reachable", uri)
  1277. return true, nil
  1278. }
  1279. _, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
  1280. if err != nil {
  1281. framework.Logf("Confirmed that %s is not reachable", uri)
  1282. return true, nil
  1283. }
  1284. var buf []byte = make([]byte, 1)
  1285. err = con.SetDeadline(time.Now().Add(3 * time.Second))
  1286. if err != nil {
  1287. return false, fmt.Errorf("Failed to set deadline: %v", err)
  1288. }
  1289. _, err = con.Read(buf)
  1290. if err != nil {
  1291. framework.Logf("Confirmed that %s is not reachable", uri)
  1292. return true, nil
  1293. }
  1294. return false, nil
  1295. }
  1296. // Creates a replication controller that serves its hostname and a service on top of it.
  1297. func startServeHostnameService(c *client.Client, ns, name string, port, replicas int) ([]string, string, error) {
  1298. podNames := make([]string, replicas)
  1299. By("creating service " + name + " in namespace " + ns)
  1300. _, err := c.Services(ns).Create(&api.Service{
  1301. ObjectMeta: api.ObjectMeta{
  1302. Name: name,
  1303. },
  1304. Spec: api.ServiceSpec{
  1305. Ports: []api.ServicePort{{
  1306. Port: int32(port),
  1307. TargetPort: intstr.FromInt(9376),
  1308. Protocol: "TCP",
  1309. }},
  1310. Selector: map[string]string{
  1311. "name": name,
  1312. },
  1313. },
  1314. })
  1315. if err != nil {
  1316. return podNames, "", err
  1317. }
  1318. var createdPods []*api.Pod
  1319. maxContainerFailures := 0
  1320. config := framework.RCConfig{
  1321. Client: c,
  1322. Image: "gcr.io/google_containers/serve_hostname:v1.4",
  1323. Name: name,
  1324. Namespace: ns,
  1325. PollInterval: 3 * time.Second,
  1326. Timeout: framework.PodReadyBeforeTimeout,
  1327. Replicas: replicas,
  1328. CreatedPods: &createdPods,
  1329. MaxContainerFailures: &maxContainerFailures,
  1330. }
  1331. err = framework.RunRC(config)
  1332. if err != nil {
  1333. return podNames, "", err
  1334. }
  1335. if len(createdPods) != replicas {
  1336. return podNames, "", fmt.Errorf("Incorrect number of running pods: %v", len(createdPods))
  1337. }
  1338. for i := range createdPods {
  1339. podNames[i] = createdPods[i].ObjectMeta.Name
  1340. }
  1341. sort.StringSlice(podNames).Sort()
  1342. service, err := c.Services(ns).Get(name)
  1343. if err != nil {
  1344. return podNames, "", err
  1345. }
  1346. if service.Spec.ClusterIP == "" {
  1347. return podNames, "", fmt.Errorf("Service IP is blank for %v", name)
  1348. }
  1349. serviceIP := service.Spec.ClusterIP
  1350. return podNames, serviceIP, nil
  1351. }
  1352. func stopServeHostnameService(c *client.Client, ns, name string) error {
  1353. if err := framework.DeleteRCAndPods(c, ns, name); err != nil {
  1354. return err
  1355. }
  1356. if err := c.Services(ns).Delete(name); err != nil {
  1357. return err
  1358. }
  1359. return nil
  1360. }
// verifyServeHostnameServiceUp wgets the given serviceIP:servicePort from the
// given host and from within a pod. The host is expected to be an SSH-able node
// in the cluster. Each pod in the service is expected to echo its name. These
// names are compared with the given expectedPods list after a sort | uniq.
// Returns nil when both vantage points observed every expected backend within
// kubeProxyLagTimeout, and an error describing the diff otherwise.
func verifyServeHostnameServiceUp(c *client.Client, ns, host string, expectedPods []string, serviceIP string, servicePort int) error {
	// A throwaway exec pod gives us an in-cluster vantage point; it is
	// removed again when this function returns.
	execPodName := createExecPodOrFail(c, ns, "execpod-")
	defer func() {
		deletePodOrFail(c, ns, execPodName)
	}()
	// Loop a bunch of times - the proxy is randomized, so we want a good
	// chance of hitting each backend at least once.
	buildCommand := func(wget string) string {
		return fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s:%d 2>&1 || true; echo; done",
			50*len(expectedPods), wget, serviceIP, servicePort)
	}
	// Each entry probes the service from a different vantage point and
	// returns the raw newline-separated hostnames it collected.
	commands := []func() string{
		// verify service from node
		func() string {
			cmd := "set -e; " + buildCommand("wget -q --timeout=0.2 --tries=1 -O -")
			framework.Logf("Executing cmd %q on host %v", cmd, host)
			result, err := framework.SSH(cmd, host, framework.TestContext.Provider)
			if err != nil || result.Code != 0 {
				framework.LogSSHResult(result)
				framework.Logf("error while SSH-ing to node: %v", err)
			}
			return result.Stdout
		},
		// verify service from pod
		func() string {
			cmd := buildCommand("wget -q -T 1 -O -")
			framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, execPodName)
			// TODO: Use exec-over-http via the netexec pod instead of kubectl exec.
			output, err := framework.RunHostCmd(ns, execPodName, cmd)
			if err != nil {
				framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, execPodName, err, output)
			}
			return output
		},
	}
	expectedEndpoints := sets.NewString(expectedPods...)
	By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
	for _, cmdFunc := range commands {
		passed := false
		// Endpoints accumulate across retries: a backend only needs to be
		// seen once for the vantage point to eventually pass.
		gotEndpoints := sets.NewString()
		// Retry cmdFunc for a while
		for start := time.Now(); time.Since(start) < kubeProxyLagTimeout; time.Sleep(5 * time.Second) {
			for _, endpoint := range strings.Split(cmdFunc(), "\n") {
				trimmedEp := strings.TrimSpace(endpoint)
				if trimmedEp != "" {
					gotEndpoints.Insert(trimmedEp)
				}
			}
			// TODO: simply checking that the retrieved endpoints is a superset
			// of the expected allows us to ignore intermitten network flakes that
			// result in output like "wget timed out", but these should be rare
			// and we need a better way to track how often it occurs.
			if gotEndpoints.IsSuperset(expectedEndpoints) {
				if !gotEndpoints.Equal(expectedEndpoints) {
					framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
				}
				passed = true
				break
			}
			framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
		}
		if !passed {
			// Sort the lists so they're easier to visually diff.
			exp := expectedEndpoints.List()
			got := gotEndpoints.List()
			sort.StringSlice(exp).Sort()
			sort.StringSlice(got).Sort()
			return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
		}
	}
	return nil
}
// verifyServeHostnameServiceDown polls, over SSH from the given host, until
// curl can no longer reach serviceIP:servicePort, returning nil once the
// service is confirmed unreachable or an error after one minute.
func verifyServeHostnameServiceDown(c *client.Client, host string, serviceIP string, servicePort int) error {
	// "&& exit 99" tags a successful curl with a distinctive exit code so
	// "service answered" can be told apart from "connection failed".
	command := fmt.Sprintf(
		"curl -s --connect-timeout 2 http://%s:%d && exit 99", serviceIP, servicePort)
	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		result, err := framework.SSH(command, host, framework.TestContext.Provider)
		if err != nil {
			framework.LogSSHResult(result)
			framework.Logf("error while SSH-ing to node: %v", err)
		}
		// Any exit code other than 99 means curl did not get a response,
		// i.e. the service is down.
		if result.Code != 99 {
			return nil
		}
		framework.Logf("service still alive - still waiting")
	}
	return fmt.Errorf("waiting for service to be down timed out")
}
  1453. // Does an HTTP GET, but does not reuse TCP connections
  1454. // This masks problems where the iptables rule has changed, but we don't see it
  1455. // This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout
  1456. func httpGetNoConnectionPool(url string) (*http.Response, error) {
  1457. tr := utilnet.SetTransportDefaults(&http.Transport{
  1458. DisableKeepAlives: true,
  1459. })
  1460. client := &http.Client{
  1461. Transport: tr,
  1462. Timeout: 5 * time.Second,
  1463. }
  1464. return client.Get(url)
  1465. }
// A test jig to help testing.
// ServiceTestJig bundles the identity and label set used to create and
// track Services/RCs for a single test.
type ServiceTestJig struct {
	ID     string // unique id for this jig instance: Name + "-" + uuid
	Name   string // base name given to objects the jig creates
	Client *client.Client
	Labels map[string]string // selector labels ({"testid": ID}) applied to created objects
}
  1473. // NewServiceTestJig allocates and inits a new ServiceTestJig.
  1474. func NewServiceTestJig(client *client.Client, name string) *ServiceTestJig {
  1475. j := &ServiceTestJig{}
  1476. j.Client = client
  1477. j.Name = name
  1478. j.ID = j.Name + "-" + string(uuid.NewUUID())
  1479. j.Labels = map[string]string{"testid": j.ID}
  1480. return j
  1481. }
// newServiceTemplate returns the default api.Service template for this jig, but
// does not actually create the Service. The default Service has the same name
// as the jig and exposes port 80.
func (j *ServiceTestJig) newServiceTemplate(namespace string, proto api.Protocol) *api.Service {
	service := &api.Service{
		ObjectMeta: api.ObjectMeta{
			Namespace: namespace,
			Name:      j.Name,
			Labels:    j.Labels,
		},
		Spec: api.ServiceSpec{
			// Select the pods the jig itself creates (see newRCTemplate).
			Selector: j.Labels,
			Ports: []api.ServicePort{
				{
					Protocol: proto,
					Port:     80,
				},
			},
		},
	}
	return service
}
  1504. // CreateTCPServiceOrFail creates a new TCP Service based on the jig's
  1505. // defaults. Callers can provide a function to tweak the Service object before
  1506. // it is created.
  1507. func (j *ServiceTestJig) CreateTCPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
  1508. svc := j.newServiceTemplate(namespace, api.ProtocolTCP)
  1509. if tweak != nil {
  1510. tweak(svc)
  1511. }
  1512. result, err := j.Client.Services(namespace).Create(svc)
  1513. if err != nil {
  1514. framework.Failf("Failed to create TCP Service %q: %v", svc.Name, err)
  1515. }
  1516. return result
  1517. }
  1518. // CreateUDPServiceOrFail creates a new UDP Service based on the jig's
  1519. // defaults. Callers can provide a function to tweak the Service object before
  1520. // it is created.
  1521. func (j *ServiceTestJig) CreateUDPServiceOrFail(namespace string, tweak func(svc *api.Service)) *api.Service {
  1522. svc := j.newServiceTemplate(namespace, api.ProtocolUDP)
  1523. if tweak != nil {
  1524. tweak(svc)
  1525. }
  1526. result, err := j.Client.Services(namespace).Create(svc)
  1527. if err != nil {
  1528. framework.Failf("Failed to create UDP Service %q: %v", svc.Name, err)
  1529. }
  1530. return result
  1531. }
  1532. func (j *ServiceTestJig) SanityCheckService(svc *api.Service, svcType api.ServiceType) {
  1533. if svc.Spec.Type != svcType {
  1534. framework.Failf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
  1535. }
  1536. expectNodePorts := false
  1537. if svcType != api.ServiceTypeClusterIP {
  1538. expectNodePorts = true
  1539. }
  1540. for i, port := range svc.Spec.Ports {
  1541. hasNodePort := (port.NodePort != 0)
  1542. if hasNodePort != expectNodePorts {
  1543. framework.Failf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
  1544. }
  1545. if hasNodePort {
  1546. if !ServiceNodePortRange.Contains(int(port.NodePort)) {
  1547. framework.Failf("out-of-range nodePort (%d) for service", port.NodePort)
  1548. }
  1549. }
  1550. }
  1551. expectIngress := false
  1552. if svcType == api.ServiceTypeLoadBalancer {
  1553. expectIngress = true
  1554. }
  1555. hasIngress := len(svc.Status.LoadBalancer.Ingress) != 0
  1556. if hasIngress != expectIngress {
  1557. framework.Failf("unexpected number of Status.LoadBalancer.Ingress (%d) for service", len(svc.Status.LoadBalancer.Ingress))
  1558. }
  1559. if hasIngress {
  1560. for i, ing := range svc.Status.LoadBalancer.Ingress {
  1561. if ing.IP == "" && ing.Hostname == "" {
  1562. framework.Failf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
  1563. }
  1564. }
  1565. }
  1566. }
  1567. // UpdateService fetches a service, calls the update function on it, and
  1568. // then attempts to send the updated service. It tries up to 3 times in the
  1569. // face of timeouts and conflicts.
  1570. func (j *ServiceTestJig) UpdateService(namespace, name string, update func(*api.Service)) (*api.Service, error) {
  1571. for i := 0; i < 3; i++ {
  1572. service, err := j.Client.Services(namespace).Get(name)
  1573. if err != nil {
  1574. return nil, fmt.Errorf("Failed to get Service %q: %v", name, err)
  1575. }
  1576. update(service)
  1577. service, err = j.Client.Services(namespace).Update(service)
  1578. if err == nil {
  1579. return service, nil
  1580. }
  1581. if !errors.IsConflict(err) && !errors.IsServerTimeout(err) {
  1582. return nil, fmt.Errorf("Failed to update Service %q: %v", name, err)
  1583. }
  1584. }
  1585. return nil, fmt.Errorf("Too many retries updating Service %q", name)
  1586. }
  1587. // UpdateServiceOrFail fetches a service, calls the update function on it, and
  1588. // then attempts to send the updated service. It tries up to 3 times in the
  1589. // face of timeouts and conflicts.
  1590. func (j *ServiceTestJig) UpdateServiceOrFail(namespace, name string, update func(*api.Service)) *api.Service {
  1591. svc, err := j.UpdateService(namespace, name, update)
  1592. if err != nil {
  1593. framework.Failf(err.Error())
  1594. }
  1595. return svc
  1596. }
// ChangeServiceNodePortOrFail moves the service's first port to a different
// node port, walking the node-port range starting just after the initial
// port (wrapping around) until an unallocated port is found. Fails the test
// if no port can be allocated or the update errors for another reason.
func (j *ServiceTestJig) ChangeServiceNodePortOrFail(namespace, name string, initial int) *api.Service {
	var err error
	var service *api.Service
	// i starts at 1 so we never retry the initial port itself; each
	// iteration tries initial+i modulo the range size.
	for i := 1; i < ServiceNodePortRange.Size; i++ {
		offs1 := initial - ServiceNodePortRange.Base
		offs2 := (offs1 + i) % ServiceNodePortRange.Size
		newPort := ServiceNodePortRange.Base + offs2
		service, err = j.UpdateService(namespace, name, func(s *api.Service) {
			s.Spec.Ports[0].NodePort = int32(newPort)
		})
		// The apiserver reports in-use ports via this error string; any
		// such port is skipped and the next candidate tried.
		if err != nil && strings.Contains(err.Error(), "provided port is already allocated") {
			framework.Logf("tried nodePort %d, but it is in use, will try another", newPort)
			continue
		}
		// Otherwise err was nil or err was a real error
		break
	}
	if err != nil {
		framework.Failf("Could not change the nodePort: %v", err)
	}
	return service
}
  1619. func (j *ServiceTestJig) WaitForLoadBalancerOrFail(namespace, name string, timeout time.Duration) *api.Service {
  1620. var service *api.Service
  1621. framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, name)
  1622. pollFunc := func() (bool, error) {
  1623. svc, err := j.Client.Services(namespace).Get(name)
  1624. if err != nil {
  1625. return false, err
  1626. }
  1627. if len(svc.Status.LoadBalancer.Ingress) > 0 {
  1628. service = svc
  1629. return true, nil
  1630. }
  1631. return false, nil
  1632. }
  1633. if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
  1634. framework.Failf("Timeout waiting for service %q to have a load balancer", name)
  1635. }
  1636. return service
  1637. }
  1638. func (j *ServiceTestJig) WaitForLoadBalancerDestroyOrFail(namespace, name string, ip string, port int, timeout time.Duration) *api.Service {
  1639. // TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
  1640. defer func() {
  1641. if err := framework.EnsureLoadBalancerResourcesDeleted(ip, strconv.Itoa(port)); err != nil {
  1642. framework.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
  1643. }
  1644. }()
  1645. var service *api.Service
  1646. framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, name)
  1647. pollFunc := func() (bool, error) {
  1648. svc, err := j.Client.Services(namespace).Get(name)
  1649. if err != nil {
  1650. return false, err
  1651. }
  1652. if len(svc.Status.LoadBalancer.Ingress) == 0 {
  1653. service = svc
  1654. return true, nil
  1655. }
  1656. return false, nil
  1657. }
  1658. if err := wait.PollImmediate(framework.Poll, timeout, pollFunc); err != nil {
  1659. framework.Failf("Timeout waiting for service %q to have no load balancer", name)
  1660. }
  1661. return service
  1662. }
  1663. func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.Duration) {
  1664. if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTP(host, port, "/echo?msg=hello", "hello") }); err != nil {
  1665. framework.Failf("Could not reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
  1666. }
  1667. }
  1668. func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
  1669. if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testNotReachableHTTP(host, port) }); err != nil {
  1670. framework.Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err)
  1671. }
  1672. }
  1673. func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
  1674. if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableUDP(host, port, "echo hello", "hello") }); err != nil {
  1675. framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
  1676. }
  1677. }
  1678. func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
  1679. if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testNotReachableUDP(host, port, "echo hello") }); err != nil {
  1680. framework.Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
  1681. }
  1682. }
  1683. func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
  1684. var body bytes.Buffer
  1685. if err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) { return testReachableHTTPWithContent(host, port, url, "", &body) }); err != nil {
  1686. framework.Failf("Could not reach HTTP service through %v:%v/%v after %v: %v", host, port, url, timeout, err)
  1687. return body
  1688. }
  1689. return body
  1690. }
// TestHTTPHealthCheckNodePort probes the health-check nodePort once and
// fails the test when the observed health does not match expectedSuccess.
func (j *ServiceTestJig) TestHTTPHealthCheckNodePort(host string, port int, request string, expectedSuccess bool) {
	success, err := testHTTPHealthCheckNodePort(host, port, request)
	// Expected healthy and got healthy: pass.
	if expectedSuccess && success {
		framework.Logf("HealthCheck successful for node %v:%v, as expected", host, port)
		return
	} else if !expectedSuccess && (!success || err != nil) {
		// Expected unhealthy and got either a 503 or a probe error: pass.
		framework.Logf("HealthCheck failed for node %v:%v, as expected", host, port)
		return
	} else if expectedSuccess {
		// Expected healthy but the probe said unhealthy (or errored).
		framework.Failf("HealthCheck NodePort incorrectly reporting unhealthy on %v:%v: %v", host, port, err)
	}
	// Expected unhealthy but the probe reported healthy.
	framework.Failf("Unexpected HealthCheck NodePort still reporting healthy %v:%v: %v", host, port, err)
}
  1704. func getIngressPoint(ing *api.LoadBalancerIngress) string {
  1705. host := ing.IP
  1706. if host == "" {
  1707. host = ing.Hostname
  1708. }
  1709. return host
  1710. }
// newRCTemplate returns the default api.ReplicationController object for
// this jig, but does not actually create the RC. The default RC has the same
// name as the jig and runs the "netexec" container.
func (j *ServiceTestJig) newRCTemplate(namespace string) *api.ReplicationController {
	rc := &api.ReplicationController{
		ObjectMeta: api.ObjectMeta{
			Namespace: namespace,
			Name:      j.Name,
			Labels:    j.Labels,
		},
		Spec: api.ReplicationControllerSpec{
			Replicas: 1,
			// Select (and stamp) pods with the jig's unique labels so the
			// jig's Service template matches these pods.
			Selector: j.Labels,
			Template: &api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: j.Labels,
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Name:  "netexec",
							Image: "gcr.io/google_containers/netexec:1.6",
							// netexec listens on port 80 for both HTTP and UDP.
							Args: []string{"--http-port=80", "--udp-port=80"},
							// Pod is Ready once netexec's /hostName endpoint answers.
							ReadinessProbe: &api.Probe{
								PeriodSeconds: 3,
								Handler: api.Handler{
									HTTPGet: &api.HTTPGetAction{
										Port: intstr.FromInt(80),
										Path: "/hostName",
									},
								},
							},
						},
					},
					// Zero grace period: kill test pods immediately on delete.
					TerminationGracePeriodSeconds: new(int64),
				},
			},
		},
	}
	return rc
}
  1752. // RunOrFail creates a ReplicationController and Pod(s) and waits for the
  1753. // Pod(s) to be running. Callers can provide a function to tweak the RC object
  1754. // before it is created.
  1755. func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *api.ReplicationController)) *api.ReplicationController {
  1756. rc := j.newRCTemplate(namespace)
  1757. if tweak != nil {
  1758. tweak(rc)
  1759. }
  1760. result, err := j.Client.ReplicationControllers(namespace).Create(rc)
  1761. if err != nil {
  1762. framework.Failf("Failed to created RC %q: %v", rc.Name, err)
  1763. }
  1764. pods, err := j.waitForPodsCreated(namespace, int(rc.Spec.Replicas))
  1765. if err != nil {
  1766. framework.Failf("Failed to create pods: %v", err)
  1767. }
  1768. if err := j.waitForPodsReady(namespace, pods); err != nil {
  1769. framework.Failf("Failed waiting for pods to be running: %v", err)
  1770. }
  1771. return result
  1772. }
  1773. func (j *ServiceTestJig) waitForPodsCreated(namespace string, replicas int) ([]string, error) {
  1774. timeout := 2 * time.Minute
  1775. // List the pods, making sure we observe all the replicas.
  1776. label := labels.SelectorFromSet(labels.Set(j.Labels))
  1777. framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
  1778. for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
  1779. options := api.ListOptions{LabelSelector: label}
  1780. pods, err := j.Client.Pods(namespace).List(options)
  1781. if err != nil {
  1782. return nil, err
  1783. }
  1784. found := []string{}
  1785. for _, pod := range pods.Items {
  1786. if pod.DeletionTimestamp != nil {
  1787. continue
  1788. }
  1789. found = append(found, pod.Name)
  1790. }
  1791. if len(found) == replicas {
  1792. framework.Logf("Found all %d pods", replicas)
  1793. return found, nil
  1794. }
  1795. framework.Logf("Found %d/%d pods - will retry", len(found), replicas)
  1796. }
  1797. return nil, fmt.Errorf("Timeout waiting for %d pods to be created", replicas)
  1798. }
  1799. func (j *ServiceTestJig) waitForPodsReady(namespace string, pods []string) error {
  1800. timeout := 2 * time.Minute
  1801. if !framework.CheckPodsRunningReady(j.Client, namespace, pods, timeout) {
  1802. return fmt.Errorf("Timeout waiting for %d pods to be ready", len(pods))
  1803. }
  1804. return nil
  1805. }
// Simple helper class to avoid too much boilerplate in tests
// ServiceTestFixture tracks the RCs and Services a test creates so that
// Cleanup can tear them all down.
type ServiceTestFixture struct {
	ServiceName string
	Namespace   string
	Client      *client.Client
	TestId      string            // unique id: ServiceName + "-" + uuid
	Labels      map[string]string // selector labels ({"testid": TestId}) for created objects
	rcs         map[string]bool   // names of RCs created via createRC, for Cleanup
	services    map[string]bool   // names of Services created via CreateService, for Cleanup
	name        string            // pod/RC base name (default "webserver")
	image       string            // container image used by CreateWebserverRC
}
  1818. func NewServerTest(client *client.Client, namespace string, serviceName string) *ServiceTestFixture {
  1819. t := &ServiceTestFixture{}
  1820. t.Client = client
  1821. t.Namespace = namespace
  1822. t.ServiceName = serviceName
  1823. t.TestId = t.ServiceName + "-" + string(uuid.NewUUID())
  1824. t.Labels = map[string]string{
  1825. "testid": t.TestId,
  1826. }
  1827. t.rcs = make(map[string]bool)
  1828. t.services = make(map[string]bool)
  1829. t.name = "webserver"
  1830. t.image = "gcr.io/google_containers/test-webserver:e2e"
  1831. return t
  1832. }
// Build default config for a service (which can then be changed)
// The returned Service selects the fixture's labels and maps port 80 to
// container port 80; it is not created on the apiserver.
func (t *ServiceTestFixture) BuildServiceSpec() *api.Service {
	service := &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name:      t.ServiceName,
			Namespace: t.Namespace,
		},
		Spec: api.ServiceSpec{
			// Match the pods created by CreateWebserverRC.
			Selector: t.Labels,
			Ports: []api.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt(80),
			}},
		},
	}
	return service
}
  1850. // CreateWebserverRC creates rc-backed pods with the well-known webserver
  1851. // configuration and records it for cleanup.
  1852. func (t *ServiceTestFixture) CreateWebserverRC(replicas int32) *api.ReplicationController {
  1853. rcSpec := rcByNamePort(t.name, replicas, t.image, 80, api.ProtocolTCP, t.Labels)
  1854. rcAct, err := t.createRC(rcSpec)
  1855. if err != nil {
  1856. framework.Failf("Failed to create rc %s: %v", rcSpec.Name, err)
  1857. }
  1858. if err := framework.VerifyPods(t.Client, t.Namespace, t.name, false, replicas); err != nil {
  1859. framework.Failf("Failed to create %d pods with name %s: %v", replicas, t.name, err)
  1860. }
  1861. return rcAct
  1862. }
  1863. // createRC creates a replication controller and records it for cleanup.
  1864. func (t *ServiceTestFixture) createRC(rc *api.ReplicationController) (*api.ReplicationController, error) {
  1865. rc, err := t.Client.ReplicationControllers(t.Namespace).Create(rc)
  1866. if err == nil {
  1867. t.rcs[rc.Name] = true
  1868. }
  1869. return rc, err
  1870. }
  1871. // Create a service, and record it for cleanup
  1872. func (t *ServiceTestFixture) CreateService(service *api.Service) (*api.Service, error) {
  1873. result, err := t.Client.Services(t.Namespace).Create(service)
  1874. if err == nil {
  1875. t.services[service.Name] = true
  1876. }
  1877. return result, err
  1878. }
  1879. // Delete a service, and remove it from the cleanup list
  1880. func (t *ServiceTestFixture) DeleteService(serviceName string) error {
  1881. err := t.Client.Services(t.Namespace).Delete(serviceName)
  1882. if err == nil {
  1883. delete(t.services, serviceName)
  1884. }
  1885. return err
  1886. }
  1887. func (t *ServiceTestFixture) Cleanup() []error {
  1888. var errs []error
  1889. for rcName := range t.rcs {
  1890. By("stopping RC " + rcName + " in namespace " + t.Namespace)
  1891. // First, resize the RC to 0.
  1892. old, err := t.Client.ReplicationControllers(t.Namespace).Get(rcName)
  1893. if err != nil {
  1894. errs = append(errs, err)
  1895. }
  1896. old.Spec.Replicas = 0
  1897. if _, err := t.Client.ReplicationControllers(t.Namespace).Update(old); err != nil {
  1898. errs = append(errs, err)
  1899. }
  1900. // TODO(mikedanese): Wait.
  1901. // Then, delete the RC altogether.
  1902. if err := t.Client.ReplicationControllers(t.Namespace).Delete(rcName, nil); err != nil {
  1903. errs = append(errs, err)
  1904. }
  1905. }
  1906. for serviceName := range t.services {
  1907. By("deleting service " + serviceName + " in namespace " + t.Namespace)
  1908. err := t.Client.Services(t.Namespace).Delete(serviceName)
  1909. if err != nil {
  1910. errs = append(errs, err)
  1911. }
  1912. }
  1913. return errs
  1914. }