123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package e2e
- import (
- "fmt"
- "strconv"
- "time"
- "k8s.io/kubernetes/pkg/api"
- client "k8s.io/kubernetes/pkg/client/unversioned"
- "k8s.io/kubernetes/pkg/util/intstr"
- "k8s.io/kubernetes/test/e2e/framework"
- . "github.com/onsi/ginkgo"
- )
// Defaults applied by the exported ResourceConsumer constructors.
const (
	// Passed as consumptionTimeInSeconds by NewDynamicResourceConsumer.
	dynamicConsumptionTimeInSeconds = 30
	// Passed as consumptionTimeInSeconds by NewStaticResourceConsumer.
	staticConsumptionTimeInSeconds = 3600

	// Per-request sizes passed by NewDynamicResourceConsumer.
	dynamicRequestSizeInMillicores = 20
	dynamicRequestSizeInMegabytes  = 100
	dynamicRequestSizeCustomMetric = 10
)

// Networking, timing and image settings for the consumer and its controller.
const (
	// port is the port exposed by the Services; targetPort is the port they
	// forward to.
	port       = 80
	targetPort = 8080

	// timeoutRC is the Timeout placed in the RCConfig of both workloads.
	timeoutRC            = 2 * time.Minute
	startServiceTimeout  = time.Minute
	startServiceInterval = 5 * time.Second

	resourceConsumerImage           = "gcr.io/google_containers/resource_consumer:beta4"
	resourceConsumerControllerImage = "gcr.io/google_containers/resource_consumer/controller:beta4"
)

// Failure messages and the name of the custom metric.
const (
	rcIsNil         = "ERROR: replicationController = nil"
	deploymentIsNil = "ERROR: deployment = nil"
	rsIsNil         = "ERROR: replicaset = nil"
	invalidKind     = "ERROR: invalid workload kind for resource consumer"

	// customMetricName is the metric named in BumpMetric requests.
	customMetricName = "QPS"
)
- /*
- ResourceConsumer is a tool for testing. It helps create specified usage of CPU or memory (Warning: memory not supported)
- typical use case:
- rc.ConsumeCPU(600)
- // ... check your assumption here
- rc.ConsumeCPU(300)
- // ... check your assumption here
- */
- type ResourceConsumer struct {
- name string
- controllerName string
- kind string
- framework *framework.Framework
- cpu chan int
- mem chan int
- customMetric chan int
- stopCPU chan int
- stopMem chan int
- stopCustomMetric chan int
- consumptionTimeInSeconds int
- sleepTime time.Duration
- requestSizeInMillicores int
- requestSizeInMegabytes int
- requestSizeCustomMetric int
- }
- func NewDynamicResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, f *framework.Framework) *ResourceConsumer {
- return newResourceConsumer(name, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
- dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, f)
- }
- // TODO this still defaults to replication controller
- func NewStaticResourceConsumer(name string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, f *framework.Framework) *ResourceConsumer {
- return newResourceConsumer(name, kindRC, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, staticConsumptionTimeInSeconds,
- initCPUTotal/replicas, initMemoryTotal/replicas, initCustomMetric/replicas, cpuLimit, memLimit, f)
- }
- /*
- NewResourceConsumer creates new ResourceConsumer
- initCPUTotal argument is in millicores
- initMemoryTotal argument is in megabytes
- memLimit argument is in megabytes, memLimit is a maximum amount of memory that can be consumed by a single pod
- cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
- */
- func newResourceConsumer(name, kind string, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
- requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, f *framework.Framework) *ResourceConsumer {
- runServiceAndWorkloadForResourceConsumer(f.Client, f.Namespace.Name, name, kind, replicas, cpuLimit, memLimit)
- rc := &ResourceConsumer{
- name: name,
- controllerName: name + "-ctrl",
- kind: kind,
- framework: f,
- cpu: make(chan int),
- mem: make(chan int),
- customMetric: make(chan int),
- stopCPU: make(chan int),
- stopMem: make(chan int),
- stopCustomMetric: make(chan int),
- consumptionTimeInSeconds: consumptionTimeInSeconds,
- sleepTime: time.Duration(consumptionTimeInSeconds) * time.Second,
- requestSizeInMillicores: requestSizeInMillicores,
- requestSizeInMegabytes: requestSizeInMegabytes,
- requestSizeCustomMetric: requestSizeCustomMetric,
- }
- go rc.makeConsumeCPURequests()
- rc.ConsumeCPU(initCPUTotal)
- go rc.makeConsumeMemRequests()
- rc.ConsumeMem(initMemoryTotal)
- go rc.makeConsumeCustomMetric()
- rc.ConsumeCustomMetric(initCustomMetric)
- return rc
- }
- // ConsumeCPU consumes given number of CPU
- func (rc *ResourceConsumer) ConsumeCPU(millicores int) {
- framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores)
- rc.cpu <- millicores
- }
- // ConsumeMem consumes given number of Mem
- func (rc *ResourceConsumer) ConsumeMem(megabytes int) {
- framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes)
- rc.mem <- megabytes
- }
- // ConsumeMem consumes given number of custom metric
- func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
- framework.Logf("RC %s: consume custom metric %v in total", rc.name, amount)
- rc.customMetric <- amount
- }
- func (rc *ResourceConsumer) makeConsumeCPURequests() {
- defer GinkgoRecover()
- sleepTime := time.Duration(0)
- millicores := 0
- for {
- select {
- case millicores = <-rc.cpu:
- framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores)
- case <-time.After(sleepTime):
- framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores)
- rc.sendConsumeCPURequest(millicores)
- sleepTime = rc.sleepTime
- case <-rc.stopCPU:
- return
- }
- }
- }
- func (rc *ResourceConsumer) makeConsumeMemRequests() {
- defer GinkgoRecover()
- sleepTime := time.Duration(0)
- megabytes := 0
- for {
- select {
- case megabytes = <-rc.mem:
- framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes)
- case <-time.After(sleepTime):
- framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes)
- rc.sendConsumeMemRequest(megabytes)
- sleepTime = rc.sleepTime
- case <-rc.stopMem:
- return
- }
- }
- }
- func (rc *ResourceConsumer) makeConsumeCustomMetric() {
- defer GinkgoRecover()
- sleepTime := time.Duration(0)
- delta := 0
- for {
- select {
- case delta := <-rc.customMetric:
- framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
- case <-time.After(sleepTime):
- framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
- rc.sendConsumeCustomMetric(delta)
- sleepTime = rc.sleepTime
- case <-rc.stopCustomMetric:
- return
- }
- }
- }
- func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
- proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
- framework.ExpectNoError(err)
- req := proxyRequest.Namespace(rc.framework.Namespace.Name).
- Name(rc.controllerName).
- Suffix("ConsumeCPU").
- Param("millicores", strconv.Itoa(millicores)).
- Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
- Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores))
- framework.Logf("URL: %v", *req.URL())
- _, err = req.DoRaw()
- framework.ExpectNoError(err)
- }
- // sendConsumeMemRequest sends POST request for memory consumption
- func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
- proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
- framework.ExpectNoError(err)
- req := proxyRequest.Namespace(rc.framework.Namespace.Name).
- Name(rc.controllerName).
- Suffix("ConsumeMem").
- Param("megabytes", strconv.Itoa(megabytes)).
- Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
- Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes))
- framework.Logf("URL: %v", *req.URL())
- _, err = req.DoRaw()
- framework.ExpectNoError(err)
- }
- // sendConsumeCustomMetric sends POST request for custom metric consumption
- func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
- proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.Client, rc.framework.Client.Post())
- framework.ExpectNoError(err)
- req := proxyRequest.Namespace(rc.framework.Namespace.Name).
- Name(rc.controllerName).
- Suffix("BumpMetric").
- Param("metric", customMetricName).
- Param("delta", strconv.Itoa(delta)).
- Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
- Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric))
- framework.Logf("URL: %v", *req.URL())
- _, err = req.DoRaw()
- framework.ExpectNoError(err)
- }
- func (rc *ResourceConsumer) GetReplicas() int {
- switch rc.kind {
- case kindRC:
- replicationController, err := rc.framework.Client.ReplicationControllers(rc.framework.Namespace.Name).Get(rc.name)
- framework.ExpectNoError(err)
- if replicationController == nil {
- framework.Failf(rcIsNil)
- }
- return int(replicationController.Status.Replicas)
- case kindDeployment:
- deployment, err := rc.framework.Client.Deployments(rc.framework.Namespace.Name).Get(rc.name)
- framework.ExpectNoError(err)
- if deployment == nil {
- framework.Failf(deploymentIsNil)
- }
- return int(deployment.Status.Replicas)
- case kindReplicaSet:
- rs, err := rc.framework.Client.ReplicaSets(rc.framework.Namespace.Name).Get(rc.name)
- framework.ExpectNoError(err)
- if rs == nil {
- framework.Failf(rsIsNil)
- }
- return int(rs.Status.Replicas)
- default:
- framework.Failf(invalidKind)
- }
- return 0
- }
- func (rc *ResourceConsumer) WaitForReplicas(desiredReplicas int) {
- timeout := 15 * time.Minute
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(20 * time.Second) {
- if desiredReplicas == rc.GetReplicas() {
- framework.Logf("%s: current replicas number is equal to desired replicas number: %d", rc.kind, desiredReplicas)
- return
- } else {
- framework.Logf("%s: current replicas number %d waiting to be %d", rc.kind, rc.GetReplicas(), desiredReplicas)
- }
- }
- framework.Failf("timeout waiting %v for pods size to be %d", timeout, desiredReplicas)
- }
- func (rc *ResourceConsumer) EnsureDesiredReplicas(desiredReplicas int, timeout time.Duration) {
- for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) {
- actual := rc.GetReplicas()
- if desiredReplicas != actual {
- framework.Failf("Number of replicas has changed: expected %v, got %v", desiredReplicas, actual)
- }
- framework.Logf("Number of replicas is as expected")
- }
- framework.Logf("Number of replicas was stable over %v", timeout)
- }
- func (rc *ResourceConsumer) CleanUp() {
- By(fmt.Sprintf("Removing consuming RC %s", rc.name))
- rc.stopCPU <- 0
- rc.stopMem <- 0
- rc.stopCustomMetric <- 0
- // Wait some time to ensure all child goroutines are finished.
- time.Sleep(10 * time.Second)
- framework.ExpectNoError(framework.DeleteRCAndPods(rc.framework.Client, rc.framework.Namespace.Name, rc.name))
- framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.name))
- framework.ExpectNoError(framework.DeleteRCAndPods(rc.framework.Client, rc.framework.Namespace.Name, rc.controllerName))
- framework.ExpectNoError(rc.framework.Client.Services(rc.framework.Namespace.Name).Delete(rc.controllerName))
- }
- func runServiceAndWorkloadForResourceConsumer(c *client.Client, ns, name, kind string, replicas int, cpuLimitMillis, memLimitMb int64) {
- By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
- _, err := c.Services(ns).Create(&api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: name,
- },
- Spec: api.ServiceSpec{
- Ports: []api.ServicePort{{
- Port: port,
- TargetPort: intstr.FromInt(targetPort),
- }},
- Selector: map[string]string{
- "name": name,
- },
- },
- })
- framework.ExpectNoError(err)
- rcConfig := framework.RCConfig{
- Client: c,
- Image: resourceConsumerImage,
- Name: name,
- Namespace: ns,
- Timeout: timeoutRC,
- Replicas: replicas,
- CpuRequest: cpuLimitMillis,
- CpuLimit: cpuLimitMillis,
- MemRequest: memLimitMb * 1024 * 1024, // MemLimit is in bytes
- MemLimit: memLimitMb * 1024 * 1024,
- }
- switch kind {
- case kindRC:
- framework.ExpectNoError(framework.RunRC(rcConfig))
- break
- case kindDeployment:
- dpConfig := framework.DeploymentConfig{
- RCConfig: rcConfig,
- }
- framework.ExpectNoError(framework.RunDeployment(dpConfig))
- break
- case kindReplicaSet:
- rsConfig := framework.ReplicaSetConfig{
- RCConfig: rcConfig,
- }
- framework.ExpectNoError(framework.RunReplicaSet(rsConfig))
- break
- default:
- framework.Failf(invalidKind)
- }
- By(fmt.Sprintf("Running controller"))
- controllerName := name + "-ctrl"
- _, err = c.Services(ns).Create(&api.Service{
- ObjectMeta: api.ObjectMeta{
- Name: controllerName,
- },
- Spec: api.ServiceSpec{
- Ports: []api.ServicePort{{
- Port: port,
- TargetPort: intstr.FromInt(targetPort),
- }},
- Selector: map[string]string{
- "name": controllerName,
- },
- },
- })
- framework.ExpectNoError(err)
- dnsClusterFirst := api.DNSClusterFirst
- controllerRcConfig := framework.RCConfig{
- Client: c,
- Image: resourceConsumerControllerImage,
- Name: controllerName,
- Namespace: ns,
- Timeout: timeoutRC,
- Replicas: 1,
- Command: []string{"/controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
- DNSPolicy: &dnsClusterFirst,
- }
- framework.ExpectNoError(framework.RunRC(controllerRcConfig))
- // Make sure endpoints are propagated.
- // TODO(piosz): replace sleep with endpoints watch.
- time.Sleep(10 * time.Second)
- }
|