resource_collector.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. // +build linux
  2. /*
  3. Copyright 2015 The Kubernetes Authors.
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package e2e_node
  15. import (
  16. "bytes"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "os"
  21. "os/exec"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "sync"
  26. "text/tabwriter"
  27. "time"
  28. cadvisorclient "github.com/google/cadvisor/client/v2"
  29. cadvisorapiv2 "github.com/google/cadvisor/info/v2"
  30. "github.com/opencontainers/runc/libcontainer/cgroups"
  31. "k8s.io/kubernetes/pkg/api"
  32. "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
  33. "k8s.io/kubernetes/pkg/labels"
  34. "k8s.io/kubernetes/pkg/util/runtime"
  35. "k8s.io/kubernetes/pkg/util/uuid"
  36. "k8s.io/kubernetes/pkg/util/wait"
  37. "k8s.io/kubernetes/test/e2e/framework"
  38. . "github.com/onsi/gomega"
  39. )
  // Settings for the standalone cAdvisor pod used for resource monitoring.
  const (
  	// cadvisorImageName is the container image run by the cAdvisor pod.
  	cadvisorImageName = "google/cadvisor:latest"
  	// cadvisorPodName names both the pod and its single container.
  	cadvisorPodName = "cadvisor"
  	// cadvisorPort is the host port cAdvisor is told to listen on and the
  	// port the collector connects to on localhost.
  	cadvisorPort = 8090
  	// houseKeepingInterval is the cAdvisor housekeeping interval, in seconds.
  	houseKeepingInterval = 1
  )
  // systemContainers maps a system container key (kubelet, runtime) to the
  // cgroup container name detected for that process. It is nil until
  // ResourceCollector.Start populates it.
  var systemContainers map[string]string
  51. type ResourceCollector struct {
  52. client *cadvisorclient.Client
  53. request *cadvisorapiv2.RequestOptions
  54. pollingInterval time.Duration
  55. buffers map[string][]*framework.ContainerResourceUsage
  56. lock sync.RWMutex
  57. stopCh chan struct{}
  58. }
  59. // NewResourceCollector creates a resource collector object which collects
  60. // resource usage periodically from Cadvisor
  61. func NewResourceCollector(interval time.Duration) *ResourceCollector {
  62. buffers := make(map[string][]*framework.ContainerResourceUsage)
  63. return &ResourceCollector{
  64. pollingInterval: interval,
  65. buffers: buffers,
  66. }
  67. }
  68. // Start starts resource collector and connects to the standalone Cadvisor pod
  69. // then repeatedly runs collectStats.
  70. func (r *ResourceCollector) Start() {
  71. // Get the cgroup container names for kubelet and docker
  72. kubeletContainer, err := getContainerNameForProcess(kubeletProcessName, "")
  73. dockerContainer, err := getContainerNameForProcess(dockerProcessName, dockerPidFile)
  74. if err == nil {
  75. systemContainers = map[string]string{
  76. stats.SystemContainerKubelet: kubeletContainer,
  77. stats.SystemContainerRuntime: dockerContainer,
  78. }
  79. } else {
  80. framework.Failf("Failed to get docker container name in test-e2e-node resource collector.")
  81. }
  82. wait.Poll(1*time.Second, 1*time.Minute, func() (bool, error) {
  83. var err error
  84. r.client, err = cadvisorclient.NewClient(fmt.Sprintf("http://localhost:%d/", cadvisorPort))
  85. if err == nil {
  86. return true, nil
  87. }
  88. return false, err
  89. })
  90. Expect(r.client).NotTo(BeNil(), "cadvisor client not ready")
  91. r.request = &cadvisorapiv2.RequestOptions{IdType: "name", Count: 1, Recursive: false}
  92. r.stopCh = make(chan struct{})
  93. oldStatsMap := make(map[string]*cadvisorapiv2.ContainerStats)
  94. go wait.Until(func() { r.collectStats(oldStatsMap) }, r.pollingInterval, r.stopCh)
  95. }
  96. // Stop stops resource collector collecting stats. It does not clear the buffer
  97. func (r *ResourceCollector) Stop() {
  98. close(r.stopCh)
  99. }
  100. // Reset clears the stats buffer of resource collector.
  101. func (r *ResourceCollector) Reset() {
  102. r.lock.Lock()
  103. defer r.lock.Unlock()
  104. for _, name := range systemContainers {
  105. r.buffers[name] = []*framework.ContainerResourceUsage{}
  106. }
  107. }
  108. // GetCPUSummary gets CPU usage in percentile.
  109. func (r *ResourceCollector) GetCPUSummary() framework.ContainersCPUSummary {
  110. result := make(framework.ContainersCPUSummary)
  111. for key, name := range systemContainers {
  112. data := r.GetBasicCPUStats(name)
  113. result[key] = data
  114. }
  115. return result
  116. }
  117. // LogLatest logs the latest resource usage.
  118. func (r *ResourceCollector) LogLatest() {
  119. summary, err := r.GetLatest()
  120. if err != nil {
  121. framework.Logf("%v", err)
  122. }
  123. framework.Logf("%s", formatResourceUsageStats(summary))
  124. }
  125. // collectStats collects resource usage from Cadvisor.
  126. func (r *ResourceCollector) collectStats(oldStatsMap map[string]*cadvisorapiv2.ContainerStats) {
  127. for _, name := range systemContainers {
  128. ret, err := r.client.Stats(name, r.request)
  129. if err != nil {
  130. framework.Logf("Error getting container stats, err: %v", err)
  131. return
  132. }
  133. cStats, ok := ret[name]
  134. if !ok {
  135. framework.Logf("Missing info/stats for container %q", name)
  136. return
  137. }
  138. newStats := cStats.Stats[0]
  139. if oldStats, ok := oldStatsMap[name]; ok && oldStats.Timestamp.Before(newStats.Timestamp) {
  140. if oldStats.Timestamp.Equal(newStats.Timestamp) {
  141. continue
  142. }
  143. r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, newStats))
  144. }
  145. oldStatsMap[name] = newStats
  146. }
  147. }
  148. // computeContainerResourceUsage computes resource usage based on new data sample.
  149. func computeContainerResourceUsage(name string, oldStats, newStats *cadvisorapiv2.ContainerStats) *framework.ContainerResourceUsage {
  150. return &framework.ContainerResourceUsage{
  151. Name: name,
  152. Timestamp: newStats.Timestamp,
  153. CPUUsageInCores: float64(newStats.Cpu.Usage.Total-oldStats.Cpu.Usage.Total) / float64(newStats.Timestamp.Sub(oldStats.Timestamp).Nanoseconds()),
  154. MemoryUsageInBytes: newStats.Memory.Usage,
  155. MemoryWorkingSetInBytes: newStats.Memory.WorkingSet,
  156. MemoryRSSInBytes: newStats.Memory.RSS,
  157. CPUInterval: newStats.Timestamp.Sub(oldStats.Timestamp),
  158. }
  159. }
  160. // GetLatest gets the latest resource usage from stats buffer.
  161. func (r *ResourceCollector) GetLatest() (framework.ResourceUsagePerContainer, error) {
  162. r.lock.RLock()
  163. defer r.lock.RUnlock()
  164. stats := make(framework.ResourceUsagePerContainer)
  165. for key, name := range systemContainers {
  166. contStats, ok := r.buffers[name]
  167. if !ok || len(contStats) == 0 {
  168. return nil, fmt.Errorf("Resource usage of %s:%s is not ready yet", key, name)
  169. }
  170. stats[key] = contStats[len(contStats)-1]
  171. }
  172. return stats, nil
  173. }
  174. type resourceUsageByCPU []*framework.ContainerResourceUsage
  175. func (r resourceUsageByCPU) Len() int { return len(r) }
  176. func (r resourceUsageByCPU) Swap(i, j int) { r[i], r[j] = r[j], r[i] }
  177. func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }
  // percentiles lists the quantiles (as fractions of 1) reported in CPU
  // summaries: the median, the 90th, 95th and 99th percentiles and the maximum.
  var percentiles = [5]float64{0.50, 0.90, 0.95, 0.99, 1.00}
  180. // GetBasicCPUStats returns the percentiles the cpu usage in cores for
  181. // containerName. This method examines all data currently in the buffer.
  182. func (r *ResourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
  183. r.lock.RLock()
  184. defer r.lock.RUnlock()
  185. result := make(map[float64]float64, len(percentiles))
  186. // We must make a copy of array, otherwise the timeseries order is changed.
  187. usages := make([]*framework.ContainerResourceUsage, 0)
  188. for _, usage := range r.buffers[containerName] {
  189. usages = append(usages, usage)
  190. }
  191. sort.Sort(resourceUsageByCPU(usages))
  192. for _, q := range percentiles {
  193. index := int(float64(len(usages))*q) - 1
  194. if index < 0 {
  195. // We don't have enough data.
  196. result[q] = 0
  197. continue
  198. }
  199. result[q] = usages[index].CPUUsageInCores
  200. }
  201. return result
  202. }
  203. func formatResourceUsageStats(containerStats framework.ResourceUsagePerContainer) string {
  204. // Example output:
  205. //
  206. // Resource usage for node "e2e-test-foo-minion-abcde":
  207. // container cpu(cores) memory(MB)
  208. // "/" 0.363 2942.09
  209. // "/docker-daemon" 0.088 521.80
  210. // "/kubelet" 0.086 424.37
  211. // "/system" 0.007 119.88
  212. buf := &bytes.Buffer{}
  213. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  214. fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
  215. for name, s := range containerStats {
  216. fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
  217. }
  218. w.Flush()
  219. return fmt.Sprintf("Resource usage:\n%s", buf.String())
  220. }
  221. func formatCPUSummary(summary framework.ContainersCPUSummary) string {
  222. // Example output for a node (the percentiles may differ):
  223. // CPU usage of containers on node "e2e-test-foo-minion-0vj7":
  224. // container 5th% 50th% 90th% 95th%
  225. // "/" 0.051 0.159 0.387 0.455
  226. // "/runtime 0.000 0.000 0.146 0.166
  227. // "/kubelet" 0.036 0.053 0.091 0.154
  228. // "/misc" 0.001 0.001 0.001 0.002
  229. var summaryStrings []string
  230. var header []string
  231. header = append(header, "container")
  232. for _, p := range percentiles {
  233. header = append(header, fmt.Sprintf("%.0fth%%", p*100))
  234. }
  235. buf := &bytes.Buffer{}
  236. w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
  237. fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
  238. for _, containerName := range framework.TargetContainers() {
  239. var s []string
  240. s = append(s, fmt.Sprintf("%q", containerName))
  241. data, ok := summary[containerName]
  242. for _, p := range percentiles {
  243. value := "N/A"
  244. if ok {
  245. value = fmt.Sprintf("%.3f", data[p])
  246. }
  247. s = append(s, value)
  248. }
  249. fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
  250. }
  251. w.Flush()
  252. summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers:\n%s", buf.String()))
  253. return strings.Join(summaryStrings, "\n")
  254. }
  255. // createCadvisorPod creates a standalone cadvisor pod for fine-grain resource monitoring.
  256. func getCadvisorPod() *api.Pod {
  257. return &api.Pod{
  258. ObjectMeta: api.ObjectMeta{
  259. Name: cadvisorPodName,
  260. },
  261. Spec: api.PodSpec{
  262. // It uses a host port for the tests to collect data.
  263. // Currently we can not use port mapping in test-e2e-node.
  264. SecurityContext: &api.PodSecurityContext{
  265. HostNetwork: true,
  266. },
  267. Containers: []api.Container{
  268. {
  269. Image: cadvisorImageName,
  270. Name: cadvisorPodName,
  271. Ports: []api.ContainerPort{
  272. {
  273. Name: "http",
  274. HostPort: cadvisorPort,
  275. ContainerPort: cadvisorPort,
  276. Protocol: api.ProtocolTCP,
  277. },
  278. },
  279. VolumeMounts: []api.VolumeMount{
  280. {
  281. Name: "sys",
  282. ReadOnly: true,
  283. MountPath: "/sys",
  284. },
  285. {
  286. Name: "var-run",
  287. ReadOnly: false,
  288. MountPath: "/var/run",
  289. },
  290. {
  291. Name: "docker",
  292. ReadOnly: true,
  293. MountPath: "/var/lib/docker/",
  294. },
  295. {
  296. Name: "rootfs",
  297. ReadOnly: true,
  298. MountPath: "/rootfs",
  299. },
  300. },
  301. Args: []string{
  302. "--profiling",
  303. fmt.Sprintf("--housekeeping_interval=%ds", houseKeepingInterval),
  304. fmt.Sprintf("--port=%d", cadvisorPort),
  305. },
  306. },
  307. },
  308. Volumes: []api.Volume{
  309. {
  310. Name: "rootfs",
  311. VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/"}},
  312. },
  313. {
  314. Name: "var-run",
  315. VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/var/run"}},
  316. },
  317. {
  318. Name: "sys",
  319. VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/sys"}},
  320. },
  321. {
  322. Name: "docker",
  323. VolumeSource: api.VolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/var/lib/docker"}},
  324. },
  325. },
  326. },
  327. }
  328. }
  329. // deletePodsSync deletes a list of pods and block until pods disappear.
  330. func deletePodsSync(f *framework.Framework, pods []*api.Pod) {
  331. var wg sync.WaitGroup
  332. for _, pod := range pods {
  333. wg.Add(1)
  334. go func(pod *api.Pod) {
  335. defer wg.Done()
  336. err := f.PodClient().Delete(pod.ObjectMeta.Name, api.NewDeleteOptions(30))
  337. Expect(err).NotTo(HaveOccurred())
  338. Expect(framework.WaitForPodToDisappear(f.Client, f.Namespace.Name, pod.ObjectMeta.Name, labels.Everything(),
  339. 30*time.Second, 10*time.Minute)).NotTo(HaveOccurred())
  340. }(pod)
  341. }
  342. wg.Wait()
  343. return
  344. }
  345. // newTestPods creates a list of pods (specification) for test.
  346. func newTestPods(numPods int, imageName, podType string) []*api.Pod {
  347. var pods []*api.Pod
  348. for i := 0; i < numPods; i++ {
  349. podName := "test-" + string(uuid.NewUUID())
  350. labels := map[string]string{
  351. "type": podType,
  352. "name": podName,
  353. }
  354. pods = append(pods,
  355. &api.Pod{
  356. ObjectMeta: api.ObjectMeta{
  357. Name: podName,
  358. Labels: labels,
  359. },
  360. Spec: api.PodSpec{
  361. // Restart policy is always (default).
  362. Containers: []api.Container{
  363. {
  364. Image: imageName,
  365. Name: podName,
  366. },
  367. },
  368. },
  369. })
  370. }
  371. return pods
  372. }
  // ResourceSeries is the time series of resource usage of one container, held
  // as parallel slices: index i of every slice belongs to the same sample.
  type ResourceSeries struct {
  	// Timestamp holds the sample times (GetResourceTimeSeries stores Unix
  	// nanoseconds).
  	Timestamp []int64 `json:"ts"`
  	// CPUUsageInMilliCores holds the CPU usage of each sample in millicores.
  	CPUUsageInMilliCores []int64 `json:"cpu"`
  	// MemoryRSSInMegaBytes holds the memory figure of each sample in MB.
  	MemoryRSSInMegaBytes []int64 `json:"memory"`
  	// Units names the unit of each series, keyed by its JSON field name.
  	Units map[string]string `json:"unit"`
  }
  380. // GetResourceSeriesWithLabels gets the time series of resource usage of each container.
  381. func (r *ResourceCollector) GetResourceTimeSeries() map[string]*ResourceSeries {
  382. resourceSeries := make(map[string]*ResourceSeries)
  383. for key, name := range systemContainers {
  384. newSeries := &ResourceSeries{Units: map[string]string{
  385. "cpu": "mCPU",
  386. "memory": "MB",
  387. }}
  388. resourceSeries[key] = newSeries
  389. for _, usage := range r.buffers[name] {
  390. newSeries.Timestamp = append(newSeries.Timestamp, usage.Timestamp.UnixNano())
  391. newSeries.CPUUsageInMilliCores = append(newSeries.CPUUsageInMilliCores, int64(usage.CPUUsageInCores*1000))
  392. newSeries.MemoryRSSInMegaBytes = append(newSeries.MemoryRSSInMegaBytes, int64(float64(usage.MemoryUsageInBytes)/(1024*1024)))
  393. }
  394. }
  395. return resourceSeries
  396. }
  // The code below, which finds the container (cgroup) name of a process, is
  // copied from pkg/kubelet/cm/container_manager_linux.go because it is not
  // exported there.

  // Process names and pid files used to locate the system processes.
  const (
  	// kubeletProcessName is the process name looked up for the kubelet.
  	kubeletProcessName = "kubelet"
  	// dockerProcessName is the process name looked up for the docker daemon.
  	dockerProcessName = "docker"
  	// dockerPidFile is tried first when looking up the docker daemon's pid.
  	dockerPidFile = "/var/run/docker.pid"
  	// containerdProcessName is the process name of docker's containerd.
  	containerdProcessName = "docker-containerd"
  	// containerdPidFile is the pid file path of docker's containerd.
  	containerdPidFile = "/run/docker/libcontainerd/docker-containerd.pid"
  )
  406. func getContainerNameForProcess(name, pidFile string) (string, error) {
  407. pids, err := getPidsForProcess(name, pidFile)
  408. if err != nil {
  409. return "", fmt.Errorf("failed to detect process id for %q - %v", name, err)
  410. }
  411. if len(pids) == 0 {
  412. return "", nil
  413. }
  414. cont, err := getContainer(pids[0])
  415. if err != nil {
  416. return "", err
  417. }
  418. return cont, nil
  419. }
  // getPidFromPidFile reads pidFile and returns the process id stored in it.
  // Whitespace around the number, such as the trailing newline many daemons
  // write, is ignored. It returns an error if the file cannot be opened or
  // read, or if its content is not a decimal integer.
  func getPidFromPidFile(pidFile string) (int, error) {
  	file, err := os.Open(pidFile)
  	if err != nil {
  		return 0, fmt.Errorf("error opening pid file %s: %v", pidFile, err)
  	}
  	defer file.Close()

  	data, err := ioutil.ReadAll(file)
  	if err != nil {
  		return 0, fmt.Errorf("error reading pid file %s: %v", pidFile, err)
  	}

  	// strconv.Atoi rejects any surrounding whitespace, so strip it first.
  	text := strings.TrimSpace(string(data))
  	pid, err := strconv.Atoi(text)
  	if err != nil {
  		return 0, fmt.Errorf("error parsing %s as a number: %v", text, err)
  	}
  	return pid, nil
  }
  436. func getPidsForProcess(name, pidFile string) ([]int, error) {
  437. if len(pidFile) > 0 {
  438. if pid, err := getPidFromPidFile(pidFile); err == nil {
  439. return []int{pid}, nil
  440. } else {
  441. // log the error and fall back to pidof
  442. runtime.HandleError(err)
  443. }
  444. }
  445. out, err := exec.Command("pidof", name).Output()
  446. if err != nil {
  447. return []int{}, fmt.Errorf("failed to find pid of %q: %v", name, err)
  448. }
  449. // The output of pidof is a list of pids.
  450. pids := []int{}
  451. for _, pidStr := range strings.Split(strings.TrimSpace(string(out)), " ") {
  452. pid, err := strconv.Atoi(pidStr)
  453. if err != nil {
  454. continue
  455. }
  456. pids = append(pids, pid)
  457. }
  458. return pids, nil
  459. }
  460. // getContainer returns the cgroup associated with the specified pid.
  461. // It enforces a unified hierarchy for memory and cpu cgroups.
  462. // On systemd environments, it uses the name=systemd cgroup for the specified pid.
  463. func getContainer(pid int) (string, error) {
  464. cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
  465. if err != nil {
  466. return "", err
  467. }
  468. cpu, found := cgs["cpu"]
  469. if !found {
  470. return "", cgroups.NewNotFoundError("cpu")
  471. }
  472. memory, found := cgs["memory"]
  473. if !found {
  474. return "", cgroups.NewNotFoundError("memory")
  475. }
  476. // since we use this container for accounting, we need to ensure its a unified hierarchy.
  477. if cpu != memory {
  478. return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory)
  479. }
  480. // on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
  481. // cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
  482. // users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
  483. // users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
  484. // we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
  485. // for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
  486. // cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
  487. // as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
  488. // in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
  489. if systemd, found := cgs["name=systemd"]; found {
  490. if systemd != cpu {
  491. log.Printf("CPUAccounting not enabled for pid: %d", pid)
  492. }
  493. if systemd != memory {
  494. log.Printf("MemoryAccounting not enabled for pid: %d", pid)
  495. }
  496. return systemd, nil
  497. }
  498. return cpu, nil
  499. }