resize_nodes.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"fmt"
	"os/exec"
	"regexp"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/autoscaling"
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/apimachinery/registered"
	"k8s.io/kubernetes/pkg/client/cache"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
	controllerframework "k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/intstr"
	"k8s.io/kubernetes/pkg/watch"
	"k8s.io/kubernetes/test/e2e/framework"
)

const (
	serveHostnameImage        = "gcr.io/google_containers/serve_hostname:v1.4"
	resizeNodeReadyTimeout    = 2 * time.Minute
	resizeNodeNotReadyTimeout = 2 * time.Minute
	nodeReadinessTimeout      = 3 * time.Minute
	podNotReadyTimeout        = 1 * time.Minute
	podReadyTimeout           = 2 * time.Minute
	testPort                  = 9376
)
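
// ResizeGroup resizes the test cluster's node instance group to the given size.
// On GCE/GKE it shells out to gcloud; on AWS it uses the autoscaling API.
// Other providers are not supported.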
func ResizeGroup(group string, size int32) error {
	if framework.TestContext.ReportDir != "" {
		framework.CoreDump(framework.TestContext.ReportDir)
		defer framework.CoreDump(framework.TestContext.ReportDir)
	}
	if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
		// TODO: make this hit the compute API directly instead of shelling out to gcloud.
		// TODO: make gce/gke implement InstanceGroups, so we can eliminate the per-provider logic
		output, err := exec.Command("gcloud", "compute", "instance-groups", "managed", "resize",
			group, fmt.Sprintf("--size=%v", size),
			"--project="+framework.TestContext.CloudConfig.ProjectID, "--zone="+framework.TestContext.CloudConfig.Zone).CombinedOutput()
		if err != nil {
			framework.Logf("Failed to resize node instance group: %v", string(output))
		}
		return err
	} else if framework.TestContext.Provider == "aws" {
		client := autoscaling.New(session.New())
		return awscloud.ResizeInstanceGroup(client, group, int(size))
	} else {
		return fmt.Errorf("Provider does not support InstanceGroups")
	}
}
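
// GetGroupNodes returns the names of the RUNNING instances in the given
// instance group. Only GCE/GKE are supported.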
func GetGroupNodes(group string) ([]string, error) {
	if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
		// TODO: make this hit the compute API directly instead of shelling out to gcloud.
		// TODO: make gce/gke implement InstanceGroups, so we can eliminate the per-provider logic
		output, err := exec.Command("gcloud", "compute", "instance-groups", "managed",
			"list-instances", group, "--project="+framework.TestContext.CloudConfig.ProjectID,
			"--zone="+framework.TestContext.CloudConfig.Zone).CombinedOutput()
		if err != nil {
			return nil, err
		}
		re := regexp.MustCompile(".*RUNNING")
		lines := re.FindAllString(string(output), -1)
		for i, line := range lines {
			lines[i] = line[:strings.Index(line, " ")]
		}
		return lines, nil
	} else {
		return nil, fmt.Errorf("provider does not support InstanceGroups")
	}
}
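
// GroupSize returns the current number of RUNNING instances in the given
// instance group, or -1 and an error if the size cannot be determined.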
func GroupSize(group string) (int, error) {
	if framework.TestContext.Provider == "gce" || framework.TestContext.Provider == "gke" {
		// TODO: make this hit the compute API directly instead of shelling out to gcloud.
		// TODO: make gce/gke implement InstanceGroups, so we can eliminate the per-provider logic
		output, err := exec.Command("gcloud", "compute", "instance-groups", "managed",
			"list-instances", group, "--project="+framework.TestContext.CloudConfig.ProjectID,
			"--zone="+framework.TestContext.CloudConfig.Zone).CombinedOutput()
		if err != nil {
			return -1, err
		}
		re := regexp.MustCompile("RUNNING")
		return len(re.FindAllString(string(output), -1)), nil
	} else if framework.TestContext.Provider == "aws" {
		client := autoscaling.New(session.New())
		instanceGroup, err := awscloud.DescribeInstanceGroup(client, group)
		if err != nil {
			return -1, fmt.Errorf("error describing instance group: %v", err)
		}
		if instanceGroup == nil {
			return -1, fmt.Errorf("instance group not found: %s", group)
		}
		return instanceGroup.CurrentSize()
	} else {
		return -1, fmt.Errorf("provider does not support InstanceGroups")
	}
}
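
// WaitForGroupSize polls the instance group every 5 seconds until it reaches
// the desired size, returning an error after a 10-minute timeout.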
func WaitForGroupSize(group string, size int32) error {
	timeout := 10 * time.Minute
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
		currentSize, err := GroupSize(group)
		if err != nil {
			framework.Logf("Failed to get node instance group size: %v", err)
			continue
		}
		if currentSize != int(size) {
			framework.Logf("Waiting for node instance group size %d, current size %d", size, currentSize)
			continue
		}
		framework.Logf("Node instance group has reached the desired size %d", size)
		return nil
	}
	return fmt.Errorf("timeout waiting %v for node instance group size to be %d", timeout, size)
}
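
// svcByName returns a NodePort service that selects pods labeled "name": name
// and forwards the given port.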
func svcByName(name string, port int) *api.Service {
	return &api.Service{
		ObjectMeta: api.ObjectMeta{
			Name: name,
		},
		Spec: api.ServiceSpec{
			Type: api.ServiceTypeNodePort,
			Selector: map[string]string{
				"name": name,
			},
			Ports: []api.ServicePort{{
				Port:       int32(port),
				TargetPort: intstr.FromInt(port),
			}},
		},
	}
}
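
// newSVCByName creates the service returned by svcByName in the given namespace.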
func newSVCByName(c *client.Client, ns, name string) error {
	_, err := c.Services(ns).Create(svcByName(name, testPort))
	return err
}
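
// podOnNode returns a pod spec pinned to the given node via spec.nodeName,
// running a single container that listens on port 9376.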
func podOnNode(podName, nodeName string, image string) *api.Pod {
	return &api.Pod{
		ObjectMeta: api.ObjectMeta{
			Name: podName,
			Labels: map[string]string{
				"name": podName,
			},
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{
					Name:  podName,
					Image: image,
					Ports: []api.ContainerPort{{ContainerPort: 9376}},
				},
			},
			NodeName:      nodeName,
			RestartPolicy: api.RestartPolicyNever,
		},
	}
}
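
// newPodOnNode creates a serve_hostname pod bound to the given node and logs the result.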
func newPodOnNode(c *client.Client, namespace, podName, nodeName string) error {
	pod, err := c.Pods(namespace).Create(podOnNode(podName, nodeName, serveHostnameImage))
	if err == nil {
		framework.Logf("Created pod %s on node %s", pod.ObjectMeta.Name, nodeName)
	} else {
		framework.Logf("Failed to create pod %s on node %s: %v", podName, nodeName, err)
	}
	return err
}
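
// rcByName returns a replication controller spec whose pods run a single
// container with the given image and no exposed ports.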
func rcByName(name string, replicas int32, image string, labels map[string]string) *api.ReplicationController {
	return rcByNameContainer(name, replicas, image, labels, api.Container{
		Name:  name,
		Image: image,
	})
}
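
// rcByNamePort is like rcByName but also exposes the given container port.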
func rcByNamePort(name string, replicas int32, image string, port int, protocol api.Protocol, labels map[string]string) *api.ReplicationController {
	return rcByNameContainer(name, replicas, image, labels, api.Container{
		Name:  name,
		Image: image,
		Ports: []api.ContainerPort{{ContainerPort: int32(port), Protocol: protocol}},
	})
}
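
// rcByNameContainer returns a replication controller spec that selects pods by
// the "name" label and runs the given container with a zero-second termination
// grace period.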
func rcByNameContainer(name string, replicas int32, image string, labels map[string]string, c api.Container) *api.ReplicationController {
	// Add "name": name to the labels, overwriting if it exists.
	labels["name"] = name
	gracePeriod := int64(0)
	return &api.ReplicationController{
		TypeMeta: unversioned.TypeMeta{
			Kind:       "ReplicationController",
			APIVersion: registered.GroupOrDie(api.GroupName).GroupVersion.String(),
		},
		ObjectMeta: api.ObjectMeta{
			Name: name,
		},
		Spec: api.ReplicationControllerSpec{
			Replicas: replicas,
			Selector: map[string]string{
				"name": name,
			},
			Template: &api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: labels,
				},
				Spec: api.PodSpec{
					Containers:                    []api.Container{c},
					TerminationGracePeriodSeconds: &gracePeriod,
				},
			},
		},
	}
}
// newRCByName creates a replication controller with a selector by name of name.
func newRCByName(c *client.Client, ns, name string, replicas int32) (*api.ReplicationController, error) {
	By(fmt.Sprintf("creating replication controller %s", name))
	return c.ReplicationControllers(ns).Create(rcByNamePort(
		name, replicas, serveHostnameImage, 9376, api.ProtocolTCP, map[string]string{}))
}
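
// resizeRC updates the replica count of an existing replication controller.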
func resizeRC(c *client.Client, ns, name string, replicas int32) error {
	rc, err := c.ReplicationControllers(ns).Get(name)
	if err != nil {
		return err
	}
	rc.Spec.Replicas = replicas
	_, err = c.ReplicationControllers(rc.Namespace).Update(rc)
	return err
}
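
// getMaster returns the master address used when blocking/unblocking network
// traffic from a node. The lookup is provider-specific.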
func getMaster(c *client.Client) string {
	master := ""
	switch framework.TestContext.Provider {
	case "gce":
		eps, err := c.Endpoints(api.NamespaceDefault).Get("kubernetes")
		if err != nil {
			framework.Failf("Failed to get kubernetes endpoints: %v", err)
		}
		if len(eps.Subsets) != 1 || len(eps.Subsets[0].Addresses) != 1 {
			framework.Failf("Expected exactly one address for the kubernetes service, got: %+v", eps)
		}
		master = eps.Subsets[0].Addresses[0].IP
	case "gke":
		master = strings.TrimPrefix(framework.TestContext.Host, "https://")
	case "aws":
		// TODO(justinsb): Avoid hardcoding this.
		master = "172.20.0.9"
	default:
		framework.Failf("This test is not supported for provider %s and should be disabled", framework.TestContext.Provider)
	}
	return master
}
// Return node external IP concatenated with port 22 for ssh
// e.g. 1.2.3.4:22
func getNodeExternalIP(node *api.Node) string {
	framework.Logf("Getting external IP address for %s", node.Name)
	host := ""
	for _, a := range node.Status.Addresses {
		if a.Type == api.NodeExternalIP {
			host = a.Address + ":22"
			break
		}
	}
	if host == "" {
		framework.Failf("Couldn't get the external IP of host %s with addresses %v", node.Name, node.Status.Addresses)
	}
	return host
}
// performTemporaryNetworkFailure blocks outgoing network traffic on 'node', then verifies
// that 'podNameToDisappear', which belongs to replication controller 'rcName', really disappears.
// Finally, it checks that the replication controller recreates the pods on another node and
// that the number of replicas equals 'replicas'.
// At the end (even in case of errors), the network traffic is brought back to normal.
// This function executes commands on a node, so it will work only in some environments.
func performTemporaryNetworkFailure(c *client.Client, ns, rcName string, replicas int32, podNameToDisappear string, node *api.Node) {
	host := getNodeExternalIP(node)
	master := getMaster(c)
	By(fmt.Sprintf("block network traffic from node %s to the master", node.Name))
	defer func() {
		// This code will execute even if setting the iptables rule failed.
		// It is on purpose because we may have an error even if the new rule
		// had been inserted. (yes, we could look at the error code and ssh error
		// separately, but I prefer to stay on the safe side).
		By(fmt.Sprintf("Unblock network traffic from node %s to the master", node.Name))
		framework.UnblockNetwork(host, master)
	}()

	framework.Logf("Waiting %v to ensure node %s is ready before beginning test...", resizeNodeReadyTimeout, node.Name)
	if !framework.WaitForNodeToBe(c, node.Name, api.NodeReady, true, resizeNodeReadyTimeout) {
		framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
	}
	framework.BlockNetwork(host, master)

	framework.Logf("Waiting %v for node %s to be not ready after simulated network failure", resizeNodeNotReadyTimeout, node.Name)
	if !framework.WaitForNodeToBe(c, node.Name, api.NodeReady, false, resizeNodeNotReadyTimeout) {
		framework.Failf("Node %s did not become not-ready within %v", node.Name, resizeNodeNotReadyTimeout)
	}

	framework.Logf("Waiting for pod %s to be removed", podNameToDisappear)
	err := framework.WaitForRCPodToDisappear(c, ns, rcName, podNameToDisappear)
	Expect(err).NotTo(HaveOccurred())

	By("verifying whether the pod from the unreachable node is recreated")
	err = framework.VerifyPods(c, ns, rcName, true, replicas)
	Expect(err).NotTo(HaveOccurred())
	// network traffic is unblocked in a deferred function
}
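
// expectNodeReadiness consumes node updates from the newNode channel and fails
// the test if the node's Ready condition does not match isReady within
// nodeReadinessTimeout.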
func expectNodeReadiness(isReady bool, newNode chan *api.Node) {
	timeout := false
	expected := false
	timer := time.After(nodeReadinessTimeout)
	for !expected && !timeout {
		select {
		case n := <-newNode:
			if framework.IsNodeConditionSetAsExpected(n, api.NodeReady, isReady) {
				expected = true
			} else {
				framework.Logf("Observed node ready status is NOT %v as expected", isReady)
			}
		case <-timer:
			timeout = true
		}
	}
	if !expected {
		framework.Failf("Failed to observe node ready status change to %v", isReady)
	}
}
var _ = framework.KubeDescribe("Nodes [Disruptive]", func() {
	f := framework.NewDefaultFramework("resize-nodes")
	var systemPodsNo int32
	var c *client.Client
	var ns string
	ignoreLabels := framework.ImagePullerLabels
	var group string

	BeforeEach(func() {
		c = f.Client
		ns = f.Namespace.Name
		systemPods, err := framework.GetPodsInNamespace(c, ns, ignoreLabels)
		Expect(err).NotTo(HaveOccurred())
		systemPodsNo = int32(len(systemPods))
		if strings.Index(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") >= 0 {
			framework.Failf("Test does not support cluster setup with more than one MIG: %s", framework.TestContext.CloudConfig.NodeInstanceGroup)
		} else {
			group = framework.TestContext.CloudConfig.NodeInstanceGroup
		}
	})
	// Slow issue #13323 (8 min)
	framework.KubeDescribe("Resize [Slow]", func() {
		var skipped bool

		BeforeEach(func() {
			skipped = true
			framework.SkipUnlessProviderIs("gce", "gke", "aws")
			framework.SkipUnlessNodeCountIsAtLeast(2)
			skipped = false
		})

		AfterEach(func() {
			if skipped {
				return
			}
			By("restoring the original node instance group size")
			if err := ResizeGroup(group, int32(framework.TestContext.CloudConfig.NumNodes)); err != nil {
				framework.Failf("Couldn't restore the original node instance group size: %v", err)
			}
			// In GKE, our current tunneling setup has the potential to hold on to a broken tunnel (from a
			// rebooted/deleted node) for up to 5 minutes before all tunnels are dropped and recreated.
			// Most tests make use of some proxy feature to verify functionality. So, if a reboot test runs
			// right before a test that tries to get logs, for example, we may get unlucky and try to use a
			// closed tunnel to a node that was recently rebooted. There's no good way to framework.Poll for proxies
			// being closed, so we sleep.
			//
			// TODO(cjcullen) reduce this sleep (#19314)
			if framework.ProviderIs("gke") {
				By("waiting 5 minutes for all dead tunnels to be dropped")
				time.Sleep(5 * time.Minute)
			}
			if err := WaitForGroupSize(group, int32(framework.TestContext.CloudConfig.NumNodes)); err != nil {
				framework.Failf("Couldn't restore the original node instance group size: %v", err)
			}
			if err := framework.WaitForClusterSize(c, framework.TestContext.CloudConfig.NumNodes, 10*time.Minute); err != nil {
				framework.Failf("Couldn't restore the original cluster size: %v", err)
			}
			// Many e2e tests assume that the cluster is fully healthy before they start. Wait until
			// the cluster is restored to health.
			By("waiting for system pods to successfully restart")
			err := framework.WaitForPodsRunningReady(c, api.NamespaceSystem, systemPodsNo, framework.PodReadyBeforeTimeout, ignoreLabels)
			Expect(err).NotTo(HaveOccurred())
			By("waiting for image prepulling pods to complete")
			framework.WaitForPodsSuccess(c, api.NamespaceSystem, framework.ImagePullerLabels, imagePrePullingTimeout)
		})
  396. It("should be able to delete nodes", func() {
  397. // Create a replication controller for a service that serves its hostname.
  398. // The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
  399. name := "my-hostname-delete-node"
  400. replicas := int32(framework.TestContext.CloudConfig.NumNodes)
  401. newRCByName(c, ns, name, replicas)
  402. err := framework.VerifyPods(c, ns, name, true, replicas)
  403. Expect(err).NotTo(HaveOccurred())
  404. By(fmt.Sprintf("decreasing cluster size to %d", replicas-1))
  405. err = ResizeGroup(group, replicas-1)
  406. Expect(err).NotTo(HaveOccurred())
  407. err = WaitForGroupSize(group, replicas-1)
  408. Expect(err).NotTo(HaveOccurred())
  409. err = framework.WaitForClusterSize(c, int(replicas-1), 10*time.Minute)
  410. Expect(err).NotTo(HaveOccurred())
  411. By("verifying whether the pods from the removed node are recreated")
  412. err = framework.VerifyPods(c, ns, name, true, replicas)
  413. Expect(err).NotTo(HaveOccurred())
  414. })
		// TODO: Bug here - testName is not correct
		It("should be able to add nodes", func() {
			// Create a replication controller for a service that serves its hostname.
			// The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
			name := "my-hostname-add-node"
			newSVCByName(c, ns, name)
			replicas := int32(framework.TestContext.CloudConfig.NumNodes)
			newRCByName(c, ns, name, replicas)
			err := framework.VerifyPods(c, ns, name, true, replicas)
			Expect(err).NotTo(HaveOccurred())

			By(fmt.Sprintf("increasing cluster size to %d", replicas+1))
			err = ResizeGroup(group, replicas+1)
			Expect(err).NotTo(HaveOccurred())
			err = WaitForGroupSize(group, replicas+1)
			Expect(err).NotTo(HaveOccurred())
			err = framework.WaitForClusterSize(c, int(replicas+1), 10*time.Minute)
			Expect(err).NotTo(HaveOccurred())

			By(fmt.Sprintf("increasing size of the replication controller to %d and verifying all pods are running", replicas+1))
			err = resizeRC(c, ns, name, replicas+1)
			Expect(err).NotTo(HaveOccurred())
			err = framework.VerifyPods(c, ns, name, true, replicas+1)
			Expect(err).NotTo(HaveOccurred())
		})
	})
  439. framework.KubeDescribe("Network", func() {
  440. Context("when a node becomes unreachable", func() {
  441. BeforeEach(func() {
  442. framework.SkipUnlessProviderIs("gce", "gke", "aws")
  443. framework.SkipUnlessNodeCountIsAtLeast(2)
  444. })
  445. // TODO marekbiskup 2015-06-19 #10085
  446. // This test has nothing to do with resizing nodes so it should be moved elsewhere.
  447. // Two things are tested here:
  448. // 1. pods from a uncontactable nodes are rescheduled
  449. // 2. when a node joins the cluster, it can host new pods.
  450. // Factor out the cases into two separate tests.
  451. It("[replication controller] recreates pods scheduled on the unreachable node "+
  452. "AND allows scheduling of pods on a node after it rejoins the cluster", func() {
  453. // Create a replication controller for a service that serves its hostname.
  454. // The source for the Docker container kubernetes/serve_hostname is in contrib/for-demos/serve_hostname
  455. name := "my-hostname-net"
  456. newSVCByName(c, ns, name)
  457. replicas := int32(framework.TestContext.CloudConfig.NumNodes)
  458. newRCByName(c, ns, name, replicas)
  459. err := framework.VerifyPods(c, ns, name, true, replicas)
  460. Expect(err).NotTo(HaveOccurred(), "Each pod should start running and responding")
  461. By("choose a node with at least one pod - we will block some network traffic on this node")
  462. label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
  463. options := api.ListOptions{LabelSelector: label}
  464. pods, err := c.Pods(ns).List(options) // list pods after all have been scheduled
  465. Expect(err).NotTo(HaveOccurred())
  466. nodeName := pods.Items[0].Spec.NodeName
  467. node, err := c.Nodes().Get(nodeName)
  468. Expect(err).NotTo(HaveOccurred())
  469. By(fmt.Sprintf("block network traffic from node %s", node.Name))
  470. performTemporaryNetworkFailure(c, ns, name, replicas, pods.Items[0].Name, node)
  471. framework.Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
  472. if !framework.WaitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
  473. framework.Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
  474. }
  475. // sleep a bit, to allow Watch in NodeController to catch up.
  476. time.Sleep(5 * time.Second)
  477. By("verify whether new pods can be created on the re-attached node")
  478. // increasing the RC size is not a valid way to test this
  479. // since we have no guarantees the pod will be scheduled on our node.
  480. additionalPod := "additionalpod"
  481. err = newPodOnNode(c, ns, additionalPod, node.Name)
  482. Expect(err).NotTo(HaveOccurred())
  483. err = framework.VerifyPods(c, ns, additionalPod, true, 1)
  484. Expect(err).NotTo(HaveOccurred())
  485. // verify that it is really on the requested node
  486. {
  487. pod, err := c.Pods(ns).Get(additionalPod)
  488. Expect(err).NotTo(HaveOccurred())
  489. if pod.Spec.NodeName != node.Name {
  490. framework.Logf("Pod %s found on invalid node: %s instead of %s", pod.Name, pod.Spec.NodeName, node.Name)
  491. }
  492. }
  493. })
			// What happens in this test:
			// Network traffic from a node to the master is cut off to simulate a network partition.
			// Expect to observe:
			// 1. Node is marked NotReady by the node controller after the timeout (40 seconds)
			// 2. All pods on the node are marked NotReady shortly after #1
			// 3. Node and pods return to Ready after connectivity recovers
			It("All pods on the unreachable node should be marked as NotReady when the node turns NotReady "+
				"AND all pods should be marked back to Ready when the node gets back to Ready before the pod eviction timeout", func() {
				By("choose a node - we will block all network traffic on this node")
				var podOpts api.ListOptions
				nodeOpts := api.ListOptions{}
				nodes, err := c.Nodes().List(nodeOpts)
				Expect(err).NotTo(HaveOccurred())
				framework.FilterNodes(nodes, func(node api.Node) bool {
					if !framework.IsNodeConditionSetAsExpected(&node, api.NodeReady, true) {
						return false
					}
					podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)}
					pods, err := c.Pods(api.NamespaceAll).List(podOpts)
					if err != nil || len(pods.Items) <= 0 {
						return false
					}
					return true
				})
				if len(nodes.Items) <= 0 {
					framework.Failf("No eligible nodes were found: %d", len(nodes.Items))
				}
				node := nodes.Items[0]
				podOpts = api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, node.Name)}
				if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, framework.PodRunningReady); err != nil {
					framework.Failf("Pods on node %s are not ready and running within %v: %v", node.Name, podReadyTimeout, err)
				}

				By("Set up watch on node status")
				nodeSelector := fields.OneTermEqualSelector("metadata.name", node.Name)
				stopCh := make(chan struct{})
				newNode := make(chan *api.Node)
				var controller *controllerframework.Controller
				_, controller = controllerframework.NewInformer(
					&cache.ListWatch{
						ListFunc: func(options api.ListOptions) (runtime.Object, error) {
							options.FieldSelector = nodeSelector
							return f.Client.Nodes().List(options)
						},
						WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
							options.FieldSelector = nodeSelector
							return f.Client.Nodes().Watch(options)
						},
					},
					&api.Node{},
					0,
					controllerframework.ResourceEventHandlerFuncs{
						UpdateFunc: func(oldObj, newObj interface{}) {
							n, ok := newObj.(*api.Node)
							Expect(ok).To(Equal(true))
							newNode <- n
						},
					},
				)
				defer func() {
					// Will not explicitly close newNode channel here due to
					// race condition where stopCh and newNode are closed but informer onUpdate still executes.
					close(stopCh)
				}()
				go controller.Run(stopCh)

				By(fmt.Sprintf("Block traffic from node %s to the master", node.Name))
				host := getNodeExternalIP(&node)
				master := getMaster(c)
				defer func() {
					By(fmt.Sprintf("Unblock traffic from node %s to the master", node.Name))
					framework.UnblockNetwork(host, master)
					if CurrentGinkgoTestDescription().Failed {
						return
					}
					By("Expect to observe node and pod status change from NotReady to Ready after network connectivity recovers")
					expectNodeReadiness(true, newNode)
					if err = framework.WaitForMatchPodsCondition(c, podOpts, "Running and Ready", podReadyTimeout, framework.PodRunningReady); err != nil {
						framework.Failf("Pods on node %s did not become ready and running within %v: %v", node.Name, podReadyTimeout, err)
					}
				}()

				framework.BlockNetwork(host, master)
				By("Expect to observe node and pod status change from Ready to NotReady after network partition")
				expectNodeReadiness(false, newNode)
				if err = framework.WaitForMatchPodsCondition(c, podOpts, "NotReady", podNotReadyTimeout, framework.PodNotReady); err != nil {
					framework.Failf("Pods on node %s did not become NotReady within %v: %v", node.Name, podNotReadyTimeout, err)
				}
			})
		})
	})
})