kubelet.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e

import (
	"fmt"
	"strings"
	"time"

	"k8s.io/kubernetes/pkg/api"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/util/sets"
	"k8s.io/kubernetes/pkg/util/uuid"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)
const (
	// Interval at which to framework.Poll /runningpods on a node
	pollInterval = 1 * time.Second
	// Interval at which to framework.Poll /stats/container on a node
	containerStatsPollingInterval = 5 * time.Second
	// Maximum number of nodes that we constrain the test to run on
	maxNodesToCheck = 10
)
// getPodMatches returns the set of pod names on the given node that match the
// podNamePrefix and namespace.
func getPodMatches(c *client.Client, nodeName string, podNamePrefix string, namespace string) sets.String {
	matches := sets.NewString()
	framework.Logf("Checking pods on node %v via /runningpods endpoint", nodeName)
	runningPods, err := framework.GetKubeletPods(c, nodeName)
	if err != nil {
		framework.Logf("Error checking running pods on %v: %v", nodeName, err)
		return matches
	}
	for _, pod := range runningPods.Items {
		if pod.Namespace == namespace && strings.HasPrefix(pod.Name, podNamePrefix) {
			matches.Insert(pod.Name)
		}
	}
	return matches
}
// waitTillNPodsRunningOnNodes polls the /runningpods endpoint on the kubelet
// until it finds targetNumPods pods that match the given criteria (namespace
// and podNamePrefix). Note that we usually use a label selector to filter pods
// that belong to the same RC. However, we use podNamePrefix with namespace
// here because pods returned from /runningpods do not contain the original
// label information; they are reconstructed by examining the container
// runtime. In the scope of this test we do not expect pod naming conflicts,
// so podNamePrefix should be sufficient to identify the pods.
func waitTillNPodsRunningOnNodes(c *client.Client, nodeNames sets.String, podNamePrefix string, namespace string, targetNumPods int, timeout time.Duration) error {
	return wait.Poll(pollInterval, timeout, func() (bool, error) {
		matchCh := make(chan sets.String, len(nodeNames))
		for _, item := range nodeNames.List() {
			// Launch a goroutine per node to check the pods running on that node.
			// Copy the loop variable so the goroutine does not capture the iterator.
			nodeName := item
			go func() {
				matchCh <- getPodMatches(c, nodeName, podNamePrefix, namespace)
			}()
		}
		seen := sets.NewString()
		for i := 0; i < len(nodeNames.List()); i++ {
			seen = seen.Union(<-matchCh)
		}
		if seen.Len() == targetNumPods {
			return true, nil
		}
		framework.Logf("Waiting for %d pods to be running on the nodes; %d are currently running", targetNumPods, seen.Len())
		return false, nil
	})
}
// updateNodeLabels updates the labels of the nodes given by nodeNames.
// If a label to add already exists, it is overwritten. If a label to remove
// does not exist, it is silently ignored.
// TODO: migrate to use framework.AddOrUpdateLabelOnNode/framework.RemoveLabelOffNode
func updateNodeLabels(c *client.Client, nodeNames sets.String, toAdd, toRemove map[string]string) {
	const maxRetries = 5
	for nodeName := range nodeNames {
		var node *api.Node
		var err error
		// Retry a few times: the update can fail (e.g. on a resource version
		// conflict), in which case we re-fetch the node and try again.
		for i := 0; i < maxRetries; i++ {
			node, err = c.Nodes().Get(nodeName)
			if err != nil {
				framework.Logf("Error getting node %s: %v", nodeName, err)
				continue
			}
			if toAdd != nil {
				for k, v := range toAdd {
					node.ObjectMeta.Labels[k] = v
				}
			}
			if toRemove != nil {
				for k := range toRemove {
					delete(node.ObjectMeta.Labels, k)
				}
			}
			_, err = c.Nodes().Update(node)
			if err != nil {
				framework.Logf("Error updating node %s: %v", nodeName, err)
			} else {
				break
			}
		}
		Expect(err).NotTo(HaveOccurred())
	}
}
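
// The "kubelet" suite below runs against a bounded set of ready, schedulable
// nodes (at most maxNodesToCheck). On larger clusters the chosen nodes are
// labeled with kubelet_cleanup=true so that the test pods can be pinned to
// them via a node selector; in small clusters a ResourceMonitor is also
// started to track container resource usage while the test runs.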
var _ = framework.KubeDescribe("kubelet", func() {
	var c *client.Client
	var numNodes int
	var nodeNames sets.String
	var nodeLabels map[string]string
	f := framework.NewDefaultFramework("kubelet")
	var resourceMonitor *framework.ResourceMonitor

	BeforeEach(func() {
		c = f.Client
		nodes := framework.GetReadySchedulableNodesOrDie(f.Client)
		numNodes = len(nodes.Items)
		nodeNames = sets.NewString()
		// If there are a lot of nodes, we don't want to use all of them
		// (if there are 1000 nodes in the cluster, starting 10 pods/node
		// would take ~10 minutes today), and there is also the deletion phase.
		//
		// Instead, we choose at most 10 nodes and constrain the pods we
		// create to be scheduled only on those nodes.
		if numNodes > maxNodesToCheck {
			numNodes = maxNodesToCheck
			nodeLabels = make(map[string]string)
			nodeLabels["kubelet_cleanup"] = "true"
		}
		for i := 0; i < numNodes; i++ {
			nodeNames.Insert(nodes.Items[i].Name)
		}
		updateNodeLabels(c, nodeNames, nodeLabels, nil)

		// Start the resourceMonitor only in small clusters.
		if len(nodes.Items) <= maxNodesToCheck {
			resourceMonitor = framework.NewResourceMonitor(f.Client, framework.TargetContainers(), containerStatsPollingInterval)
			resourceMonitor.Start()
		}
	})

	AfterEach(func() {
		if resourceMonitor != nil {
			resourceMonitor.Stop()
		}
		// If we added labels to nodes in this test, remove them now.
		updateNodeLabels(c, nodeNames, nil, nodeLabels)
	})
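
	// The test below exercises pod cleanup: it creates an RC whose pods land
	// on the selected nodes, deletes the RC, and then polls each node's
	// /runningpods endpoint until the kubelet reports no remaining pods with
	// the RC's name prefix.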
	framework.KubeDescribe("Clean up pods on node", func() {
		type DeleteTest struct {
			podsPerNode int
			timeout     time.Duration
		}
		deleteTests := []DeleteTest{
			{podsPerNode: 10, timeout: 1 * time.Minute},
		}
		for _, itArg := range deleteTests {
			name := fmt.Sprintf(
				"kubelet should be able to delete %d pods per node in %v.", itArg.podsPerNode, itArg.timeout)
			It(name, func() {
				totalPods := itArg.podsPerNode * numNodes
				By(fmt.Sprintf("Creating an RC of %d pods and waiting until all pods of this RC are running", totalPods))
				rcName := fmt.Sprintf("cleanup%d-%s", totalPods, string(uuid.NewUUID()))
				Expect(framework.RunRC(framework.RCConfig{
					Client:       f.Client,
					Name:         rcName,
					Namespace:    f.Namespace.Name,
					Image:        framework.GetPauseImageName(f.Client),
					Replicas:     totalPods,
					NodeSelector: nodeLabels,
				})).NotTo(HaveOccurred())
				// Perform a sanity check so that we know all desired pods are
				// running on the nodes according to the kubelet. The timeout is
				// set to only 30 seconds here because framework.RunRC already
				// waited for all pods to transition to the running status.
				Expect(waitTillNPodsRunningOnNodes(f.Client, nodeNames, rcName, f.Namespace.Name, totalPods,
					time.Second*30)).NotTo(HaveOccurred())
				if resourceMonitor != nil {
					resourceMonitor.LogLatest()
				}

				By("Deleting the RC")
				framework.DeleteRCAndPods(f.Client, f.Namespace.Name, rcName)
				// Check that the pods really are gone by querying /runningpods on
				// the node. The /runningpods handler checks the container runtime
				// (or its cache) and returns a list of running pods. Some possible
				// causes of failures are:
				//   - kubelet deadlock
				//   - a bug in graceful termination (if it is enabled)
				//   - docker slow to delete pods (or resource problems causing slowness)
				start := time.Now()
				Expect(waitTillNPodsRunningOnNodes(f.Client, nodeNames, rcName, f.Namespace.Name, 0,
					itArg.timeout)).NotTo(HaveOccurred())
				framework.Logf("Deleting %d pods on %d nodes completed in %v after the RC was deleted", totalPods, len(nodeNames),
					time.Since(start))
				if resourceMonitor != nil {
					resourceMonitor.LogCPUSummary()
				}
			})
		}
	})
})
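
// Note: to run only this suite, one common approach (assuming the e2e harness
// of this release; the exact invocation may vary) is to pass a ginkgo focus
// regex to the e2e test runner, e.g. --ginkgo.focus="Clean up pods on node".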