density_test.go

// +build linux

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e_node

import (
	"fmt"
	"sort"
	"strconv"
	"sync"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/client/cache"
	controllerframework "k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
	kubemetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/metrics"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/watch"
	"k8s.io/kubernetes/test/e2e/framework"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)
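
// kubeletAddr points at the kubelet's read-only port on the local node;
// getPodStartLatency scrapes Prometheus metrics from it directly rather than
// going through the API server proxy.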
const (
	kubeletAddr = "localhost:10255"
)

var _ = framework.KubeDescribe("Density [Serial] [Slow]", func() {
	const (
		// The data collection of the resource collector and the standalone cadvisor
		// is not synchronized, so the resource collector may miss data or
		// collect duplicated data.
		containerStatsPollingPeriod = 500 * time.Millisecond
	)

	var (
		ns       string
		nodeName string
		rc       *ResourceCollector
	)

	f := framework.NewDefaultFramework("density-test")

	BeforeEach(func() {
		ns = f.Namespace.Name
		nodeName = framework.TestContext.NodeName
		// Start a standalone cadvisor pod using 'CreateSync'; the pod is running when it returns.
		f.PodClient().CreateSync(getCadvisorPod())
		// The resource collector monitors fine-grained CPU/memory usage by a standalone cadvisor
		// with a 1s housekeeping interval.
		rc = NewResourceCollector(containerStatsPollingPeriod)
	})

	Context("create a batch of pods", func() {
		// TODO(coufon): the values are generous, set more precise limits with benchmark data
		// and add more tests
		dTests := []densityTest{
			{
				podsNr:   10,
				interval: 0 * time.Millisecond,
				cpuLimits: framework.ContainersCPUSummary{
					stats.SystemContainerKubelet: {0.50: 0.25, 0.95: 0.40},
					stats.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60},
				},
				memLimits: framework.ResourceUsagePerContainer{
					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
				},
				// percentile limits of single pod startup latency
				podStartupLimits: framework.LatencyMetric{
					Perc50: 16 * time.Second,
					Perc90: 18 * time.Second,
					Perc99: 20 * time.Second,
				},
				// upper bound of the startup latency of a batch of pods
				podBatchStartupLimit: 25 * time.Second,
			},
		}

		for _, testArg := range dTests {
			itArg := testArg
			It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval",
				itArg.podsNr, itArg.interval), func() {
				itArg.createMethod = "batch"
				testName := itArg.getTestName()
				batchLag, e2eLags := runDensityBatchTest(f, rc, itArg, false)

				By("Verifying latency")
				logAndVerifyLatency(batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, true)

				By("Verifying resource")
				logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, true)
			})
		}
	})
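
	// The [Benchmark] contexts below reuse the same batch/sequence workloads but only
	// collect and log latency and resource data; limits are not enforced (the verify
	// flag passed to logAndVerifyLatency/logAndVerifyResource is false).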
	Context("create a batch of pods", func() {
		dTests := []densityTest{
			{
				podsNr:   10,
				interval: 0 * time.Millisecond,
			},
			{
				podsNr:   35,
				interval: 0 * time.Millisecond,
			},
			{
				podsNr:   105,
				interval: 0 * time.Millisecond,
			},
			{
				podsNr:   10,
				interval: 100 * time.Millisecond,
			},
			{
				podsNr:   35,
				interval: 100 * time.Millisecond,
			},
			{
				podsNr:   105,
				interval: 100 * time.Millisecond,
			},
			{
				podsNr:   10,
				interval: 300 * time.Millisecond,
			},
			{
				podsNr:   35,
				interval: 300 * time.Millisecond,
			},
			{
				podsNr:   105,
				interval: 300 * time.Millisecond,
			},
		}

		for _, testArg := range dTests {
			itArg := testArg
			It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval [Benchmark]",
				itArg.podsNr, itArg.interval), func() {
				itArg.createMethod = "batch"
				testName := itArg.getTestName()
				batchLag, e2eLags := runDensityBatchTest(f, rc, itArg, true)

				By("Verifying latency")
				logAndVerifyLatency(batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, false)

				By("Verifying resource")
				logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, false)
			})
		}
	})

	Context("create a sequence of pods", func() {
		dTests := []densityTest{
			{
				podsNr:   10,
				bgPodsNr: 50,
				cpuLimits: framework.ContainersCPUSummary{
					stats.SystemContainerKubelet: {0.50: 0.25, 0.95: 0.40},
					stats.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60},
				},
				memLimits: framework.ResourceUsagePerContainer{
					stats.SystemContainerKubelet: &framework.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
					stats.SystemContainerRuntime: &framework.ContainerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
				},
				podStartupLimits: framework.LatencyMetric{
					Perc50: 5000 * time.Millisecond,
					Perc90: 9000 * time.Millisecond,
					Perc99: 10000 * time.Millisecond,
				},
			},
		}

		for _, testArg := range dTests {
			itArg := testArg
			It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods",
				itArg.podsNr, itArg.bgPodsNr), func() {
				itArg.createMethod = "sequence"
				testName := itArg.getTestName()
				batchLag, e2eLags := runDensitySeqTest(f, rc, itArg)

				By("Verifying latency")
				logAndVerifyLatency(batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, true)

				By("Verifying resource")
				logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, true)
			})
		}
	})

	Context("create a sequence of pods", func() {
		dTests := []densityTest{
			{
				podsNr:   10,
				bgPodsNr: 50,
			},
			{
				podsNr:   30,
				bgPodsNr: 50,
			},
			{
				podsNr:   50,
				bgPodsNr: 50,
			},
		}

		for _, testArg := range dTests {
			itArg := testArg
			It(fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods [Benchmark]",
				itArg.podsNr, itArg.bgPodsNr), func() {
				itArg.createMethod = "sequence"
				testName := itArg.getTestName()
				batchLag, e2eLags := runDensitySeqTest(f, rc, itArg)

				By("Verifying latency")
				logAndVerifyLatency(batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testName, false)

				By("Verifying resource")
				logAndVerifyResource(f, rc, itArg.cpuLimits, itArg.memLimits, testName, false)
			})
		}
	})
})

type densityTest struct {
	// number of pods
	podsNr int
	// number of background pods
	bgPodsNr int
	// interval between pod creations (rate control)
	interval time.Duration
	// create pods in 'batch' or 'sequence'
	createMethod string
	// performance limits
	cpuLimits            framework.ContainersCPUSummary
	memLimits            framework.ResourceUsagePerContainer
	podStartupLimits     framework.LatencyMetric
	podBatchStartupLimit time.Duration
}
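
// getTestName encodes the test parameters (creation method, pod count, background
// pod count, and creation interval in milliseconds) into a name used to label the
// perf and time-series data logged by the tests.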
func (dt *densityTest) getTestName() string {
	return fmt.Sprintf("density_create_%s_%d_%d_%d", dt.createMethod, dt.podsNr, dt.bgPodsNr, dt.interval.Nanoseconds()/1000000)
}

// runDensityBatchTest runs the density batch pod creation test and returns the batch
// startup lag together with the sorted per-pod e2e startup latencies.
func runDensityBatchTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest,
	isLogTimeSeries bool) (time.Duration, []framework.PodLatencyData) {
	const (
		podType               = "density_test_pod"
		sleepBeforeCreatePods = 30 * time.Second
	)
	var (
		mutex      = &sync.Mutex{}
		watchTimes = make(map[string]unversioned.Time, 0)
		stopCh     = make(chan struct{})
	)

	// create test pod data structure
	pods := newTestPods(testArg.podsNr, ImageRegistry[pauseImage], podType)

	// the controller watches the change of pod status
	controller := newInformerWatchPod(f, mutex, watchTimes, podType)
	go controller.Run(stopCh)
	defer close(stopCh)

	// TODO(coufon): in the test we found that the kubelet starts while it is busy on something,
	// so 'syncLoop' does not respond to pod creation immediately. Creating the first pod has a
	// delay of around 5s. The node status is already 'ready', so waiting and checking for the
	// node to be ready does not help here. For now, wait for a grace period to let 'syncLoop' be ready.
	time.Sleep(sleepBeforeCreatePods)

	rc.Start()
	// Explicitly delete pods to prevent the namespace controller cleanup from timing out
	defer deletePodsSync(f, append(pods, getCadvisorPod()))
	defer rc.Stop()

	By("Creating a batch of pods")
	// It returns a map['pod name']'creation time' containing the creation timestamps
	createTimes := createBatchPodWithRateControl(f, pods, testArg.interval)

	By("Waiting for all Pods to be observed by the watch...")
	Eventually(func() bool {
		return len(watchTimes) == testArg.podsNr
	}, 10*time.Minute, 10*time.Second).Should(BeTrue())

	if len(watchTimes) < testArg.podsNr {
		framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
	}

	// Analyze results
	var (
		firstCreate unversioned.Time
		lastRunning unversioned.Time
		init        = true
		e2eLags     = make([]framework.PodLatencyData, 0)
	)

	for name, create := range createTimes {
		watch, ok := watchTimes[name]
		Expect(ok).To(Equal(true))

		e2eLags = append(e2eLags,
			framework.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})

		if !init {
			if firstCreate.Time.After(create.Time) {
				firstCreate = create
			}
			if lastRunning.Time.Before(watch.Time) {
				lastRunning = watch
			}
		} else {
			init = false
			firstCreate, lastRunning = create, watch
		}
	}

	sort.Sort(framework.LatencySlice(e2eLags))
	batchLag := lastRunning.Time.Sub(firstCreate.Time)

	testName := testArg.getTestName()
	// Log time series data.
	if isLogTimeSeries {
		logDensityTimeSeries(rc, createTimes, watchTimes, testName)
	}
	// Log throughput data.
	logPodCreateThroughput(batchLag, e2eLags, testArg.podsNr, testName)

	return batchLag, e2eLags
}

// runDensitySeqTest runs the density sequential pod creation test
func runDensitySeqTest(f *framework.Framework, rc *ResourceCollector, testArg densityTest) (time.Duration, []framework.PodLatencyData) {
	const (
		podType               = "density_test_pod"
		sleepBeforeCreatePods = 30 * time.Second
	)
	bgPods := newTestPods(testArg.bgPodsNr, ImageRegistry[pauseImage], "background_pod")
	testPods := newTestPods(testArg.podsNr, ImageRegistry[pauseImage], podType)

	By("Creating a batch of background pods")
	// CreateBatch is synchronized; all pods are running when it returns
	f.PodClient().CreateBatch(bgPods)

	time.Sleep(sleepBeforeCreatePods)

	rc.Start()
	// Explicitly delete pods to prevent the namespace controller cleanup from timing out
	defer deletePodsSync(f, append(bgPods, append(testPods, getCadvisorPod())...))
	defer rc.Stop()

	// Create pods sequentially (back-to-back). e2eLags have been sorted.
	batchLag, e2eLags := createBatchPodSequential(f, testPods)

	// Log throughput data.
	logPodCreateThroughput(batchLag, e2eLags, testArg.podsNr, testArg.getTestName())

	return batchLag, e2eLags
}

// createBatchPodWithRateControl creates a batch of pods concurrently, using one goroutine for each creation.
// Between creations there is an interval for throughput control.
func createBatchPodWithRateControl(f *framework.Framework, pods []*api.Pod, interval time.Duration) map[string]unversioned.Time {
	createTimes := make(map[string]unversioned.Time)
	for _, pod := range pods {
		createTimes[pod.ObjectMeta.Name] = unversioned.Now()
		go f.PodClient().Create(pod)
		time.Sleep(interval)
	}
	return createTimes
}

// getPodStartLatency gets the Prometheus metric 'pod start latency' from the kubelet
func getPodStartLatency(node string) (framework.KubeletLatencyMetrics, error) {
	latencyMetrics := framework.KubeletLatencyMetrics{}
	ms, err := metrics.GrabKubeletMetricsWithoutProxy(node)
	Expect(err).NotTo(HaveOccurred())

	for _, samples := range ms {
		for _, sample := range samples {
			if sample.Metric["__name__"] == kubemetrics.KubeletSubsystem+"_"+kubemetrics.PodStartLatencyKey {
				quantile, _ := strconv.ParseFloat(string(sample.Metric["quantile"]), 64)
				latencyMetrics = append(latencyMetrics,
					framework.KubeletLatencyMetric{
						Quantile: quantile,
						Method:   kubemetrics.PodStartLatencyKey,
						Latency:  time.Duration(int(sample.Value)) * time.Microsecond})
			}
		}
	}
	return latencyMetrics, nil
}

// verifyPodStartupLatency verifies whether the 50th, 90th and 99th percentiles of PodStartupLatency are
// within the threshold.
func verifyPodStartupLatency(expect, actual framework.LatencyMetric) error {
	if actual.Perc50 > expect.Perc50 {
		return fmt.Errorf("too high pod startup latency 50th percentile: %v", actual.Perc50)
	}
	if actual.Perc90 > expect.Perc90 {
		return fmt.Errorf("too high pod startup latency 90th percentile: %v", actual.Perc90)
	}
	if actual.Perc99 > expect.Perc99 {
		return fmt.Errorf("too high pod startup latency 99th percentile: %v", actual.Perc99)
	}
	return nil
}

// newInformerWatchPod creates an informer to check whether all pods are running.
func newInformerWatchPod(f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]unversioned.Time,
	podType string) *controllerframework.Controller {
	ns := f.Namespace.Name
	checkPodRunning := func(p *api.Pod) {
		mutex.Lock()
		defer mutex.Unlock()
		defer GinkgoRecover()

		if p.Status.Phase == api.PodRunning {
			if _, found := watchTimes[p.Name]; !found {
				watchTimes[p.Name] = unversioned.Now()
			}
		}
	}

	_, controller := controllerframework.NewInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
				return f.Client.Pods(ns).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType})
				return f.Client.Pods(ns).Watch(options)
			},
		},
		&api.Pod{},
		0,
		controllerframework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				p, ok := obj.(*api.Pod)
				Expect(ok).To(Equal(true))
				go checkPodRunning(p)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				p, ok := newObj.(*api.Pod)
				Expect(ok).To(Equal(true))
				go checkPodRunning(p)
			},
		},
	)
	return controller
}

// createBatchPodSequential creates pods back-to-back in sequence.
func createBatchPodSequential(f *framework.Framework, pods []*api.Pod) (time.Duration, []framework.PodLatencyData) {
	batchStartTime := unversioned.Now()
	e2eLags := make([]framework.PodLatencyData, 0)
	for _, pod := range pods {
		create := unversioned.Now()
		f.PodClient().CreateSync(pod)
		e2eLags = append(e2eLags,
			framework.PodLatencyData{Name: pod.Name, Latency: unversioned.Now().Time.Sub(create.Time)})
	}
	batchLag := unversioned.Now().Time.Sub(batchStartTime.Time)
	sort.Sort(framework.LatencySlice(e2eLags))
	return batchLag, e2eLags
}

// logAndVerifyLatency verifies whether pod creation latency satisfies the limit.
func logAndVerifyLatency(batchLag time.Duration, e2eLags []framework.PodLatencyData, podStartupLimits framework.LatencyMetric,
	podBatchStartupLimit time.Duration, testName string, isVerify bool) {
	framework.PrintLatencies(e2eLags, "worst client e2e total latencies")

	// TODO(coufon): do not trust 'kubelet' metrics since they are not reset!
	latencyMetrics, _ := getPodStartLatency(kubeletAddr)
	framework.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(latencyMetrics))

	podCreateLatency := framework.PodStartupLatency{Latency: framework.ExtractLatencyMetrics(e2eLags)}

	// log latency perf data
	framework.PrintPerfData(getLatencyPerfData(podCreateLatency.Latency, testName))

	if isVerify {
		// check whether e2e pod startup time is acceptable.
		framework.ExpectNoError(verifyPodStartupLatency(podStartupLimits, podCreateLatency.Latency))

		// check batch pod creation latency
		if podBatchStartupLimit > 0 {
			Expect(batchLag <= podBatchStartupLimit).To(Equal(true), "Batch creation startup time %v exceeds limit %v",
				batchLag, podBatchStartupLimit)
		}
	}
}

// logPodCreateThroughput calculates and logs pod creation throughput.
func logPodCreateThroughput(batchLag time.Duration, e2eLags []framework.PodLatencyData, podsNr int, testName string) {
	framework.PrintPerfData(getThroughputPerfData(batchLag, e2eLags, podsNr, testName))
}