123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package metrics
- import (
- "encoding/json"
- "fmt"
- "strings"
- "time"
- "github.com/golang/glog"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/v1"
- clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
- "k8s.io/kubernetes/pkg/labels"
- "k8s.io/kubernetes/pkg/util/sets"
- heapster "k8s.io/heapster/metrics/api/v1/types"
- metrics_api "k8s.io/heapster/metrics/apis/metrics/v1alpha1"
- )
- const (
- DefaultHeapsterNamespace = "kube-system"
- DefaultHeapsterScheme = "http"
- DefaultHeapsterService = "heapster"
- DefaultHeapsterPort = "" // use the first exposed port on the service
- )
- var heapsterQueryStart = -5 * time.Minute
- // MetricsClient is an interface for getting metrics for pods.
- type MetricsClient interface {
- // GetCPUUtilization returns the average utilization over all pods represented as a percent of requested CPU
- // (e.g. 70 means that an average pod uses 70% of the requested CPU)
- // and the time of generation of the oldest of utilization reports for pods.
- GetCPUUtilization(namespace string, selector labels.Selector) (*int, time.Time, error)
- // GetCustomMetric returns the average value of the given custom metrics from the
- // pods picked using the namespace and selector passed as arguments.
- GetCustomMetric(customMetricName string, namespace string, selector labels.Selector) (*float64, time.Time, error)
- }
- type intAndFloat struct {
- intValue int64
- floatValue float64
- }
- // Aggregates results into ResourceConsumption. Also returns number of pods included in the aggregation.
- type metricAggregator func(heapster.MetricResultList) (intAndFloat, int, time.Time)
- type metricDefinition struct {
- name string
- aggregator metricAggregator
- }
- // HeapsterMetricsClient is Heapster-based implementation of MetricsClient
- type HeapsterMetricsClient struct {
- client clientset.Interface
- heapsterNamespace string
- heapsterScheme string
- heapsterService string
- heapsterPort string
- }
- var averageFunction = func(metrics heapster.MetricResultList) (intAndFloat, int, time.Time) {
- sum, count, timestamp := calculateSumFromTimeSample(metrics, time.Minute)
- result := intAndFloat{0, 0}
- if count > 0 {
- result.intValue = sum.intValue / int64(count)
- result.floatValue = sum.floatValue / float64(count)
- }
- return result, count, timestamp
- }
- func getHeapsterCustomMetricDefinition(metricName string) metricDefinition {
- return metricDefinition{"custom/" + metricName, averageFunction}
- }
- // NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface.
- func NewHeapsterMetricsClient(client clientset.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient {
- return &HeapsterMetricsClient{
- client: client,
- heapsterNamespace: namespace,
- heapsterScheme: scheme,
- heapsterService: service,
- heapsterPort: port,
- }
- }
- func (h *HeapsterMetricsClient) GetCPUUtilization(namespace string, selector labels.Selector) (*int, time.Time, error) {
- avgConsumption, avgRequest, timestamp, err := h.GetCpuConsumptionAndRequestInMillis(namespace, selector)
- if err != nil {
- return nil, time.Time{}, fmt.Errorf("failed to get CPU consumption and request: %v", err)
- }
- utilization := int((avgConsumption * 100) / avgRequest)
- return &utilization, timestamp, nil
- }
- func (h *HeapsterMetricsClient) GetCpuConsumptionAndRequestInMillis(namespace string, selector labels.Selector) (avgConsumption int64,
- avgRequest int64, timestamp time.Time, err error) {
- podList, err := h.client.Core().Pods(namespace).
- List(api.ListOptions{LabelSelector: selector})
- if err != nil {
- return 0, 0, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
- }
- podNames := map[string]struct{}{}
- requestSum := int64(0)
- missing := false
- for _, pod := range podList.Items {
- if pod.Status.Phase == api.PodPending {
- // Skip pending pods.
- continue
- }
- podNames[pod.Name] = struct{}{}
- for _, container := range pod.Spec.Containers {
- if containerRequest, ok := container.Resources.Requests[api.ResourceCPU]; ok {
- requestSum += containerRequest.MilliValue()
- } else {
- missing = true
- }
- }
- }
- if len(podNames) == 0 && len(podList.Items) > 0 {
- return 0, 0, time.Time{}, fmt.Errorf("no running pods")
- }
- if missing || requestSum == 0 {
- return 0, 0, time.Time{}, fmt.Errorf("some pods do not have request for cpu")
- }
- glog.V(4).Infof("%s %s - sum of CPU requested: %d", namespace, selector, requestSum)
- requestAvg := requestSum / int64(len(podList.Items))
- // Consumption is already averaged and in millis.
- consumption, timestamp, err := h.getCpuUtilizationForPods(namespace, selector, podNames)
- if err != nil {
- return 0, 0, time.Time{}, err
- }
- return consumption, requestAvg, timestamp, nil
- }
- func (h *HeapsterMetricsClient) getCpuUtilizationForPods(namespace string, selector labels.Selector, podNames map[string]struct{}) (int64, time.Time, error) {
- metricPath := fmt.Sprintf("/apis/metrics/v1alpha1/namespaces/%s/pods", namespace)
- params := map[string]string{"labelSelector": selector.String()}
- resultRaw, err := h.client.Core().Services(h.heapsterNamespace).
- ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, params).
- DoRaw()
- if err != nil {
- return 0, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
- }
- glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw))
- metrics := metrics_api.PodMetricsList{}
- err = json.Unmarshal(resultRaw, &metrics)
- if err != nil {
- return 0, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
- }
- if len(metrics.Items) != len(podNames) {
- present := sets.NewString()
- for _, m := range metrics.Items {
- present.Insert(m.Name)
- }
- missing := make([]string, 0)
- for expected := range podNames {
- if !present.Has(expected) {
- missing = append(missing, expected)
- }
- }
- hint := ""
- if len(missing) > 0 {
- hint = fmt.Sprintf(" (sample missing pod: %s/%s)", namespace, missing[0])
- }
- return 0, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods%s", len(metrics.Items), len(podNames), hint)
- }
- sum := int64(0)
- for _, m := range metrics.Items {
- if _, found := podNames[m.Name]; found {
- for _, c := range m.Containers {
- cpu, found := c.Usage[v1.ResourceCPU]
- if !found {
- return 0, time.Time{}, fmt.Errorf("no cpu for container %v in pod %v/%v", c.Name, namespace, m.Name)
- }
- sum += cpu.MilliValue()
- }
- } else {
- return 0, time.Time{}, fmt.Errorf("not expected metrics for pod %v/%v", namespace, m.Name)
- }
- }
- return sum / int64(len(metrics.Items)), metrics.Items[0].Timestamp.Time, nil
- }
- // GetCustomMetric returns the average value of the given custom metric from the
- // pods picked using the namespace and selector passed as arguments.
- func (h *HeapsterMetricsClient) GetCustomMetric(customMetricName string, namespace string, selector labels.Selector) (*float64, time.Time, error) {
- metricSpec := getHeapsterCustomMetricDefinition(customMetricName)
- podList, err := h.client.Core().Pods(namespace).List(api.ListOptions{LabelSelector: selector})
- if err != nil {
- return nil, time.Time{}, fmt.Errorf("failed to get pod list: %v", err)
- }
- podNames := []string{}
- for _, pod := range podList.Items {
- if pod.Status.Phase == api.PodPending {
- // Skip pending pods.
- continue
- }
- podNames = append(podNames, pod.Name)
- }
- if len(podNames) == 0 && len(podList.Items) > 0 {
- return nil, time.Time{}, fmt.Errorf("no running pods")
- }
- value, timestamp, err := h.getCustomMetricForPods(metricSpec, namespace, podNames)
- if err != nil {
- return nil, time.Time{}, err
- }
- return &value.floatValue, timestamp, nil
- }
- func (h *HeapsterMetricsClient) getCustomMetricForPods(metricSpec metricDefinition, namespace string, podNames []string) (*intAndFloat, time.Time, error) {
- now := time.Now()
- startTime := now.Add(heapsterQueryStart)
- metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
- namespace,
- strings.Join(podNames, ","),
- metricSpec.name)
- resultRaw, err := h.client.Core().Services(h.heapsterNamespace).
- ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
- DoRaw()
- if err != nil {
- return nil, time.Time{}, fmt.Errorf("failed to get pods metrics: %v", err)
- }
- var metrics heapster.MetricResultList
- err = json.Unmarshal(resultRaw, &metrics)
- if err != nil {
- return nil, time.Time{}, fmt.Errorf("failed to unmarshall heapster response: %v", err)
- }
- glog.V(4).Infof("Heapster metrics result: %s", string(resultRaw))
- sum, count, timestamp := metricSpec.aggregator(metrics)
- if count != len(podNames) {
- missing := make([]string, 0)
- for i, expected := range podNames {
- if len(metrics.Items) > i && len(metrics.Items[i].Metrics) == 0 {
- missing = append(missing, expected)
- }
- }
- hint := ""
- if len(missing) > 0 {
- hint = fmt.Sprintf(" (sample missing pod: %s/%s)", namespace, missing[0])
- }
- return nil, time.Time{}, fmt.Errorf("metrics obtained for %d/%d of pods%s", count, len(podNames), hint)
- }
- return &sum, timestamp, nil
- }
- func calculateSumFromTimeSample(metrics heapster.MetricResultList, duration time.Duration) (sum intAndFloat, count int, timestamp time.Time) {
- sum = intAndFloat{0, 0}
- count = 0
- timestamp = time.Time{}
- var oldest *time.Time // creation time of the oldest of used samples across pods
- oldest = nil
- for _, metrics := range metrics.Items {
- var newest *heapster.MetricPoint // creation time of the newest sample for pod
- newest = nil
- for i, metricPoint := range metrics.Metrics {
- if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
- newest = &metrics.Metrics[i]
- }
- }
- if newest != nil {
- if oldest == nil || newest.Timestamp.Before(*oldest) {
- oldest = &newest.Timestamp
- }
- intervalSum := intAndFloat{0, 0}
- intSumCount := 0
- floatSumCount := 0
- for _, metricPoint := range metrics.Metrics {
- if metricPoint.Timestamp.Add(duration).After(newest.Timestamp) {
- intervalSum.intValue += int64(metricPoint.Value)
- intSumCount++
- if metricPoint.FloatValue != nil {
- intervalSum.floatValue += *metricPoint.FloatValue
- floatSumCount++
- }
- }
- }
- if newest.FloatValue == nil {
- if intSumCount > 0 {
- sum.intValue += int64(intervalSum.intValue / int64(intSumCount))
- sum.floatValue += float64(intervalSum.intValue / int64(intSumCount))
- }
- } else {
- if floatSumCount > 0 {
- sum.intValue += int64(intervalSum.floatValue / float64(floatSumCount))
- sum.floatValue += intervalSum.floatValue / float64(floatSumCount)
- }
- }
- count++
- }
- }
- if oldest != nil {
- timestamp = *oldest
- }
- return sum, count, timestamp
- }
|