kubelet_stats.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package framework
  14. import (
  15. "bytes"
  16. "encoding/json"
  17. "fmt"
  18. "sort"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "text/tabwriter"
  23. "time"
  24. cadvisorapi "github.com/google/cadvisor/info/v1"
  25. "github.com/prometheus/common/model"
  26. "k8s.io/kubernetes/pkg/api"
  27. client "k8s.io/kubernetes/pkg/client/unversioned"
  28. "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
  29. kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
  30. kubeletstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
  31. "k8s.io/kubernetes/pkg/master/ports"
  32. "k8s.io/kubernetes/pkg/metrics"
  33. utilerrors "k8s.io/kubernetes/pkg/util/errors"
  34. "k8s.io/kubernetes/pkg/util/sets"
  35. "k8s.io/kubernetes/pkg/util/wait"
  36. )
  37. // KubeletMetric stores metrics scraped from the kubelet server's /metric endpoint.
  38. // TODO: Get some more structure around the metrics and this type
  39. type KubeletLatencyMetric struct {
  40. // eg: list, info, create
  41. Operation string
  42. // eg: sync_pods, pod_worker
  43. Method string
  44. // 0 <= quantile <=1, e.g. 0.95 is 95%tile, 0.5 is median.
  45. Quantile float64
  46. Latency time.Duration
  47. }
  48. // KubeletMetricByLatency implements sort.Interface for []KubeletMetric based on
  49. // the latency field.
  50. type KubeletLatencyMetrics []KubeletLatencyMetric
  51. func (a KubeletLatencyMetrics) Len() int { return len(a) }
  52. func (a KubeletLatencyMetrics) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  53. func (a KubeletLatencyMetrics) Less(i, j int) bool { return a[i].Latency > a[j].Latency }
  54. // If a apiserver client is passed in, the function will try to get kubelet metrics from metrics grabber;
  55. // or else, the function will try to get kubelet metrics directly from the node.
  56. func getKubeletMetricsFromNode(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
  57. if c == nil {
  58. return metrics.GrabKubeletMetricsWithoutProxy(nodeName)
  59. }
  60. grabber, err := metrics.NewMetricsGrabber(c, true, false, false, false)
  61. if err != nil {
  62. return metrics.KubeletMetrics{}, err
  63. }
  64. return grabber.GrabFromKubelet(nodeName)
  65. }
  66. // getKubeletMetrics gets all metrics in kubelet subsystem from specified node and trims
  67. // the subsystem prefix.
  68. func getKubeletMetrics(c *client.Client, nodeName string) (metrics.KubeletMetrics, error) {
  69. ms, err := getKubeletMetricsFromNode(c, nodeName)
  70. if err != nil {
  71. return metrics.KubeletMetrics{}, err
  72. }
  73. kubeletMetrics := make(metrics.KubeletMetrics)
  74. for name, samples := range ms {
  75. const prefix = kubeletmetrics.KubeletSubsystem + "_"
  76. if !strings.HasPrefix(name, prefix) {
  77. // Not a kubelet metric.
  78. continue
  79. }
  80. method := strings.TrimPrefix(name, prefix)
  81. kubeletMetrics[method] = samples
  82. }
  83. return kubeletMetrics, nil
  84. }
  85. // GetKubeletLatencyMetrics gets all latency related kubelet metrics. Note that the KubeletMetrcis
  86. // passed in should not contain subsystem prefix.
  87. func GetKubeletLatencyMetrics(ms metrics.KubeletMetrics) KubeletLatencyMetrics {
  88. latencyMethods := sets.NewString(
  89. kubeletmetrics.PodWorkerLatencyKey,
  90. kubeletmetrics.PodWorkerStartLatencyKey,
  91. kubeletmetrics.SyncPodsLatencyKey,
  92. kubeletmetrics.PodStartLatencyKey,
  93. kubeletmetrics.PodStatusLatencyKey,
  94. kubeletmetrics.ContainerManagerOperationsKey,
  95. kubeletmetrics.DockerOperationsLatencyKey,
  96. kubeletmetrics.PodWorkerStartLatencyKey,
  97. kubeletmetrics.PLEGRelistLatencyKey,
  98. )
  99. var latencyMetrics KubeletLatencyMetrics
  100. for method, samples := range ms {
  101. if !latencyMethods.Has(method) {
  102. continue
  103. }
  104. for _, sample := range samples {
  105. latency := sample.Value
  106. operation := string(sample.Metric["operation_type"])
  107. var quantile float64
  108. if val, ok := sample.Metric[model.QuantileLabel]; ok {
  109. var err error
  110. if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
  111. continue
  112. }
  113. }
  114. latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
  115. Operation: operation,
  116. Method: method,
  117. Quantile: quantile,
  118. Latency: time.Duration(int64(latency)) * time.Microsecond,
  119. })
  120. }
  121. }
  122. return latencyMetrics
  123. }
  124. // RuntimeOperationMonitor is the tool getting and parsing docker operation metrics.
  125. type RuntimeOperationMonitor struct {
  126. client *client.Client
  127. nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
  128. }
  129. // NodeRuntimeOperationErrorRate is the runtime operation error rate on one node.
  130. type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate
  131. // RuntimeOperationErrorRate is the error rate of a specified runtime operation.
  132. type RuntimeOperationErrorRate struct {
  133. TotalNumber float64
  134. ErrorRate float64
  135. TimeoutRate float64
  136. }
  137. func NewRuntimeOperationMonitor(c *client.Client) *RuntimeOperationMonitor {
  138. m := &RuntimeOperationMonitor{
  139. client: c,
  140. nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
  141. }
  142. nodes, err := m.client.Nodes().List(api.ListOptions{})
  143. if err != nil {
  144. Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
  145. }
  146. for _, node := range nodes.Items {
  147. m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
  148. }
  149. // Initialize the runtime operation error rate
  150. m.GetRuntimeOperationErrorRate()
  151. return m
  152. }
  153. // GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics and calculate
  154. // error rates of all runtime operations.
  155. func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
  156. for node := range m.nodesRuntimeOps {
  157. nodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
  158. if err != nil {
  159. Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
  160. continue
  161. }
  162. m.nodesRuntimeOps[node] = nodeResult
  163. }
  164. return m.nodesRuntimeOps
  165. }
  166. // GetLatestRuntimeOperationErrorRate gets latest error rate and timeout rate from last observed RuntimeOperationErrorRate.
  167. func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate() map[string]NodeRuntimeOperationErrorRate {
  168. result := make(map[string]NodeRuntimeOperationErrorRate)
  169. for node := range m.nodesRuntimeOps {
  170. result[node] = make(NodeRuntimeOperationErrorRate)
  171. oldNodeResult := m.nodesRuntimeOps[node]
  172. curNodeResult, err := getNodeRuntimeOperationErrorRate(m.client, node)
  173. if err != nil {
  174. Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
  175. continue
  176. }
  177. for op, cur := range curNodeResult {
  178. t := *cur
  179. if old, found := oldNodeResult[op]; found {
  180. t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
  181. t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
  182. t.TotalNumber -= old.TotalNumber
  183. }
  184. result[node][op] = &t
  185. }
  186. m.nodesRuntimeOps[node] = curNodeResult
  187. }
  188. return result
  189. }
  190. // FormatRuntimeOperationErrorRate formats the runtime operation error rate to string.
  191. func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
  192. lines := []string{}
  193. for node, nodeResult := range nodesResult {
  194. lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
  195. for op, result := range nodeResult {
  196. line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
  197. result.TotalNumber, result.ErrorRate, result.TimeoutRate)
  198. lines = append(lines, line)
  199. }
  200. lines = append(lines, fmt.Sprintln())
  201. }
  202. return strings.Join(lines, "\n")
  203. }
  204. // getNodeRuntimeOperationErrorRate gets runtime operation error rate from specified node.
  205. func getNodeRuntimeOperationErrorRate(c *client.Client, node string) (NodeRuntimeOperationErrorRate, error) {
  206. result := make(NodeRuntimeOperationErrorRate)
  207. ms, err := getKubeletMetrics(c, node)
  208. if err != nil {
  209. return result, err
  210. }
  211. // If no corresponding metrics are found, the returned samples will be empty. Then the following
  212. // loop will be skipped automatically.
  213. allOps := ms[kubeletmetrics.DockerOperationsKey]
  214. errOps := ms[kubeletmetrics.DockerOperationsErrorsKey]
  215. timeoutOps := ms[kubeletmetrics.DockerOperationsTimeoutKey]
  216. for _, sample := range allOps {
  217. operation := string(sample.Metric["operation_type"])
  218. result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
  219. }
  220. for _, sample := range errOps {
  221. operation := string(sample.Metric["operation_type"])
  222. // Should always find the corresponding item, just in case
  223. if _, found := result[operation]; found {
  224. result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
  225. }
  226. }
  227. for _, sample := range timeoutOps {
  228. operation := string(sample.Metric["operation_type"])
  229. if _, found := result[operation]; found {
  230. result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
  231. }
  232. }
  233. return result, nil
  234. }
  235. // HighLatencyKubeletOperations logs and counts the high latency metrics exported by the kubelet server via /metrics.
  236. func HighLatencyKubeletOperations(c *client.Client, threshold time.Duration, nodeName string) (KubeletLatencyMetrics, error) {
  237. ms, err := getKubeletMetrics(c, nodeName)
  238. if err != nil {
  239. return KubeletLatencyMetrics{}, err
  240. }
  241. latencyMetrics := GetKubeletLatencyMetrics(ms)
  242. sort.Sort(latencyMetrics)
  243. var badMetrics KubeletLatencyMetrics
  244. Logf("\nLatency metrics for node %v", nodeName)
  245. for _, m := range latencyMetrics {
  246. if m.Latency > threshold {
  247. badMetrics = append(badMetrics, m)
  248. Logf("%+v", m)
  249. }
  250. }
  251. return badMetrics, nil
  252. }
  253. // getContainerInfo contacts kubelet for the container information. The "Stats"
  254. // in the returned ContainerInfo is subject to the requirements in statsRequest.
  255. // TODO: This function uses the deprecated kubelet stats API; it should be
  256. // removed.
  257. func getContainerInfo(c *client.Client, nodeName string, req *kubeletstats.StatsRequest) (map[string]cadvisorapi.ContainerInfo, error) {
  258. reqBody, err := json.Marshal(req)
  259. if err != nil {
  260. return nil, err
  261. }
  262. subResourceProxyAvailable, err := ServerVersionGTE(subResourceServiceAndNodeProxyVersion, c)
  263. if err != nil {
  264. return nil, err
  265. }
  266. var data []byte
  267. if subResourceProxyAvailable {
  268. data, err = c.Post().
  269. Resource("nodes").
  270. SubResource("proxy").
  271. Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
  272. Suffix("stats/container").
  273. SetHeader("Content-Type", "application/json").
  274. Body(reqBody).
  275. Do().Raw()
  276. } else {
  277. data, err = c.Post().
  278. Prefix("proxy").
  279. Resource("nodes").
  280. Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
  281. Suffix("stats/container").
  282. SetHeader("Content-Type", "application/json").
  283. Body(reqBody).
  284. Do().Raw()
  285. }
  286. if err != nil {
  287. return nil, err
  288. }
  289. var containers map[string]cadvisorapi.ContainerInfo
  290. err = json.Unmarshal(data, &containers)
  291. if err != nil {
  292. return nil, err
  293. }
  294. return containers, nil
  295. }
  296. // getOneTimeResourceUsageOnNode queries the node's /stats/container endpoint
  297. // and returns the resource usage of all containerNames for the past
  298. // cpuInterval.
  299. // The acceptable range of the interval is 2s~120s. Be warned that as the
  300. // interval (and #containers) increases, the size of kubelet's response
  301. // could be significant. E.g., the 60s interval stats for ~20 containers is
  302. // ~1.5MB. Don't hammer the node with frequent, heavy requests.
  303. //
  304. // cadvisor records cumulative cpu usage in nanoseconds, so we need to have two
  305. // stats points to compute the cpu usage over the interval. Assuming cadvisor
  306. // polls every second, we'd need to get N stats points for N-second interval.
  307. // Note that this is an approximation and may not be accurate, hence we also
  308. // write the actual interval used for calculation (based on the timestamps of
  309. // the stats points in ContainerResourceUsage.CPUInterval.
  310. //
  311. // containerNames is a function returning a collection of container names in which
  312. // user is interested in. ExpectMissingContainers is a flag which says if the test
  313. // should fail if one of containers listed by containerNames is missing on any node
  314. // (useful e.g. when looking for system containers or daemons). If set to true function
  315. // is more forgiving and ignores missing containers.
  316. // TODO: This function relies on the deprecated kubelet stats API and should be
  317. // removed and/or rewritten.
  318. func getOneTimeResourceUsageOnNode(
  319. c *client.Client,
  320. nodeName string,
  321. cpuInterval time.Duration,
  322. containerNames func() []string,
  323. expectMissingContainers bool,
  324. ) (ResourceUsagePerContainer, error) {
  325. const (
  326. // cadvisor records stats about every second.
  327. cadvisorStatsPollingIntervalInSeconds float64 = 1.0
  328. // cadvisor caches up to 2 minutes of stats (configured by kubelet).
  329. maxNumStatsToRequest int = 120
  330. )
  331. numStats := int(float64(cpuInterval.Seconds()) / cadvisorStatsPollingIntervalInSeconds)
  332. if numStats < 2 || numStats > maxNumStatsToRequest {
  333. return nil, fmt.Errorf("numStats needs to be > 1 and < %d", maxNumStatsToRequest)
  334. }
  335. // Get information of all containers on the node.
  336. containerInfos, err := getContainerInfo(c, nodeName, &kubeletstats.StatsRequest{
  337. ContainerName: "/",
  338. NumStats: numStats,
  339. Subcontainers: true,
  340. })
  341. if err != nil {
  342. return nil, err
  343. }
  344. f := func(name string, oldStats, newStats *cadvisorapi.ContainerStats) *ContainerResourceUsage {
  345. return &ContainerResourceUsage{
  346. Name: name,
  347. Timestamp: newStats.Timestamp,
  348. CPUUsageInCores: float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
  349. MemoryUsageInBytes: newStats.Memory.Usage,
  350. MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
  351. MemoryRSSInBytes: newStats.Memory.RSS,
  352. CPUInterval: newStats.Timestamp.Sub(oldStats.Timestamp),
  353. }
  354. }
  355. // Process container infos that are relevant to us.
  356. containers := containerNames()
  357. usageMap := make(ResourceUsagePerContainer, len(containers))
  358. for _, name := range containers {
  359. info, ok := containerInfos[name]
  360. if !ok {
  361. if !expectMissingContainers {
  362. return nil, fmt.Errorf("missing info for container %q on node %q", name, nodeName)
  363. }
  364. continue
  365. }
  366. first := info.Stats[0]
  367. last := info.Stats[len(info.Stats)-1]
  368. usageMap[name] = f(name, first, last)
  369. }
  370. return usageMap, nil
  371. }
  372. func getNodeStatsSummary(c *client.Client, nodeName string) (*stats.Summary, error) {
  373. subResourceProxyAvailable, err := ServerVersionGTE(subResourceServiceAndNodeProxyVersion, c)
  374. if err != nil {
  375. return nil, err
  376. }
  377. var data []byte
  378. if subResourceProxyAvailable {
  379. data, err = c.Get().
  380. Resource("nodes").
  381. SubResource("proxy").
  382. Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
  383. Suffix("stats/summary").
  384. SetHeader("Content-Type", "application/json").
  385. Do().Raw()
  386. } else {
  387. data, err = c.Get().
  388. Prefix("proxy").
  389. Resource("nodes").
  390. Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
  391. Suffix("stats/summary").
  392. SetHeader("Content-Type", "application/json").
  393. Do().Raw()
  394. }
  395. if err != nil {
  396. return nil, err
  397. }
  398. var summary *stats.Summary
  399. err = json.Unmarshal(data, &summary)
  400. if err != nil {
  401. return nil, err
  402. }
  403. return summary, nil
  404. }
  405. func getSystemContainerStats(summary *stats.Summary) map[string]*stats.ContainerStats {
  406. statsList := summary.Node.SystemContainers
  407. statsMap := make(map[string]*stats.ContainerStats)
  408. for i := range statsList {
  409. statsMap[statsList[i].Name] = &statsList[i]
  410. }
  411. // Create a root container stats using information available in
  412. // stats.NodeStats. This is necessary since it is a different type.
  413. statsMap[rootContainerName] = &stats.ContainerStats{
  414. CPU: summary.Node.CPU,
  415. Memory: summary.Node.Memory,
  416. }
  417. return statsMap
  418. }
  419. const (
  420. rootContainerName = "/"
  421. )
  422. // A list of containers for which we want to collect resource usage.
  423. func TargetContainers() []string {
  424. return []string{
  425. rootContainerName,
  426. stats.SystemContainerRuntime,
  427. stats.SystemContainerKubelet,
  428. stats.SystemContainerMisc,
  429. }
  430. }
  431. type ContainerResourceUsage struct {
  432. Name string
  433. Timestamp time.Time
  434. CPUUsageInCores float64
  435. MemoryUsageInBytes uint64
  436. MemoryWorkingSetInBytes uint64
  437. MemoryRSSInBytes uint64
  438. // The interval used to calculate CPUUsageInCores.
  439. CPUInterval time.Duration
  440. }
  441. func (r *ContainerResourceUsage) isStrictlyGreaterThan(rhs *ContainerResourceUsage) bool {
  442. return r.CPUUsageInCores > rhs.CPUUsageInCores && r.MemoryWorkingSetInBytes > rhs.MemoryWorkingSetInBytes
  443. }
  444. type ResourceUsagePerContainer map[string]*ContainerResourceUsage
  445. type ResourceUsagePerNode map[string]ResourceUsagePerContainer
  446. func formatResourceUsageStats(nodeName string, containerStats ResourceUsagePerContainer) string {
  447. // Example output:
  448. //
  449. // Resource usage for node "e2e-test-foo-minion-abcde":
  450. // container cpu(cores) memory(MB)
  451. // "/" 0.363 2942.09
  452. // "/docker-daemon" 0.088 521.80
  453. // "/kubelet" 0.086 424.37
  454. // "/system" 0.007 119.88
  455. buf := &bytes.Buffer{}
  456. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  457. fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
  458. for name, s := range containerStats {
  459. fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
  460. }
  461. w.Flush()
  462. return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
  463. }
  464. type uint64arr []uint64
  465. func (a uint64arr) Len() int { return len(a) }
  466. func (a uint64arr) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  467. func (a uint64arr) Less(i, j int) bool { return a[i] < a[j] }
  468. type usageDataPerContainer struct {
  469. cpuData []float64
  470. memUseData []uint64
  471. memWorkSetData []uint64
  472. }
  473. func GetKubeletHeapStats(c *client.Client, nodeName string) (string, error) {
  474. client, err := NodeProxyRequest(c, nodeName, "debug/pprof/heap")
  475. if err != nil {
  476. return "", err
  477. }
  478. raw, errRaw := client.Raw()
  479. if errRaw != nil {
  480. return "", err
  481. }
  482. stats := string(raw)
  483. // Only dumping the runtime.MemStats numbers to avoid polluting the log.
  484. numLines := 23
  485. lines := strings.Split(stats, "\n")
  486. return strings.Join(lines[len(lines)-numLines:], "\n"), nil
  487. }
  488. func PrintAllKubeletPods(c *client.Client, nodeName string) {
  489. podList, err := GetKubeletPods(c, nodeName)
  490. if err != nil {
  491. Logf("Unable to retrieve kubelet pods for node %v: %v", nodeName, err)
  492. return
  493. }
  494. for _, p := range podList.Items {
  495. Logf("%v from %v started at %v (%d container statuses recorded)", p.Name, p.Namespace, p.Status.StartTime, len(p.Status.ContainerStatuses))
  496. for _, c := range p.Status.ContainerStatuses {
  497. Logf("\tContainer %v ready: %v, restart count %v",
  498. c.Name, c.Ready, c.RestartCount)
  499. }
  500. }
  501. }
  502. func computeContainerResourceUsage(name string, oldStats, newStats *stats.ContainerStats) *ContainerResourceUsage {
  503. return &ContainerResourceUsage{
  504. Name: name,
  505. Timestamp: newStats.CPU.Time.Time,
  506. CPUUsageInCores: float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()),
  507. MemoryUsageInBytes: *newStats.Memory.UsageBytes,
  508. MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes,
  509. MemoryRSSInBytes: *newStats.Memory.RSSBytes,
  510. CPUInterval: newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time),
  511. }
  512. }
  513. // resourceCollector periodically polls the node, collect stats for a given
  514. // list of containers, computes and cache resource usage up to
  515. // maxEntriesPerContainer for each container.
  516. type resourceCollector struct {
  517. lock sync.RWMutex
  518. node string
  519. containers []string
  520. client *client.Client
  521. buffers map[string][]*ContainerResourceUsage
  522. pollingInterval time.Duration
  523. stopCh chan struct{}
  524. }
  525. func newResourceCollector(c *client.Client, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
  526. buffers := make(map[string][]*ContainerResourceUsage)
  527. return &resourceCollector{
  528. node: nodeName,
  529. containers: containerNames,
  530. client: c,
  531. buffers: buffers,
  532. pollingInterval: pollingInterval,
  533. }
  534. }
  535. // Start starts a goroutine to Poll the node every pollingInterval.
  536. func (r *resourceCollector) Start() {
  537. r.stopCh = make(chan struct{}, 1)
  538. // Keep the last observed stats for comparison.
  539. oldStats := make(map[string]*stats.ContainerStats)
  540. go wait.Until(func() { r.collectStats(oldStats) }, r.pollingInterval, r.stopCh)
  541. }
  542. // Stop sends a signal to terminate the stats collecting goroutine.
  543. func (r *resourceCollector) Stop() {
  544. close(r.stopCh)
  545. }
  546. // collectStats gets the latest stats from kubelet stats summary API, computes
  547. // the resource usage, and pushes it to the buffer.
  548. func (r *resourceCollector) collectStats(oldStatsMap map[string]*stats.ContainerStats) {
  549. summary, err := getNodeStatsSummary(r.client, r.node)
  550. if err != nil {
  551. Logf("Error getting node stats summary on %q, err: %v", r.node, err)
  552. return
  553. }
  554. cStatsMap := getSystemContainerStats(summary)
  555. r.lock.Lock()
  556. defer r.lock.Unlock()
  557. for _, name := range r.containers {
  558. cStats, ok := cStatsMap[name]
  559. if !ok {
  560. Logf("Missing info/stats for container %q on node %q", name, r.node)
  561. return
  562. }
  563. if oldStats, ok := oldStatsMap[name]; ok {
  564. if oldStats.CPU.Time.Equal(cStats.CPU.Time) {
  565. // No change -> skip this stat.
  566. continue
  567. }
  568. r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats))
  569. }
  570. // Update the old stats.
  571. oldStatsMap[name] = cStats
  572. }
  573. }
  574. func (r *resourceCollector) GetLatest() (ResourceUsagePerContainer, error) {
  575. r.lock.RLock()
  576. defer r.lock.RUnlock()
  577. stats := make(ResourceUsagePerContainer)
  578. for _, name := range r.containers {
  579. contStats, ok := r.buffers[name]
  580. if !ok || len(contStats) == 0 {
  581. return nil, fmt.Errorf("Resource usage on node %q is not ready yet", r.node)
  582. }
  583. stats[name] = contStats[len(contStats)-1]
  584. }
  585. return stats, nil
  586. }
  587. // Reset frees the stats and start over.
  588. func (r *resourceCollector) Reset() {
  589. r.lock.Lock()
  590. defer r.lock.Unlock()
  591. for _, name := range r.containers {
  592. r.buffers[name] = []*ContainerResourceUsage{}
  593. }
  594. }
  595. type resourceUsageByCPU []*ContainerResourceUsage
  596. func (r resourceUsageByCPU) Len() int { return len(r) }
  597. func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
  598. func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
  599. // The percentiles to report.
  600. var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}
  601. // GetBasicCPUStats returns the percentiles the cpu usage in cores for
  602. // containerName. This method examines all data currently in the buffer.
  603. func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
  604. r.lock.RLock()
  605. defer r.lock.RUnlock()
  606. result := make(map[float64]float64, len(percentiles))
  607. usages := r.buffers[containerName]
  608. sort.Sort(resourceUsageByCPU(usages))
  609. for _, q := range percentiles {
  610. index := int(float64(len(usages))*q) - 1
  611. if index < 0 {
  612. // We don't have enough data.
  613. result[q] = 0
  614. continue
  615. }
  616. result[q] = usages[index].CPUUsageInCores
  617. }
  618. return result
  619. }
  620. // ResourceMonitor manages a resourceCollector per node.
  621. type ResourceMonitor struct {
  622. client *client.Client
  623. containers []string
  624. pollingInterval time.Duration
  625. collectors map[string]*resourceCollector
  626. }
  627. func NewResourceMonitor(c *client.Client, containerNames []string, pollingInterval time.Duration) *ResourceMonitor {
  628. return &ResourceMonitor{
  629. containers: containerNames,
  630. client: c,
  631. pollingInterval: pollingInterval,
  632. }
  633. }
  634. func (r *ResourceMonitor) Start() {
  635. // It should be OK to monitor unschedulable Nodes
  636. nodes, err := r.client.Nodes().List(api.ListOptions{})
  637. if err != nil {
  638. Failf("ResourceMonitor: unable to get list of nodes: %v", err)
  639. }
  640. r.collectors = make(map[string]*resourceCollector, 0)
  641. for _, node := range nodes.Items {
  642. collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval)
  643. r.collectors[node.Name] = collector
  644. collector.Start()
  645. }
  646. }
  647. func (r *ResourceMonitor) Stop() {
  648. for _, collector := range r.collectors {
  649. collector.Stop()
  650. }
  651. }
  652. func (r *ResourceMonitor) Reset() {
  653. for _, collector := range r.collectors {
  654. collector.Reset()
  655. }
  656. }
  657. func (r *ResourceMonitor) LogLatest() {
  658. summary, err := r.GetLatest()
  659. if err != nil {
  660. Logf("%v", err)
  661. }
  662. Logf("%s", r.FormatResourceUsage(summary))
  663. }
  664. func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string {
  665. summary := []string{}
  666. for node, usage := range s {
  667. summary = append(summary, formatResourceUsageStats(node, usage))
  668. }
  669. return strings.Join(summary, "\n")
  670. }
  671. func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) {
  672. result := make(ResourceUsagePerNode)
  673. errs := []error{}
  674. for key, collector := range r.collectors {
  675. s, err := collector.GetLatest()
  676. if err != nil {
  677. errs = append(errs, err)
  678. continue
  679. }
  680. result[key] = s
  681. }
  682. return result, utilerrors.NewAggregate(errs)
  683. }
  684. func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode ResourceUsagePerNode) ResourceUsagePerNode {
  685. result := make(ResourceUsagePerNode)
  686. var masterUsage ResourceUsagePerContainer
  687. var nodesUsage []ResourceUsagePerContainer
  688. for node, usage := range usagePerNode {
  689. if strings.HasSuffix(node, "master") {
  690. masterUsage = usage
  691. } else {
  692. nodesUsage = append(nodesUsage, usage)
  693. }
  694. }
  695. nodeAvgUsage := make(ResourceUsagePerContainer)
  696. for _, nodeUsage := range nodesUsage {
  697. for c, usage := range nodeUsage {
  698. if _, found := nodeAvgUsage[c]; !found {
  699. nodeAvgUsage[c] = &ContainerResourceUsage{Name: usage.Name}
  700. }
  701. nodeAvgUsage[c].CPUUsageInCores += usage.CPUUsageInCores
  702. nodeAvgUsage[c].MemoryUsageInBytes += usage.MemoryUsageInBytes
  703. nodeAvgUsage[c].MemoryWorkingSetInBytes += usage.MemoryWorkingSetInBytes
  704. nodeAvgUsage[c].MemoryRSSInBytes += usage.MemoryRSSInBytes
  705. }
  706. }
  707. for c := range nodeAvgUsage {
  708. nodeAvgUsage[c].CPUUsageInCores /= float64(len(nodesUsage))
  709. nodeAvgUsage[c].MemoryUsageInBytes /= uint64(len(nodesUsage))
  710. nodeAvgUsage[c].MemoryWorkingSetInBytes /= uint64(len(nodesUsage))
  711. nodeAvgUsage[c].MemoryRSSInBytes /= uint64(len(nodesUsage))
  712. }
  713. result["master"] = masterUsage
  714. result["node"] = nodeAvgUsage
  715. return result
  716. }
  717. // ContainersCPUSummary is indexed by the container name with each entry a
  718. // (percentile, value) map.
  719. type ContainersCPUSummary map[string]map[float64]float64
  720. // NodesCPUSummary is indexed by the node name with each entry a
  721. // ContainersCPUSummary map.
  722. type NodesCPUSummary map[string]ContainersCPUSummary
  723. func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
  724. // Example output for a node (the percentiles may differ):
  725. // CPU usage of containers on node "e2e-test-foo-minion-0vj7":
  726. // container 5th% 50th% 90th% 95th%
  727. // "/" 0.051 0.159 0.387 0.455
  728. // "/runtime 0.000 0.000 0.146 0.166
  729. // "/kubelet" 0.036 0.053 0.091 0.154
  730. // "/misc" 0.001 0.001 0.001 0.002
  731. var summaryStrings []string
  732. var header []string
  733. header = append(header, "container")
  734. for _, p := range percentiles {
  735. header = append(header, fmt.Sprintf("%.0fth%%", p*100))
  736. }
  737. for nodeName, containers := range summary {
  738. buf := &bytes.Buffer{}
  739. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  740. fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
  741. for _, containerName := range TargetContainers() {
  742. var s []string
  743. s = append(s, fmt.Sprintf("%q", containerName))
  744. data, ok := containers[containerName]
  745. for _, p := range percentiles {
  746. value := "N/A"
  747. if ok {
  748. value = fmt.Sprintf("%.3f", data[p])
  749. }
  750. s = append(s, value)
  751. }
  752. fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
  753. }
  754. w.Flush()
  755. summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers on node %q\n:%s", nodeName, buf.String()))
  756. }
  757. return strings.Join(summaryStrings, "\n")
  758. }
  759. func (r *ResourceMonitor) LogCPUSummary() {
  760. summary := r.GetCPUSummary()
  761. Logf("%s", r.FormatCPUSummary(summary))
  762. }
  763. func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
  764. result := make(NodesCPUSummary)
  765. for nodeName, collector := range r.collectors {
  766. result[nodeName] = make(ContainersCPUSummary)
  767. for _, containerName := range TargetContainers() {
  768. data := collector.GetBasicCPUStats(containerName)
  769. result[nodeName][containerName] = data
  770. }
  771. }
  772. return result
  773. }
  774. func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode NodesCPUSummary) NodesCPUSummary {
  775. result := make(NodesCPUSummary)
  776. var masterSummary ContainersCPUSummary
  777. var nodesSummaries []ContainersCPUSummary
  778. for node, summary := range summaryPerNode {
  779. if strings.HasSuffix(node, "master") {
  780. masterSummary = summary
  781. } else {
  782. nodesSummaries = append(nodesSummaries, summary)
  783. }
  784. }
  785. nodeAvgSummary := make(ContainersCPUSummary)
  786. for _, nodeSummary := range nodesSummaries {
  787. for c, summary := range nodeSummary {
  788. if _, found := nodeAvgSummary[c]; !found {
  789. nodeAvgSummary[c] = map[float64]float64{}
  790. }
  791. for perc, value := range summary {
  792. nodeAvgSummary[c][perc] += value
  793. }
  794. }
  795. }
  796. for c := range nodeAvgSummary {
  797. for perc := range nodeAvgSummary[c] {
  798. nodeAvgSummary[c][perc] /= float64(len(nodesSummaries))
  799. }
  800. }
  801. result["master"] = masterSummary
  802. result["node"] = nodeAvgSummary
  803. return result
  804. }