density.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"fmt"
	"math"
	"os"
	"sort"
	"strconv"
	"sync"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/client/cache"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	controllerframework "k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/fields"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util/sets"
	utiluuid "k8s.io/kubernetes/pkg/util/uuid"
	"k8s.io/kubernetes/pkg/watch"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)
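
// MinSaturationThreshold and MinPodsPerSecondThroughput together bound the
// acceptable e2e startup time: the suite fails if saturating the cluster takes
// longer than max(totalPods/MinPodsPerSecondThroughput seconds, MinSaturationThreshold).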
const (
	MinSaturationThreshold     = 2 * time.Minute
	MinPodsPerSecondThroughput = 8
)

// Maximum container failures this test tolerates before failing.
var MaxContainerFailures = 0
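
// DensityTestConfig describes a single density test run: the RCs to create,
// the client and namespace to use, the expected pod count, and the polling
// interval and timeout applied while waiting for events and updates.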
type DensityTestConfig struct {
	Configs      []framework.RCConfig
	Client       *client.Client
	Namespace    string
	PollInterval time.Duration
	PodCount     int
	Timeout      time.Duration
}
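
// density30AddonResourceVerifier returns the CPU and memory constraints that
// cluster addons and control-plane components (apiserver, controller-manager,
// scheduler) are expected to stay within, scaled with the number of nodes.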
func density30AddonResourceVerifier(numNodes int) map[string]framework.ResourceConstraint {
	var apiserverMem uint64
	var controllerMem uint64
	var schedulerMem uint64
	apiserverCPU := math.MaxFloat32
	apiserverMem = math.MaxUint64
	controllerCPU := math.MaxFloat32
	controllerMem = math.MaxUint64
	schedulerCPU := math.MaxFloat32
	schedulerMem = math.MaxUint64
	framework.Logf("Setting resource constraints for provider: %s", framework.TestContext.Provider)
	if framework.ProviderIs("kubemark") {
		if numNodes <= 5 {
			apiserverCPU = 0.25
			apiserverMem = 150 * (1024 * 1024)
			controllerCPU = 0.1
			controllerMem = 100 * (1024 * 1024)
			schedulerCPU = 0.05
			schedulerMem = 50 * (1024 * 1024)
		} else if numNodes <= 100 {
			apiserverCPU = 1.5
			apiserverMem = 1500 * (1024 * 1024)
			controllerCPU = 0.75
			controllerMem = 750 * (1024 * 1024)
			schedulerCPU = 0.75
			schedulerMem = 500 * (1024 * 1024)
		} else if numNodes <= 500 {
			apiserverCPU = 2.25
			apiserverMem = 3400 * (1024 * 1024)
			controllerCPU = 1.3
			controllerMem = 1100 * (1024 * 1024)
			schedulerCPU = 1.5
			schedulerMem = 500 * (1024 * 1024)
		} else if numNodes <= 1000 {
			apiserverCPU = 4
			apiserverMem = 4000 * (1024 * 1024)
			controllerCPU = 3
			controllerMem = 2000 * (1024 * 1024)
			schedulerCPU = 1.5
			schedulerMem = 750 * (1024 * 1024)
		}
	} else {
		if numNodes <= 100 {
			// TODO: Investigate higher apiserver consumption and
			// potentially revert to 1.5cpu and 1.3GB - see #30871
			apiserverCPU = 1.8
			apiserverMem = 2200 * (1024 * 1024)
			controllerCPU = 0.5
			controllerMem = 300 * (1024 * 1024)
			schedulerCPU = 0.4
			schedulerMem = 150 * (1024 * 1024)
		}
	}

	constraints := make(map[string]framework.ResourceConstraint)
	constraints["fluentd-elasticsearch"] = framework.ResourceConstraint{
		CPUConstraint:    0.2,
		MemoryConstraint: 250 * (1024 * 1024),
	}
	constraints["elasticsearch-logging"] = framework.ResourceConstraint{
		CPUConstraint: 2,
		// TODO: bring it down to 750MB again, when we lower Kubelet verbosity level. I.e. revert #19164
		MemoryConstraint: 5000 * (1024 * 1024),
	}
	constraints["heapster"] = framework.ResourceConstraint{
		CPUConstraint:    2,
		MemoryConstraint: 1800 * (1024 * 1024),
	}
	constraints["kibana-logging"] = framework.ResourceConstraint{
		CPUConstraint:    0.2,
		MemoryConstraint: 100 * (1024 * 1024),
	}
	constraints["kube-proxy"] = framework.ResourceConstraint{
		CPUConstraint:    0.1,
		MemoryConstraint: 20 * (1024 * 1024),
	}
	constraints["l7-lb-controller"] = framework.ResourceConstraint{
		CPUConstraint:    0.1,
		MemoryConstraint: 60 * (1024 * 1024),
	}
	constraints["influxdb"] = framework.ResourceConstraint{
		CPUConstraint:    2,
		MemoryConstraint: 500 * (1024 * 1024),
	}
	constraints["kube-apiserver"] = framework.ResourceConstraint{
		CPUConstraint:    apiserverCPU,
		MemoryConstraint: apiserverMem,
	}
	constraints["kube-controller-manager"] = framework.ResourceConstraint{
		CPUConstraint:    controllerCPU,
		MemoryConstraint: controllerMem,
	}
	constraints["kube-scheduler"] = framework.ResourceConstraint{
		CPUConstraint:    schedulerCPU,
		MemoryConstraint: schedulerMem,
	}
	return constraints
}
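
// logPodStartupStatus polls the pods matching observedLabels every period and
// logs their startup status until stopCh is closed.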
func logPodStartupStatus(c *client.Client, expectedPods int, ns string, observedLabels map[string]string, period time.Duration, stopCh chan struct{}) {
	label := labels.SelectorFromSet(labels.Set(observedLabels))
	podStore := framework.NewPodStore(c, ns, label, fields.Everything())
	defer podStore.Stop()
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pods := podStore.List()
			startupStatus := framework.ComputeRCStartupStatus(pods, expectedPods)
			startupStatus.Print("Density")
		case <-stopCh:
			pods := podStore.List()
			startupStatus := framework.ComputeRCStartupStatus(pods, expectedPods)
			startupStatus.Print("Density")
			return
		}
	}
}

// runDensityTest will perform a density test and return the time it took for
// all pods to start.
func runDensityTest(dtc DensityTestConfig) time.Duration {
	defer GinkgoRecover()

	// Create a listener for events.
	// eLock protects the events slice.
	var eLock sync.Mutex
	events := make([](*api.Event), 0)
	_, controller := controllerframework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return dtc.Client.Events(dtc.Namespace).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return dtc.Client.Events(dtc.Namespace).Watch(options)
			},
		},
		&api.Event{},
		0,
		controllerframework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				eLock.Lock()
				defer eLock.Unlock()
				events = append(events, obj.(*api.Event))
			},
		},
	)
	stop := make(chan struct{})
	go controller.Run(stop)

	// Create a listener for api updates.
	// uLock protects updateCount.
	var uLock sync.Mutex
	updateCount := 0
	label := labels.SelectorFromSet(labels.Set(map[string]string{"type": "densityPod"}))
	_, updateController := controllerframework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				options.LabelSelector = label
				return dtc.Client.Pods(dtc.Namespace).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				options.LabelSelector = label
				return dtc.Client.Pods(dtc.Namespace).Watch(options)
			},
		},
		&api.Pod{},
		0,
		controllerframework.ResourceEventHandlerFuncs{
			UpdateFunc: func(_, _ interface{}) {
				uLock.Lock()
				defer uLock.Unlock()
				updateCount++
			},
		},
	)
	go updateController.Run(stop)

	// Start all replication controllers.
	startTime := time.Now()
	wg := sync.WaitGroup{}
	wg.Add(len(dtc.Configs))
	for i := range dtc.Configs {
		rcConfig := dtc.Configs[i]
		go func() {
			framework.ExpectNoError(framework.RunRC(rcConfig))
			wg.Done()
		}()
	}
	logStopCh := make(chan struct{})
	go logPodStartupStatus(dtc.Client, dtc.PodCount, dtc.Namespace, map[string]string{"type": "densityPod"}, dtc.PollInterval, logStopCh)
	wg.Wait()
	startupTime := time.Now().Sub(startTime)
	close(logStopCh)
	framework.Logf("E2E startup time for %d pods: %v", dtc.PodCount, startupTime)
	framework.Logf("Throughput (pods/s) during cluster saturation phase: %v", float32(dtc.PodCount)/float32(startupTime/time.Second))

	By("Waiting for all events to be recorded")
	last := -1
	current := len(events)
	lastCount := -1
	currentCount := updateCount
	for start := time.Now(); (last < current || lastCount < currentCount) && time.Since(start) < dtc.Timeout; time.Sleep(10 * time.Second) {
		func() {
			eLock.Lock()
			defer eLock.Unlock()
			last = current
			current = len(events)
		}()
		func() {
			uLock.Lock()
			defer uLock.Unlock()
			lastCount = currentCount
			currentCount = updateCount
		}()
	}
	close(stop)

	if current != last {
		framework.Logf("Warning: Not all events were recorded after waiting %.2f minutes", dtc.Timeout.Minutes())
	}
	framework.Logf("Found %d events", current)
	if currentCount != lastCount {
		framework.Logf("Warning: Not all updates were recorded after waiting %.2f minutes", dtc.Timeout.Minutes())
	}
	framework.Logf("Found %d updates", currentCount)

	// Tune the threshold for allowed failures.
	badEvents := framework.BadEvents(events)
	Expect(badEvents).NotTo(BeNumerically(">", int(math.Floor(0.01*float64(dtc.PodCount)))))

	// Print some data about Pod to Node allocation.
	By("Printing Pod to Node allocation data")
	podList, err := dtc.Client.Pods(api.NamespaceAll).List(api.ListOptions{})
	framework.ExpectNoError(err)
	pausePodAllocation := make(map[string]int)
	systemPodAllocation := make(map[string][]string)
	for _, pod := range podList.Items {
		if pod.Namespace == api.NamespaceSystem {
			systemPodAllocation[pod.Spec.NodeName] = append(systemPodAllocation[pod.Spec.NodeName], pod.Name)
		} else {
			pausePodAllocation[pod.Spec.NodeName]++
		}
	}
	nodeNames := make([]string, 0)
	for k := range pausePodAllocation {
		nodeNames = append(nodeNames, k)
	}
	sort.Strings(nodeNames)
	for _, node := range nodeNames {
		framework.Logf("%v: %v pause pods, system pods: %v", node, pausePodAllocation[node], systemPodAllocation[node])
	}
	return startupTime
}
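
// cleanupDensityTest deletes the ReplicationControllers created by the test.
// When the garbage collector is enabled only the RCs are deleted and pod
// cleanup is left to the GC; otherwise the pods are deleted explicitly as well.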
func cleanupDensityTest(dtc DensityTestConfig) {
	defer GinkgoRecover()
	By("Deleting ReplicationController")
	// We explicitly delete all pods to have API calls necessary for deletion accounted in metrics.
	for i := range dtc.Configs {
		rcName := dtc.Configs[i].Name
		rc, err := dtc.Client.ReplicationControllers(dtc.Namespace).Get(rcName)
		if err == nil && rc.Spec.Replicas != 0 {
			if framework.TestContext.GarbageCollectorEnabled {
				By("Cleaning up only the replication controller, garbage collector will clean up the pods")
				err := framework.DeleteRCAndWaitForGC(dtc.Client, dtc.Namespace, rcName)
				framework.ExpectNoError(err)
			} else {
				By("Cleaning up the replication controller and pods")
				err := framework.DeleteRCAndPods(dtc.Client, dtc.Namespace, rcName)
				framework.ExpectNoError(err)
			}
		}
	}
}

// This test suite can take a long time to run, and can affect or be affected by other tests.
// So by default it is added to the ginkgo.skip list (see driver.go).
// To run this suite you must explicitly ask for it by setting the
// -t/--test flag or ginkgo.focus flag.
// IMPORTANT: This test is designed to work on large (>= 100 Nodes) clusters. For smaller ones
// results will not be representative for control-plane performance as we'll start hitting
// limits on Docker's concurrent container startup.
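//
// For example (a hypothetical invocation; the exact flags depend on the local
// e2e setup):
//   go run hack/e2e.go -v --test --test_args="--ginkgo.focus=\[Feature:Performance\]"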
var _ = framework.KubeDescribe("Density", func() {
	var c *client.Client
	var nodeCount int
	var RCName string
	var additionalPodsPrefix string
	var ns string
	var uuid string
	var e2eStartupTime time.Duration
	var totalPods int
	var nodeCpuCapacity int64
	var nodeMemCapacity int64
	var nodes *api.NodeList
	var masters sets.String

	// Gathers data prior to framework namespace teardown
	AfterEach(func() {
		saturationThreshold := time.Duration((totalPods / MinPodsPerSecondThroughput)) * time.Second
		if saturationThreshold < MinSaturationThreshold {
			saturationThreshold = MinSaturationThreshold
		}
		Expect(e2eStartupTime).NotTo(BeNumerically(">", saturationThreshold))
		saturationData := framework.SaturationTime{
			TimeToSaturate: e2eStartupTime,
			NumberOfNodes:  nodeCount,
			NumberOfPods:   totalPods,
			Throughput:     float32(totalPods) / float32(e2eStartupTime/time.Second),
		}
		framework.Logf("Cluster saturation time: %s", framework.PrettyPrintJSON(saturationData))

		// Verify latency metrics.
		highLatencyRequests, err := framework.HighLatencyRequests(c)
		framework.ExpectNoError(err)
		Expect(highLatencyRequests).NotTo(BeNumerically(">", 0), "There should be no high-latency requests")

		// Verify scheduler metrics.
		// TODO: Reset metrics at the beginning of the test.
		// We should do something similar to how we do it for APIserver.
		framework.ExpectNoError(framework.VerifySchedulerLatency(c))
	})

	// Explicitly put here, to delete namespace at the end of the test
	// (after measuring latency metrics, etc.).
	f := framework.NewDefaultFramework("density")
	f.NamespaceDeletionTimeout = time.Hour

	BeforeEach(func() {
		c = f.Client
		ns = f.Namespace.Name

		// In large clusters we may get to this point but still have a bunch
		// of nodes without Routes created. Since this would make a node
		// unschedulable, we need to wait until all of them are schedulable.
		framework.ExpectNoError(framework.WaitForAllNodesSchedulable(c))
		masters, nodes = framework.GetMasterAndWorkerNodesOrDie(c)
		nodeCount = len(nodes.Items)
		Expect(nodeCount).NotTo(BeZero())
		nodeCpuCapacity = nodes.Items[0].Status.Allocatable.Cpu().MilliValue()
		nodeMemCapacity = nodes.Items[0].Status.Allocatable.Memory().Value()

		// Terminating a namespace (deleting the remaining objects from it - which
		// generally means events) can affect the current run. Thus we wait for all
		// terminating namespaces to be finally deleted before starting this test.
		err := framework.CheckTestingNSDeletedExcept(c, ns)
		framework.ExpectNoError(err)

		uuid = string(utiluuid.NewUUID())
		framework.ExpectNoError(framework.ResetMetrics(c))
		framework.ExpectNoError(os.Mkdir(fmt.Sprintf(framework.TestContext.OutputDir+"/%s", uuid), 0777))

		framework.Logf("Listing nodes for easy debugging:\n")
		for _, node := range nodes.Items {
			var internalIP, externalIP string
			for _, address := range node.Status.Addresses {
				if address.Type == api.NodeInternalIP {
					internalIP = address.Address
				}
				if address.Type == api.NodeExternalIP {
					externalIP = address.Address
				}
			}
			framework.Logf("Name: %v, clusterIP: %v, externalIP: %v", node.ObjectMeta.Name, internalIP, externalIP)
		}
	})

	type Density struct {
		// Controls if e2e latency tests should be run (they are slow)
		runLatencyTest bool
		podsPerNode    int
		// Controls how often the apiserver is polled for pods
		interval time.Duration
	}

	densityTests := []Density{
		// TODO: Expose runLatencyTest as ginkgo flag.
		{podsPerNode: 3, runLatencyTest: false, interval: 10 * time.Second},
		{podsPerNode: 30, runLatencyTest: true, interval: 10 * time.Second},
		{podsPerNode: 50, runLatencyTest: false, interval: 10 * time.Second},
		{podsPerNode: 95, runLatencyTest: true, interval: 10 * time.Second},
		{podsPerNode: 100, runLatencyTest: false, interval: 10 * time.Second},
	}

	for _, testArg := range densityTests {
		name := fmt.Sprintf("should allow starting %d pods per node", testArg.podsPerNode)
		switch testArg.podsPerNode {
		case 30:
			name = "[Feature:Performance] " + name
		case 95:
			name = "[Feature:HighDensityPerformance] " + name
		default:
			name = "[Feature:ManualPerformance] " + name
		}
		itArg := testArg

		It(name, func() {
			podsPerNode := itArg.podsPerNode
			if podsPerNode == 30 {
				f.AddonResourceConstraints = func() map[string]framework.ResourceConstraint { return density30AddonResourceVerifier(nodeCount) }()
			}
			totalPods = podsPerNode * nodeCount
			fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
			framework.ExpectNoError(err)
			defer fileHndl.Close()
			timeout := 10 * time.Minute

			// TODO: loop to podsPerNode instead of 1 when we're ready.
			numberOfRCs := 1
			RCConfigs := make([]framework.RCConfig, numberOfRCs)
			for i := 0; i < numberOfRCs; i++ {
				RCName := "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
				RCConfigs[i] = framework.RCConfig{Client: c,
					Image:                framework.GetPauseImageName(f.Client),
					Name:                 RCName,
					Namespace:            ns,
					Labels:               map[string]string{"type": "densityPod"},
					PollInterval:         itArg.interval,
					PodStatusFile:        fileHndl,
					Replicas:             (totalPods + numberOfRCs - 1) / numberOfRCs,
					CpuRequest:           nodeCpuCapacity / 100,
					MemRequest:           nodeMemCapacity / 100,
					MaxContainerFailures: &MaxContainerFailures,
					Silent:               true,
				}
			}

			dConfig := DensityTestConfig{Client: c,
				Configs:      RCConfigs,
				PodCount:     totalPods,
				Namespace:    ns,
				PollInterval: itArg.interval,
				Timeout:      timeout,
			}

			e2eStartupTime = runDensityTest(dConfig)

			if itArg.runLatencyTest {
				By("Scheduling additional Pods to measure startup latencies")

				createTimes := make(map[string]unversioned.Time, 0)
				nodes := make(map[string]string, 0)
				scheduleTimes := make(map[string]unversioned.Time, 0)
				runTimes := make(map[string]unversioned.Time, 0)
				watchTimes := make(map[string]unversioned.Time, 0)

				var mutex sync.Mutex
				checkPod := func(p *api.Pod) {
					mutex.Lock()
					defer mutex.Unlock()
					defer GinkgoRecover()

					if p.Status.Phase == api.PodRunning {
						if _, found := watchTimes[p.Name]; !found {
							watchTimes[p.Name] = unversioned.Now()
							createTimes[p.Name] = p.CreationTimestamp
							nodes[p.Name] = p.Spec.NodeName
							var startTime unversioned.Time
							for _, cs := range p.Status.ContainerStatuses {
								if cs.State.Running != nil {
									if startTime.Before(cs.State.Running.StartedAt) {
										startTime = cs.State.Running.StartedAt
									}
								}
							}
							if startTime != unversioned.NewTime(time.Time{}) {
								runTimes[p.Name] = startTime
							} else {
								framework.Failf("Pod %v is reported to be running, but none of its containers is", p.Name)
							}
						}
					}
				}

				additionalPodsPrefix = "density-latency-pod"
				latencyPodsStore, controller := controllerframework.NewInformer(
					&cache.ListWatch{
						ListFunc: func(options api.ListOptions) (runtime.Object, error) {
							options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix})
							return c.Pods(ns).List(options)
						},
						WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
							options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": additionalPodsPrefix})
							return c.Pods(ns).Watch(options)
						},
					},
					&api.Pod{},
					0,
					controllerframework.ResourceEventHandlerFuncs{
						AddFunc: func(obj interface{}) {
							p, ok := obj.(*api.Pod)
							Expect(ok).To(Equal(true))
							go checkPod(p)
						},
						UpdateFunc: func(oldObj, newObj interface{}) {
							p, ok := newObj.(*api.Pod)
							Expect(ok).To(Equal(true))
							go checkPod(p)
						},
					},
				)

				stopCh := make(chan struct{})
				go controller.Run(stopCh)

				// Create some additional pods with throughput ~5 pods/sec.
				var wg sync.WaitGroup
				wg.Add(nodeCount)
				// Explicitly set requests here. Scheduling a pod with non-zero
				// requests raises the reported usage of its node, so the
				// scheduler's priority functions spread the latency pods more
				// evenly between nodes.
				cpuRequest := *resource.NewMilliQuantity(nodeCpuCapacity/5, resource.DecimalSI)
				memRequest := *resource.NewQuantity(nodeMemCapacity/5, resource.DecimalSI)
				if podsPerNode > 30 {
					// This is to make them schedulable on high-density tests
					// (e.g. 100 pods/node kubemark).
					cpuRequest = *resource.NewMilliQuantity(0, resource.DecimalSI)
					memRequest = *resource.NewQuantity(0, resource.DecimalSI)
				}
				for i := 1; i <= nodeCount; i++ {
					name := additionalPodsPrefix + "-" + strconv.Itoa(i)
					go createRunningPodFromRC(&wg, c, name, ns, framework.GetPauseImageName(f.Client), additionalPodsPrefix, cpuRequest, memRequest)
					time.Sleep(200 * time.Millisecond)
				}
				wg.Wait()

				By("Waiting for all Pods to be observed by the watch...")
				for start := time.Now(); len(watchTimes) < nodeCount; time.Sleep(10 * time.Second) {
					if time.Since(start) > timeout {
						framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
					}
				}
				close(stopCh)

				nodeToLatencyPods := make(map[string]int)
				for _, item := range latencyPodsStore.List() {
					pod := item.(*api.Pod)
					nodeToLatencyPods[pod.Spec.NodeName]++
				}
				for node, count := range nodeToLatencyPods {
					if count > 1 {
						framework.Logf("%d latency pods scheduled on %s", count, node)
					}
				}

				selector := fields.Set{
					"involvedObject.kind":      "Pod",
					"involvedObject.namespace": ns,
					"source":                   api.DefaultSchedulerName,
				}.AsSelector()
				options := api.ListOptions{FieldSelector: selector}
				schedEvents, err := c.Events(ns).List(options)
				framework.ExpectNoError(err)
				for k := range createTimes {
					for _, event := range schedEvents.Items {
						if event.InvolvedObject.Name == k {
							scheduleTimes[k] = event.FirstTimestamp
							break
						}
					}
				}

				scheduleLag := make([]framework.PodLatencyData, 0)
				startupLag := make([]framework.PodLatencyData, 0)
				watchLag := make([]framework.PodLatencyData, 0)
				schedToWatchLag := make([]framework.PodLatencyData, 0)
				e2eLag := make([]framework.PodLatencyData, 0)

				for name, create := range createTimes {
					sched, ok := scheduleTimes[name]
					Expect(ok).To(Equal(true))
					run, ok := runTimes[name]
					Expect(ok).To(Equal(true))
					watch, ok := watchTimes[name]
					Expect(ok).To(Equal(true))
					node, ok := nodes[name]
					Expect(ok).To(Equal(true))

					scheduleLag = append(scheduleLag, framework.PodLatencyData{Name: name, Node: node, Latency: sched.Time.Sub(create.Time)})
					startupLag = append(startupLag, framework.PodLatencyData{Name: name, Node: node, Latency: run.Time.Sub(sched.Time)})
					watchLag = append(watchLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(run.Time)})
					schedToWatchLag = append(schedToWatchLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(sched.Time)})
					e2eLag = append(e2eLag, framework.PodLatencyData{Name: name, Node: node, Latency: watch.Time.Sub(create.Time)})
				}

				sort.Sort(framework.LatencySlice(scheduleLag))
				sort.Sort(framework.LatencySlice(startupLag))
				sort.Sort(framework.LatencySlice(watchLag))
				sort.Sort(framework.LatencySlice(schedToWatchLag))
				sort.Sort(framework.LatencySlice(e2eLag))

				framework.PrintLatencies(scheduleLag, "worst schedule latencies")
				framework.PrintLatencies(startupLag, "worst run-after-schedule latencies")
				framework.PrintLatencies(watchLag, "worst watch latencies")
				framework.PrintLatencies(schedToWatchLag, "worst scheduled-to-end total latencies")
				framework.PrintLatencies(e2eLag, "worst e2e total latencies")

				// Test whether e2e pod startup time is acceptable.
				podStartupLatency := framework.PodStartupLatency{Latency: framework.ExtractLatencyMetrics(e2eLag)}
				framework.ExpectNoError(framework.VerifyPodStartupLatency(podStartupLatency))

				framework.LogSuspiciousLatency(startupLag, e2eLag, nodeCount, c)

				By("Removing additional replication controllers")
				for i := 1; i <= nodeCount; i++ {
					name := additionalPodsPrefix + "-" + strconv.Itoa(i)
					c.ReplicationControllers(ns).Delete(name, nil)
				}
			}

			cleanupDensityTest(dConfig)
		})
	}

	// Calculate the total number of pods from each node's max-pods capacity.
	It("[Feature:ManualPerformance] should allow running maximum capacity pods on nodes", func() {
		totalPods = 0
		for _, n := range nodes.Items {
			totalPods += int(n.Status.Capacity.Pods().Value())
		}
		totalPods -= framework.WaitForStableCluster(c, masters)

		fileHndl, err := os.Create(fmt.Sprintf(framework.TestContext.OutputDir+"/%s/pod_states.csv", uuid))
		framework.ExpectNoError(err)
		defer fileHndl.Close()

		rcCnt := 1
		RCConfigs := make([]framework.RCConfig, rcCnt)
		podsPerRC := int(totalPods / rcCnt)
		for i := 0; i < rcCnt; i++ {
			if i == rcCnt-1 {
				podsPerRC += int(math.Mod(float64(totalPods), float64(rcCnt)))
			}
			RCName = "density" + strconv.Itoa(totalPods) + "-" + strconv.Itoa(i) + "-" + uuid
			RCConfigs[i] = framework.RCConfig{Client: c,
				Image:                framework.GetPauseImageName(f.Client),
				Name:                 RCName,
				Namespace:            ns,
				Labels:               map[string]string{"type": "densityPod"},
				PollInterval:         10 * time.Second,
				PodStatusFile:        fileHndl,
				Replicas:             podsPerRC,
				MaxContainerFailures: &MaxContainerFailures,
				Silent:               true,
			}
		}
		dConfig := DensityTestConfig{Client: c,
			Configs:      RCConfigs,
			PodCount:     totalPods,
			Namespace:    ns,
			PollInterval: 10 * time.Second,
			Timeout:      10 * time.Minute,
		}
		e2eStartupTime = runDensityTest(dConfig)
		cleanupDensityTest(dConfig)
	})
})
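
// createRunningPodFromRC creates a single-replica ReplicationController with
// the given resource requests and waits until its pod is reported running.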
func createRunningPodFromRC(wg *sync.WaitGroup, c *client.Client, name, ns, image, podType string, cpuRequest, memRequest resource.Quantity) {
	defer GinkgoRecover()
	defer wg.Done()
	labels := map[string]string{
		"type": podType,
		"name": name,
	}
	rc := &api.ReplicationController{
		ObjectMeta: api.ObjectMeta{
			Name:   name,
			Labels: labels,
		},
		Spec: api.ReplicationControllerSpec{
			Replicas: 1,
			Selector: labels,
			Template: &api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: labels,
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Name:  name,
							Image: image,
							Resources: api.ResourceRequirements{
								Requests: api.ResourceList{
									api.ResourceCPU:    cpuRequest,
									api.ResourceMemory: memRequest,
								},
							},
						},
					},
					DNSPolicy: api.DNSDefault,
				},
			},
		},
	}
	_, err := c.ReplicationControllers(ns).Create(rc)
	framework.ExpectNoError(err)
	framework.ExpectNoError(framework.WaitForRCPodsRunning(c, ns, name))
	framework.Logf("Found pod '%s' running", name)
}