kubeproxy.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package e2e
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "io/ioutil"
  18. "net/http"
  19. "strconv"
  20. "strings"
  21. "time"
  22. . "github.com/onsi/ginkgo"
  23. . "github.com/onsi/gomega"
  24. api "k8s.io/kubernetes/pkg/api"
  25. "k8s.io/kubernetes/pkg/api/unversioned"
  26. "k8s.io/kubernetes/pkg/apimachinery/registered"
  27. client "k8s.io/kubernetes/pkg/client/unversioned"
  28. "k8s.io/kubernetes/pkg/labels"
  29. "k8s.io/kubernetes/pkg/util/intstr"
  30. utilnet "k8s.io/kubernetes/pkg/util/net"
  31. "k8s.io/kubernetes/pkg/util/uuid"
  32. "k8s.io/kubernetes/pkg/util/wait"
  33. "k8s.io/kubernetes/test/e2e/framework"
  34. )
  35. const (
  36. endpointHttpPort = 8080
  37. endpointUdpPort = 8081
  38. testContainerHttpPort = 8080
  39. clusterHttpPort = 80
  40. clusterUdpPort = 90
  41. nodeHttpPort = 32080
  42. nodeUdpPort = 32081
  43. loadBalancerHttpPort = 100
  44. netexecImageName = "gcr.io/google_containers/netexec:1.5"
  45. testPodName = "test-container-pod"
  46. hostTestPodName = "host-test-container-pod"
  47. nodePortServiceName = "node-port-service"
  48. loadBalancerServiceName = "load-balancer-service"
  49. enableLoadBalancerTest = false
  50. hitEndpointRetryDelay = 1 * time.Second
  51. // Number of retries to hit a given set of endpoints. Needs to be high
  52. // because we verify iptables statistical rr loadbalancing.
  53. testTries = 30
  54. )
  55. type KubeProxyTestConfig struct {
  56. testContainerPod *api.Pod
  57. hostTestContainerPod *api.Pod
  58. endpointPods []*api.Pod
  59. f *framework.Framework
  60. nodePortService *api.Service
  61. loadBalancerService *api.Service
  62. externalAddrs []string
  63. nodes []api.Node
  64. }
  65. var _ = framework.KubeDescribe("KubeProxy", func() {
  66. f := framework.NewDefaultFramework("e2e-kubeproxy")
  67. config := &KubeProxyTestConfig{
  68. f: f,
  69. }
  70. // Slow issue #14204 (10 min)
  71. It("should test kube-proxy [Slow]", func() {
  72. By("cleaning up any pre-existing namespaces used by this test")
  73. config.cleanup()
  74. By("Setting up for the tests")
  75. config.setup()
  76. //TODO Need to add hit externalIPs test
  77. By("TODO: Need to add hit externalIPs test")
  78. By("Hit Test with All Endpoints")
  79. config.hitAll()
  80. config.deleteNetProxyPod()
  81. By("Hit Test with Fewer Endpoints")
  82. config.hitAll()
  83. By("Deleting nodePortservice and ensuring that service cannot be hit")
  84. config.deleteNodePortService()
  85. config.hitNodePort(0) // expect 0 endpoints to be hit
  86. if enableLoadBalancerTest {
  87. By("Deleting loadBalancerService and ensuring that service cannot be hit")
  88. config.deleteLoadBalancerService()
  89. config.hitLoadBalancer(0) // expect 0 endpoints to be hit
  90. }
  91. })
  92. })
  93. func (config *KubeProxyTestConfig) hitAll() {
  94. By("Hitting endpoints from host and container")
  95. config.hitEndpoints()
  96. By("Hitting clusterIP from host and container")
  97. config.hitClusterIP(len(config.endpointPods))
  98. By("Hitting nodePort from host and container")
  99. config.hitNodePort(len(config.endpointPods))
  100. if enableLoadBalancerTest {
  101. By("Waiting for LoadBalancer Ingress Setup")
  102. config.waitForLoadBalancerIngressSetup()
  103. By("Hitting LoadBalancer")
  104. config.hitLoadBalancer(len(config.endpointPods))
  105. }
  106. }
  107. func (config *KubeProxyTestConfig) hitLoadBalancer(epCount int) {
  108. lbIP := config.loadBalancerService.Status.LoadBalancer.Ingress[0].IP
  109. hostNames := make(map[string]bool)
  110. tries := epCount*epCount + 5
  111. for i := 0; i < tries; i++ {
  112. transport := utilnet.SetTransportDefaults(&http.Transport{})
  113. httpClient := createHTTPClient(transport)
  114. resp, err := httpClient.Get(fmt.Sprintf("http://%s:%d/hostName", lbIP, loadBalancerHttpPort))
  115. if err == nil {
  116. defer resp.Body.Close()
  117. hostName, err := ioutil.ReadAll(resp.Body)
  118. if err == nil {
  119. hostNames[string(hostName)] = true
  120. }
  121. }
  122. transport.CloseIdleConnections()
  123. }
  124. Expect(len(hostNames)).To(BeNumerically("==", epCount), "LoadBalancer did not hit all pods")
  125. }
  126. func createHTTPClient(transport *http.Transport) *http.Client {
  127. client := &http.Client{
  128. Transport: transport,
  129. Timeout: 5 * time.Second,
  130. }
  131. return client
  132. }
  133. func (config *KubeProxyTestConfig) hitClusterIP(epCount int) {
  134. clusterIP := config.nodePortService.Spec.ClusterIP
  135. tries := epCount*epCount + testTries // if epCount == 0
  136. By("dialing(udp) node1 --> clusterIP:clusterUdpPort")
  137. config.dialFromNode("udp", clusterIP, clusterUdpPort, tries, epCount)
  138. By("dialing(http) node1 --> clusterIP:clusterHttpPort")
  139. config.dialFromNode("http", clusterIP, clusterHttpPort, tries, epCount)
  140. By("dialing(udp) test container --> clusterIP:clusterUdpPort")
  141. config.dialFromTestContainer("udp", clusterIP, clusterUdpPort, tries, epCount)
  142. By("dialing(http) test container --> clusterIP:clusterHttpPort")
  143. config.dialFromTestContainer("http", clusterIP, clusterHttpPort, tries, epCount)
  144. By("dialing(udp) endpoint container --> clusterIP:clusterUdpPort")
  145. config.dialFromEndpointContainer("udp", clusterIP, clusterUdpPort, tries, epCount)
  146. By("dialing(http) endpoint container --> clusterIP:clusterHttpPort")
  147. config.dialFromEndpointContainer("http", clusterIP, clusterHttpPort, tries, epCount)
  148. }
  149. func (config *KubeProxyTestConfig) hitNodePort(epCount int) {
  150. node1_IP := config.externalAddrs[0]
  151. tries := epCount*epCount + testTries // if epCount == 0
  152. By("dialing(udp) node1 --> node1:nodeUdpPort")
  153. config.dialFromNode("udp", node1_IP, nodeUdpPort, tries, epCount)
  154. By("dialing(http) node1 --> node1:nodeHttpPort")
  155. config.dialFromNode("http", node1_IP, nodeHttpPort, tries, epCount)
  156. By("dialing(udp) test container --> node1:nodeUdpPort")
  157. config.dialFromTestContainer("udp", node1_IP, nodeUdpPort, tries, epCount)
  158. By("dialing(http) test container --> node1:nodeHttpPort")
  159. config.dialFromTestContainer("http", node1_IP, nodeHttpPort, tries, epCount)
  160. By("dialing(udp) endpoint container --> node1:nodeUdpPort")
  161. config.dialFromEndpointContainer("udp", node1_IP, nodeUdpPort, tries, epCount)
  162. By("dialing(http) endpoint container --> node1:nodeHttpPort")
  163. config.dialFromEndpointContainer("http", node1_IP, nodeHttpPort, tries, epCount)
  164. By("dialing(udp) node --> 127.0.0.1:nodeUdpPort")
  165. config.dialFromNode("udp", "127.0.0.1", nodeUdpPort, tries, epCount)
  166. By("dialing(http) node --> 127.0.0.1:nodeHttpPort")
  167. config.dialFromNode("http", "127.0.0.1", nodeHttpPort, tries, epCount)
  168. node2_IP := config.externalAddrs[1]
  169. By("dialing(udp) node1 --> node2:nodeUdpPort")
  170. config.dialFromNode("udp", node2_IP, nodeUdpPort, tries, epCount)
  171. By("dialing(http) node1 --> node2:nodeHttpPort")
  172. config.dialFromNode("http", node2_IP, nodeHttpPort, tries, epCount)
  173. By("checking kube-proxy URLs")
  174. config.getSelfURL("/healthz", "ok")
  175. config.getSelfURL("/proxyMode", "iptables") // the default
  176. }
  177. func (config *KubeProxyTestConfig) hitEndpoints() {
  178. for _, endpointPod := range config.endpointPods {
  179. Expect(len(endpointPod.Status.PodIP)).To(BeNumerically(">", 0), "podIP is empty:%s", endpointPod.Status.PodIP)
  180. By("dialing(udp) endpointPodIP:endpointUdpPort from node1")
  181. config.dialFromNode("udp", endpointPod.Status.PodIP, endpointUdpPort, 5, 1)
  182. By("dialing(http) endpointPodIP:endpointHttpPort from node1")
  183. config.dialFromNode("http", endpointPod.Status.PodIP, endpointHttpPort, 5, 1)
  184. By("dialing(udp) endpointPodIP:endpointUdpPort from test container")
  185. config.dialFromTestContainer("udp", endpointPod.Status.PodIP, endpointUdpPort, 5, 1)
  186. By("dialing(http) endpointPodIP:endpointHttpPort from test container")
  187. config.dialFromTestContainer("http", endpointPod.Status.PodIP, endpointHttpPort, 5, 1)
  188. }
  189. }
  190. func (config *KubeProxyTestConfig) dialFromEndpointContainer(protocol, targetIP string, targetPort, tries, expectedCount int) {
  191. config.dialFromContainer(protocol, config.endpointPods[0].Status.PodIP, targetIP, endpointHttpPort, targetPort, tries, expectedCount)
  192. }
  193. func (config *KubeProxyTestConfig) dialFromTestContainer(protocol, targetIP string, targetPort, tries, expectedCount int) {
  194. config.dialFromContainer(protocol, config.testContainerPod.Status.PodIP, targetIP, testContainerHttpPort, targetPort, tries, expectedCount)
  195. }
  196. func (config *KubeProxyTestConfig) dialFromContainer(protocol, containerIP, targetIP string, containerHttpPort, targetPort, tries, expectedCount int) {
  197. cmd := fmt.Sprintf("curl -q 'http://%s:%d/dial?request=hostName&protocol=%s&host=%s&port=%d&tries=%d'",
  198. containerIP,
  199. containerHttpPort,
  200. protocol,
  201. targetIP,
  202. targetPort,
  203. tries)
  204. By(fmt.Sprintf("Dialing from container. Running command:%s", cmd))
  205. stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd)
  206. var output map[string][]string
  207. err := json.Unmarshal([]byte(stdout), &output)
  208. Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Could not unmarshal curl response: %s", stdout))
  209. hostNamesMap := array2map(output["responses"])
  210. Expect(len(hostNamesMap)).To(BeNumerically("==", expectedCount), fmt.Sprintf("Response was:%v", output))
  211. }
  212. func (config *KubeProxyTestConfig) dialFromNode(protocol, targetIP string, targetPort, tries, expectedCount int) {
  213. var cmd string
  214. if protocol == "udp" {
  215. cmd = fmt.Sprintf("echo 'hostName' | timeout -t 3 nc -w 1 -u %s %d", targetIP, targetPort)
  216. } else {
  217. cmd = fmt.Sprintf("curl -s --connect-timeout 1 http://%s:%d/hostName", targetIP, targetPort)
  218. }
  219. // TODO: This simply tells us that we can reach the endpoints. Check that
  220. // the probability of hitting a specific endpoint is roughly the same as
  221. // hitting any other.
  222. forLoop := fmt.Sprintf("for i in $(seq 1 %d); do %s; echo; sleep %v; done | grep -v '^\\s*$' |sort | uniq -c | wc -l", tries, cmd, hitEndpointRetryDelay)
  223. By(fmt.Sprintf("Dialing from node. command:%s", forLoop))
  224. stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, forLoop)
  225. Expect(strconv.Atoi(strings.TrimSpace(stdout))).To(BeNumerically("==", expectedCount))
  226. }
  227. func (config *KubeProxyTestConfig) getSelfURL(path string, expected string) {
  228. cmd := fmt.Sprintf("curl -s --connect-timeout 1 http://localhost:10249%s", path)
  229. By(fmt.Sprintf("Getting kube-proxy self URL %s", path))
  230. stdout := framework.RunHostCmdOrDie(config.f.Namespace.Name, config.hostTestContainerPod.Name, cmd)
  231. Expect(strings.Contains(stdout, expected)).To(BeTrue())
  232. }
  233. func (config *KubeProxyTestConfig) createNetShellPodSpec(podName string, node string) *api.Pod {
  234. probe := &api.Probe{
  235. InitialDelaySeconds: 10,
  236. TimeoutSeconds: 30,
  237. PeriodSeconds: 10,
  238. SuccessThreshold: 1,
  239. FailureThreshold: 3,
  240. Handler: api.Handler{
  241. HTTPGet: &api.HTTPGetAction{
  242. Path: "/healthz",
  243. Port: intstr.IntOrString{IntVal: endpointHttpPort},
  244. },
  245. },
  246. }
  247. pod := &api.Pod{
  248. TypeMeta: unversioned.TypeMeta{
  249. Kind: "Pod",
  250. APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
  251. },
  252. ObjectMeta: api.ObjectMeta{
  253. Name: podName,
  254. Namespace: config.f.Namespace.Name,
  255. },
  256. Spec: api.PodSpec{
  257. Containers: []api.Container{
  258. {
  259. Name: "webserver",
  260. Image: netexecImageName,
  261. ImagePullPolicy: api.PullIfNotPresent,
  262. Command: []string{
  263. "/netexec",
  264. fmt.Sprintf("--http-port=%d", endpointHttpPort),
  265. fmt.Sprintf("--udp-port=%d", endpointUdpPort),
  266. },
  267. Ports: []api.ContainerPort{
  268. {
  269. Name: "http",
  270. ContainerPort: endpointHttpPort,
  271. },
  272. {
  273. Name: "udp",
  274. ContainerPort: endpointUdpPort,
  275. Protocol: api.ProtocolUDP,
  276. },
  277. },
  278. LivenessProbe: probe,
  279. ReadinessProbe: probe,
  280. },
  281. },
  282. NodeName: node,
  283. },
  284. }
  285. return pod
  286. }
  287. func (config *KubeProxyTestConfig) createTestPodSpec() *api.Pod {
  288. pod := &api.Pod{
  289. TypeMeta: unversioned.TypeMeta{
  290. Kind: "Pod",
  291. APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
  292. },
  293. ObjectMeta: api.ObjectMeta{
  294. Name: testPodName,
  295. Namespace: config.f.Namespace.Name,
  296. },
  297. Spec: api.PodSpec{
  298. Containers: []api.Container{
  299. {
  300. Name: "webserver",
  301. Image: netexecImageName,
  302. ImagePullPolicy: api.PullIfNotPresent,
  303. Command: []string{
  304. "/netexec",
  305. fmt.Sprintf("--http-port=%d", endpointHttpPort),
  306. fmt.Sprintf("--udp-port=%d", endpointUdpPort),
  307. },
  308. Ports: []api.ContainerPort{
  309. {
  310. Name: "http",
  311. ContainerPort: testContainerHttpPort,
  312. },
  313. },
  314. },
  315. },
  316. },
  317. }
  318. return pod
  319. }
  320. func (config *KubeProxyTestConfig) createNodePortService(selector map[string]string) {
  321. serviceSpec := &api.Service{
  322. ObjectMeta: api.ObjectMeta{
  323. Name: nodePortServiceName,
  324. },
  325. Spec: api.ServiceSpec{
  326. Type: api.ServiceTypeNodePort,
  327. Ports: []api.ServicePort{
  328. {Port: clusterHttpPort, Name: "http", Protocol: api.ProtocolTCP, NodePort: nodeHttpPort, TargetPort: intstr.FromInt(endpointHttpPort)},
  329. {Port: clusterUdpPort, Name: "udp", Protocol: api.ProtocolUDP, NodePort: nodeUdpPort, TargetPort: intstr.FromInt(endpointUdpPort)},
  330. },
  331. Selector: selector,
  332. },
  333. }
  334. config.nodePortService = config.createService(serviceSpec)
  335. }
  336. func (config *KubeProxyTestConfig) deleteNodePortService() {
  337. err := config.getServiceClient().Delete(config.nodePortService.Name)
  338. Expect(err).NotTo(HaveOccurred(), "error while deleting NodePortService. err:%v)", err)
  339. time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
  340. }
  341. func (config *KubeProxyTestConfig) createLoadBalancerService(selector map[string]string) {
  342. serviceSpec := &api.Service{
  343. ObjectMeta: api.ObjectMeta{
  344. Name: loadBalancerServiceName,
  345. },
  346. Spec: api.ServiceSpec{
  347. Type: api.ServiceTypeLoadBalancer,
  348. Ports: []api.ServicePort{
  349. {Port: loadBalancerHttpPort, Name: "http", Protocol: "TCP", TargetPort: intstr.FromInt(endpointHttpPort)},
  350. },
  351. Selector: selector,
  352. },
  353. }
  354. config.createService(serviceSpec)
  355. }
  356. func (config *KubeProxyTestConfig) deleteLoadBalancerService() {
  357. go func() { config.getServiceClient().Delete(config.loadBalancerService.Name) }()
  358. time.Sleep(15 * time.Second) // wait for kube-proxy to catch up with the service being deleted.
  359. }
  360. func (config *KubeProxyTestConfig) waitForLoadBalancerIngressSetup() {
  361. err := wait.Poll(2*time.Second, 120*time.Second, func() (bool, error) {
  362. service, err := config.getServiceClient().Get(loadBalancerServiceName)
  363. if err != nil {
  364. return false, err
  365. } else {
  366. if len(service.Status.LoadBalancer.Ingress) > 0 {
  367. return true, nil
  368. } else {
  369. return false, fmt.Errorf("Service LoadBalancer Ingress was not setup.")
  370. }
  371. }
  372. })
  373. Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to setup Load Balancer Service. err:%v", err))
  374. config.loadBalancerService, _ = config.getServiceClient().Get(loadBalancerServiceName)
  375. }
  376. func (config *KubeProxyTestConfig) createTestPods() {
  377. testContainerPod := config.createTestPodSpec()
  378. hostTestContainerPod := framework.NewHostExecPodSpec(config.f.Namespace.Name, hostTestPodName)
  379. config.createPod(testContainerPod)
  380. config.createPod(hostTestContainerPod)
  381. framework.ExpectNoError(config.f.WaitForPodRunning(testContainerPod.Name))
  382. framework.ExpectNoError(config.f.WaitForPodRunning(hostTestContainerPod.Name))
  383. var err error
  384. config.testContainerPod, err = config.getPodClient().Get(testContainerPod.Name)
  385. if err != nil {
  386. framework.Failf("Failed to retrieve %s pod: %v", testContainerPod.Name, err)
  387. }
  388. config.hostTestContainerPod, err = config.getPodClient().Get(hostTestContainerPod.Name)
  389. if err != nil {
  390. framework.Failf("Failed to retrieve %s pod: %v", hostTestContainerPod.Name, err)
  391. }
  392. }
  393. func (config *KubeProxyTestConfig) createService(serviceSpec *api.Service) *api.Service {
  394. _, err := config.getServiceClient().Create(serviceSpec)
  395. Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
  396. err = framework.WaitForService(config.f.Client, config.f.Namespace.Name, serviceSpec.Name, true, 5*time.Second, 45*time.Second)
  397. Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("error while waiting for service:%s err: %v", serviceSpec.Name, err))
  398. createdService, err := config.getServiceClient().Get(serviceSpec.Name)
  399. Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Failed to create %s service: %v", serviceSpec.Name, err))
  400. return createdService
  401. }
  402. func (config *KubeProxyTestConfig) setup() {
  403. By("creating a selector")
  404. selectorName := "selector-" + string(uuid.NewUUID())
  405. serviceSelector := map[string]string{
  406. selectorName: "true",
  407. }
  408. By("Getting node addresses")
  409. framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
  410. nodeList := framework.GetReadySchedulableNodesOrDie(config.f.Client)
  411. config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeExternalIP)
  412. if len(config.externalAddrs) < 2 {
  413. // fall back to legacy IPs
  414. config.externalAddrs = framework.NodeAddresses(nodeList, api.NodeLegacyHostIP)
  415. }
  416. Expect(len(config.externalAddrs)).To(BeNumerically(">=", 2), fmt.Sprintf("At least two nodes necessary with an external or LegacyHostIP"))
  417. config.nodes = nodeList.Items
  418. if enableLoadBalancerTest {
  419. By("Creating the LoadBalancer Service on top of the pods in kubernetes")
  420. config.createLoadBalancerService(serviceSelector)
  421. }
  422. By("Creating the service pods in kubernetes")
  423. podName := "netserver"
  424. config.endpointPods = config.createNetProxyPods(podName, serviceSelector)
  425. By("Creating the service on top of the pods in kubernetes")
  426. config.createNodePortService(serviceSelector)
  427. By("Creating test pods")
  428. config.createTestPods()
  429. }
  430. func (config *KubeProxyTestConfig) cleanup() {
  431. nsClient := config.getNamespacesClient()
  432. nsList, err := nsClient.List(api.ListOptions{})
  433. if err == nil {
  434. for _, ns := range nsList.Items {
  435. if strings.Contains(ns.Name, config.f.BaseName) && ns.Name != config.f.Namespace.Name {
  436. nsClient.Delete(ns.Name)
  437. }
  438. }
  439. }
  440. }
  441. func (config *KubeProxyTestConfig) createNetProxyPods(podName string, selector map[string]string) []*api.Pod {
  442. framework.ExpectNoError(framework.WaitForAllNodesSchedulable(config.f.Client))
  443. nodes := framework.GetReadySchedulableNodesOrDie(config.f.Client)
  444. // create pods, one for each node
  445. createdPods := make([]*api.Pod, 0, len(nodes.Items))
  446. for i, n := range nodes.Items {
  447. podName := fmt.Sprintf("%s-%d", podName, i)
  448. pod := config.createNetShellPodSpec(podName, n.Name)
  449. pod.ObjectMeta.Labels = selector
  450. createdPod := config.createPod(pod)
  451. createdPods = append(createdPods, createdPod)
  452. }
  453. // wait that all of them are up
  454. runningPods := make([]*api.Pod, 0, len(nodes.Items))
  455. for _, p := range createdPods {
  456. framework.ExpectNoError(config.f.WaitForPodReady(p.Name))
  457. rp, err := config.getPodClient().Get(p.Name)
  458. framework.ExpectNoError(err)
  459. runningPods = append(runningPods, rp)
  460. }
  461. return runningPods
  462. }
  463. func (config *KubeProxyTestConfig) deleteNetProxyPod() {
  464. pod := config.endpointPods[0]
  465. config.getPodClient().Delete(pod.Name, api.NewDeleteOptions(0))
  466. config.endpointPods = config.endpointPods[1:]
  467. // wait for pod being deleted.
  468. err := framework.WaitForPodToDisappear(config.f.Client, config.f.Namespace.Name, pod.Name, labels.Everything(), time.Second, wait.ForeverTestTimeout)
  469. if err != nil {
  470. framework.Failf("Failed to delete %s pod: %v", pod.Name, err)
  471. }
  472. // wait for endpoint being removed.
  473. err = framework.WaitForServiceEndpointsNum(config.f.Client, config.f.Namespace.Name, nodePortServiceName, len(config.endpointPods), time.Second, wait.ForeverTestTimeout)
  474. if err != nil {
  475. framework.Failf("Failed to remove endpoint from service: %s", nodePortServiceName)
  476. }
  477. // wait for kube-proxy to catch up with the pod being deleted.
  478. time.Sleep(5 * time.Second)
  479. }
  480. func (config *KubeProxyTestConfig) createPod(pod *api.Pod) *api.Pod {
  481. createdPod, err := config.getPodClient().Create(pod)
  482. if err != nil {
  483. framework.Failf("Failed to create %s pod: %v", pod.Name, err)
  484. }
  485. return createdPod
  486. }
  487. func (config *KubeProxyTestConfig) getPodClient() client.PodInterface {
  488. return config.f.Client.Pods(config.f.Namespace.Name)
  489. }
  490. func (config *KubeProxyTestConfig) getServiceClient() client.ServiceInterface {
  491. return config.f.Client.Services(config.f.Namespace.Name)
  492. }
  493. func (config *KubeProxyTestConfig) getNamespacesClient() client.NamespaceInterface {
  494. return config.f.Client.Namespaces()
  495. }
  496. func array2map(arr []string) map[string]bool {
  497. retval := make(map[string]bool)
  498. if len(arr) == 0 {
  499. return retval
  500. }
  501. for _, str := range arr {
  502. retval[str] = true
  503. }
  504. return retval
  505. }