horizontal.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package podautoscaler
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "math"
  18. "time"
  19. "github.com/golang/glog"
  20. "k8s.io/kubernetes/pkg/api"
  21. "k8s.io/kubernetes/pkg/api/resource"
  22. "k8s.io/kubernetes/pkg/api/unversioned"
  23. "k8s.io/kubernetes/pkg/apis/autoscaling"
  24. "k8s.io/kubernetes/pkg/apis/extensions"
  25. "k8s.io/kubernetes/pkg/client/cache"
  26. unversionedautoscaling "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/autoscaling/unversioned"
  27. unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
  28. unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
  29. "k8s.io/kubernetes/pkg/client/record"
  30. "k8s.io/kubernetes/pkg/controller/framework"
  31. "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
  32. "k8s.io/kubernetes/pkg/runtime"
  33. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  34. "k8s.io/kubernetes/pkg/watch"
  35. )
  36. const (
  37. // Usage shoud exceed the tolerance before we start downscale or upscale the pods.
  38. // TODO: make it a flag or HPA spec element.
  39. tolerance = 0.1
  40. defaultTargetCPUUtilizationPercentage = 80
  41. HpaCustomMetricsTargetAnnotationName = "alpha/target.custom-metrics.podautoscaler.kubernetes.io"
  42. HpaCustomMetricsStatusAnnotationName = "alpha/status.custom-metrics.podautoscaler.kubernetes.io"
  43. )
  44. type HorizontalController struct {
  45. scaleNamespacer unversionedextensions.ScalesGetter
  46. hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter
  47. metricsClient metrics.MetricsClient
  48. eventRecorder record.EventRecorder
  49. // A store of HPA objects, populated by the controller.
  50. store cache.Store
  51. // Watches changes to all HPA objects.
  52. controller *framework.Controller
  53. }
  54. var downscaleForbiddenWindow = 5 * time.Minute
  55. var upscaleForbiddenWindow = 3 * time.Minute
  56. func newInformer(controller *HorizontalController, resyncPeriod time.Duration) (cache.Store, *framework.Controller) {
  57. return framework.NewInformer(
  58. &cache.ListWatch{
  59. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  60. return controller.hpaNamespacer.HorizontalPodAutoscalers(api.NamespaceAll).List(options)
  61. },
  62. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  63. return controller.hpaNamespacer.HorizontalPodAutoscalers(api.NamespaceAll).Watch(options)
  64. },
  65. },
  66. &autoscaling.HorizontalPodAutoscaler{},
  67. resyncPeriod,
  68. framework.ResourceEventHandlerFuncs{
  69. AddFunc: func(obj interface{}) {
  70. hpa := obj.(*autoscaling.HorizontalPodAutoscaler)
  71. hasCPUPolicy := hpa.Spec.TargetCPUUtilizationPercentage != nil
  72. _, hasCustomMetricsPolicy := hpa.Annotations[HpaCustomMetricsTargetAnnotationName]
  73. if !hasCPUPolicy && !hasCustomMetricsPolicy {
  74. controller.eventRecorder.Event(hpa, api.EventTypeNormal, "DefaultPolicy", "No scaling policy specified - will use default one. See documentation for details")
  75. }
  76. err := controller.reconcileAutoscaler(hpa)
  77. if err != nil {
  78. glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
  79. }
  80. },
  81. UpdateFunc: func(old, cur interface{}) {
  82. hpa := cur.(*autoscaling.HorizontalPodAutoscaler)
  83. err := controller.reconcileAutoscaler(hpa)
  84. if err != nil {
  85. glog.Warningf("Failed to reconcile %s: %v", hpa.Name, err)
  86. }
  87. },
  88. // We are not interested in deletions.
  89. },
  90. )
  91. }
  92. func NewHorizontalController(evtNamespacer unversionedcore.EventsGetter, scaleNamespacer unversionedextensions.ScalesGetter, hpaNamespacer unversionedautoscaling.HorizontalPodAutoscalersGetter, metricsClient metrics.MetricsClient, resyncPeriod time.Duration) *HorizontalController {
  93. broadcaster := record.NewBroadcaster()
  94. broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: evtNamespacer.Events("")})
  95. recorder := broadcaster.NewRecorder(api.EventSource{Component: "horizontal-pod-autoscaler"})
  96. controller := &HorizontalController{
  97. metricsClient: metricsClient,
  98. eventRecorder: recorder,
  99. scaleNamespacer: scaleNamespacer,
  100. hpaNamespacer: hpaNamespacer,
  101. }
  102. store, frameworkController := newInformer(controller, resyncPeriod)
  103. controller.store = store
  104. controller.controller = frameworkController
  105. return controller
  106. }
  107. func (a *HorizontalController) Run(stopCh <-chan struct{}) {
  108. defer utilruntime.HandleCrash()
  109. glog.Infof("Starting HPA Controller")
  110. go a.controller.Run(stopCh)
  111. <-stopCh
  112. glog.Infof("Shutting down HPA Controller")
  113. }
  114. func (a *HorizontalController) computeReplicasForCPUUtilization(hpa *autoscaling.HorizontalPodAutoscaler, scale *extensions.Scale) (int32, *int32, time.Time, error) {
  115. targetUtilization := int32(defaultTargetCPUUtilizationPercentage)
  116. if hpa.Spec.TargetCPUUtilizationPercentage != nil {
  117. targetUtilization = *hpa.Spec.TargetCPUUtilizationPercentage
  118. }
  119. currentReplicas := scale.Status.Replicas
  120. if scale.Status.Selector == nil {
  121. errMsg := "selector is required"
  122. a.eventRecorder.Event(hpa, api.EventTypeWarning, "SelectorRequired", errMsg)
  123. return 0, nil, time.Time{}, fmt.Errorf(errMsg)
  124. }
  125. selector, err := unversioned.LabelSelectorAsSelector(scale.Status.Selector)
  126. if err != nil {
  127. errMsg := fmt.Sprintf("couldn't convert selector string to a corresponding selector object: %v", err)
  128. a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
  129. return 0, nil, time.Time{}, fmt.Errorf(errMsg)
  130. }
  131. currentUtilization, timestamp, err := a.metricsClient.GetCPUUtilization(hpa.Namespace, selector)
  132. // TODO: what to do on partial errors (like metrics obtained for 75% of pods).
  133. if err != nil {
  134. a.eventRecorder.Event(hpa, api.EventTypeWarning, "FailedGetMetrics", err.Error())
  135. return 0, nil, time.Time{}, fmt.Errorf("failed to get CPU utilization: %v", err)
  136. }
  137. utilization := int32(*currentUtilization)
  138. usageRatio := float64(utilization) / float64(targetUtilization)
  139. if math.Abs(1.0-usageRatio) > tolerance {
  140. return int32(math.Ceil(usageRatio * float64(currentReplicas))), &utilization, timestamp, nil
  141. } else {
  142. return currentReplicas, &utilization, timestamp, nil
  143. }
  144. }
  145. // Computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation as json-serialized
  146. // extensions.CustomMetricsTargetList.
  147. // Returns number of replicas, metric which required highest number of replicas,
  148. // status string (also json-serialized extensions.CustomMetricsCurrentStatusList),
  149. // last timestamp of the metrics involved in computations or error, if occurred.
  150. func (a *HorizontalController) computeReplicasForCustomMetrics(hpa *autoscaling.HorizontalPodAutoscaler, scale *extensions.Scale,
  151. cmAnnotation string) (replicas int32, metric string, status string, timestamp time.Time, err error) {
  152. currentReplicas := scale.Status.Replicas
  153. replicas = 0
  154. metric = ""
  155. status = ""
  156. timestamp = time.Time{}
  157. err = nil
  158. if cmAnnotation == "" {
  159. return
  160. }
  161. var targetList extensions.CustomMetricTargetList
  162. if err := json.Unmarshal([]byte(cmAnnotation), &targetList); err != nil {
  163. return 0, "", "", time.Time{}, fmt.Errorf("failed to parse custom metrics annotation: %v", err)
  164. }
  165. if len(targetList.Items) == 0 {
  166. return 0, "", "", time.Time{}, fmt.Errorf("no custom metrics in annotation")
  167. }
  168. statusList := extensions.CustomMetricCurrentStatusList{
  169. Items: make([]extensions.CustomMetricCurrentStatus, 0),
  170. }
  171. for _, customMetricTarget := range targetList.Items {
  172. if scale.Status.Selector == nil {
  173. errMsg := "selector is required"
  174. a.eventRecorder.Event(hpa, api.EventTypeWarning, "SelectorRequired", errMsg)
  175. return 0, "", "", time.Time{}, fmt.Errorf("selector is required")
  176. }
  177. selector, err := unversioned.LabelSelectorAsSelector(scale.Status.Selector)
  178. if err != nil {
  179. errMsg := fmt.Sprintf("couldn't convert selector string to a corresponding selector object: %v", err)
  180. a.eventRecorder.Event(hpa, api.EventTypeWarning, "InvalidSelector", errMsg)
  181. return 0, "", "", time.Time{}, fmt.Errorf("couldn't convert selector string to a corresponding selector object: %v", err)
  182. }
  183. value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, selector)
  184. // TODO: what to do on partial errors (like metrics obtained for 75% of pods).
  185. if err != nil {
  186. a.eventRecorder.Event(hpa, api.EventTypeWarning, "FailedGetCustomMetrics", err.Error())
  187. return 0, "", "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
  188. }
  189. floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
  190. usageRatio := *value / floatTarget
  191. replicaCountProposal := int32(0)
  192. if math.Abs(1.0-usageRatio) > tolerance {
  193. replicaCountProposal = int32(math.Ceil(usageRatio * float64(currentReplicas)))
  194. } else {
  195. replicaCountProposal = currentReplicas
  196. }
  197. if replicaCountProposal > replicas {
  198. timestamp = currentTimestamp
  199. replicas = replicaCountProposal
  200. metric = fmt.Sprintf("Custom metric %s", customMetricTarget.Name)
  201. }
  202. quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value))
  203. if err != nil {
  204. return 0, "", "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
  205. }
  206. statusList.Items = append(statusList.Items, extensions.CustomMetricCurrentStatus{
  207. Name: customMetricTarget.Name,
  208. CurrentValue: quantity,
  209. })
  210. }
  211. byteStatusList, err := json.Marshal(statusList)
  212. if err != nil {
  213. return 0, "", "", time.Time{}, fmt.Errorf("failed to serialize custom metric status: %v", err)
  214. }
  215. return replicas, metric, string(byteStatusList), timestamp, nil
  216. }
  217. func (a *HorizontalController) reconcileAutoscaler(hpa *autoscaling.HorizontalPodAutoscaler) error {
  218. reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
  219. scale, err := a.scaleNamespacer.Scales(hpa.Namespace).Get(hpa.Spec.ScaleTargetRef.Kind, hpa.Spec.ScaleTargetRef.Name)
  220. if err != nil {
  221. a.eventRecorder.Event(hpa, api.EventTypeWarning, "FailedGetScale", err.Error())
  222. return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
  223. }
  224. currentReplicas := scale.Status.Replicas
  225. cpuDesiredReplicas := int32(0)
  226. var cpuCurrentUtilization *int32 = nil
  227. cpuTimestamp := time.Time{}
  228. cmDesiredReplicas := int32(0)
  229. cmMetric := ""
  230. cmStatus := ""
  231. cmTimestamp := time.Time{}
  232. desiredReplicas := int32(0)
  233. rescaleReason := ""
  234. timestamp := time.Now()
  235. if scale.Spec.Replicas == 0 {
  236. // Autoscaling is disabled for this resource
  237. desiredReplicas = 0
  238. } else if currentReplicas > hpa.Spec.MaxReplicas {
  239. rescaleReason = "Current number of replicas above Spec.MaxReplicas"
  240. desiredReplicas = hpa.Spec.MaxReplicas
  241. } else if hpa.Spec.MinReplicas != nil && currentReplicas < *hpa.Spec.MinReplicas {
  242. rescaleReason = "Current number of replicas below Spec.MinReplicas"
  243. desiredReplicas = *hpa.Spec.MinReplicas
  244. } else if currentReplicas == 0 {
  245. rescaleReason = "Current number of replicas must be greater than 0"
  246. desiredReplicas = 1
  247. } else {
  248. // All basic scenarios covered, the state should be sane, lets use metrics.
  249. cmAnnotation, cmAnnotationFound := hpa.Annotations[HpaCustomMetricsTargetAnnotationName]
  250. if hpa.Spec.TargetCPUUtilizationPercentage != nil || !cmAnnotationFound {
  251. cpuDesiredReplicas, cpuCurrentUtilization, cpuTimestamp, err = a.computeReplicasForCPUUtilization(hpa, scale)
  252. if err != nil {
  253. a.updateCurrentReplicasInStatus(hpa, currentReplicas)
  254. a.eventRecorder.Event(hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
  255. return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
  256. }
  257. }
  258. if cmAnnotationFound {
  259. cmDesiredReplicas, cmMetric, cmStatus, cmTimestamp, err = a.computeReplicasForCustomMetrics(hpa, scale, cmAnnotation)
  260. if err != nil {
  261. a.updateCurrentReplicasInStatus(hpa, currentReplicas)
  262. a.eventRecorder.Event(hpa, api.EventTypeWarning, "FailedComputeCMReplicas", err.Error())
  263. return fmt.Errorf("failed to compute desired number of replicas based on Custom Metrics for %s: %v", reference, err)
  264. }
  265. }
  266. rescaleMetric := ""
  267. if cpuDesiredReplicas > desiredReplicas {
  268. desiredReplicas = cpuDesiredReplicas
  269. timestamp = cpuTimestamp
  270. rescaleMetric = "CPU utilization"
  271. }
  272. if cmDesiredReplicas > desiredReplicas {
  273. desiredReplicas = cmDesiredReplicas
  274. timestamp = cmTimestamp
  275. rescaleMetric = cmMetric
  276. }
  277. if desiredReplicas > currentReplicas {
  278. rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
  279. } else if desiredReplicas < currentReplicas {
  280. rescaleReason = "All metrics below target"
  281. }
  282. if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
  283. desiredReplicas = *hpa.Spec.MinReplicas
  284. }
  285. // never scale down to 0, reserved for disabling autoscaling
  286. if desiredReplicas == 0 {
  287. desiredReplicas = 1
  288. }
  289. if desiredReplicas > hpa.Spec.MaxReplicas {
  290. desiredReplicas = hpa.Spec.MaxReplicas
  291. }
  292. }
  293. rescale := shouldScale(hpa, currentReplicas, desiredReplicas, timestamp)
  294. if rescale {
  295. scale.Spec.Replicas = desiredReplicas
  296. _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(hpa.Spec.ScaleTargetRef.Kind, scale)
  297. if err != nil {
  298. a.eventRecorder.Eventf(hpa, api.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
  299. return fmt.Errorf("failed to rescale %s: %v", reference, err)
  300. }
  301. a.eventRecorder.Eventf(hpa, api.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
  302. glog.Infof("Successfull rescale of %s, old size: %d, new size: %d, reason: %s",
  303. hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
  304. } else {
  305. desiredReplicas = currentReplicas
  306. }
  307. return a.updateStatus(hpa, currentReplicas, desiredReplicas, cpuCurrentUtilization, cmStatus, rescale)
  308. }
  309. func shouldScale(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
  310. if desiredReplicas != currentReplicas {
  311. // Going down only if the usageRatio dropped significantly below the target
  312. // and there was no rescaling in the last downscaleForbiddenWindow.
  313. if desiredReplicas < currentReplicas &&
  314. (hpa.Status.LastScaleTime == nil ||
  315. hpa.Status.LastScaleTime.Add(downscaleForbiddenWindow).Before(timestamp)) {
  316. return true
  317. }
  318. // Going up only if the usage ratio increased significantly above the target
  319. // and there was no rescaling in the last upscaleForbiddenWindow.
  320. if desiredReplicas > currentReplicas &&
  321. (hpa.Status.LastScaleTime == nil ||
  322. hpa.Status.LastScaleTime.Add(upscaleForbiddenWindow).Before(timestamp)) {
  323. return true
  324. }
  325. }
  326. return false
  327. }
  328. func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas int32) {
  329. err := a.updateStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentCPUUtilizationPercentage, hpa.Annotations[HpaCustomMetricsStatusAnnotationName], false)
  330. if err != nil {
  331. glog.Errorf("%v", err)
  332. }
  333. }
  334. func (a *HorizontalController) updateStatus(hpa *autoscaling.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, cpuCurrentUtilization *int32, cmStatus string, rescale bool) error {
  335. hpa.Status = autoscaling.HorizontalPodAutoscalerStatus{
  336. CurrentReplicas: currentReplicas,
  337. DesiredReplicas: desiredReplicas,
  338. CurrentCPUUtilizationPercentage: cpuCurrentUtilization,
  339. LastScaleTime: hpa.Status.LastScaleTime,
  340. }
  341. if cmStatus != "" {
  342. hpa.Annotations[HpaCustomMetricsStatusAnnotationName] = cmStatus
  343. }
  344. if rescale {
  345. now := unversioned.NewTime(time.Now())
  346. hpa.Status.LastScaleTime = &now
  347. }
  348. _, err := a.hpaNamespacer.HorizontalPodAutoscalers(hpa.Namespace).UpdateStatus(hpa)
  349. if err != nil {
  350. a.eventRecorder.Event(hpa, api.EventTypeWarning, "FailedUpdateStatus", err.Error())
  351. return fmt.Errorf("failed to update status for %s: %v", hpa.Name, err)
  352. }
  353. glog.V(2).Infof("Successfully updated status for %s", hpa.Name)
  354. return nil
  355. }