resource_quota_controller.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package resourcequota
  14. import (
  15. "time"
  16. "github.com/golang/glog"
  17. "k8s.io/kubernetes/pkg/api"
  18. "k8s.io/kubernetes/pkg/api/unversioned"
  19. "k8s.io/kubernetes/pkg/client/cache"
  20. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  21. "k8s.io/kubernetes/pkg/controller"
  22. "k8s.io/kubernetes/pkg/controller/framework"
  23. "k8s.io/kubernetes/pkg/quota"
  24. "k8s.io/kubernetes/pkg/runtime"
  25. "k8s.io/kubernetes/pkg/util/metrics"
  26. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  27. "k8s.io/kubernetes/pkg/util/wait"
  28. "k8s.io/kubernetes/pkg/util/workqueue"
  29. "k8s.io/kubernetes/pkg/watch"
  30. )
  31. // ResourceQuotaControllerOptions holds options for creating a quota controller
  32. type ResourceQuotaControllerOptions struct {
  33. // Must have authority to list all quotas, and update quota status
  34. KubeClient clientset.Interface
  35. // Controls full recalculation of quota usage
  36. ResyncPeriod controller.ResyncPeriodFunc
  37. // Knows how to calculate usage
  38. Registry quota.Registry
  39. // Knows how to build controllers that notify replenishment events
  40. ControllerFactory ReplenishmentControllerFactory
  41. // Controls full resync of objects monitored for replenihsment.
  42. ReplenishmentResyncPeriod controller.ResyncPeriodFunc
  43. // List of GroupKind objects that should be monitored for replenishment at
  44. // a faster frequency than the quota controller recalculation interval
  45. GroupKindsToReplenish []unversioned.GroupKind
  46. }
  47. // ResourceQuotaController is responsible for tracking quota usage status in the system
  48. type ResourceQuotaController struct {
  49. // Must have authority to list all resources in the system, and update quota status
  50. kubeClient clientset.Interface
  51. // An index of resource quota objects by namespace
  52. rqIndexer cache.Indexer
  53. // Watches changes to all resource quota
  54. rqController *framework.Controller
  55. // ResourceQuota objects that need to be synchronized
  56. queue workqueue.RateLimitingInterface
  57. // missingUsageQueue holds objects that are missing the initial usage informatino
  58. missingUsageQueue workqueue.RateLimitingInterface
  59. // To allow injection of syncUsage for testing.
  60. syncHandler func(key string) error
  61. // function that controls full recalculation of quota usage
  62. resyncPeriod controller.ResyncPeriodFunc
  63. // knows how to calculate usage
  64. registry quota.Registry
  65. // controllers monitoring to notify for replenishment
  66. replenishmentControllers []framework.ControllerInterface
  67. }
  68. func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {
  69. // build the resource quota controller
  70. rq := &ResourceQuotaController{
  71. kubeClient: options.KubeClient,
  72. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),
  73. missingUsageQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),
  74. resyncPeriod: options.ResyncPeriod,
  75. registry: options.Registry,
  76. replenishmentControllers: []framework.ControllerInterface{},
  77. }
  78. if options.KubeClient != nil && options.KubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
  79. metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", options.KubeClient.Core().GetRESTClient().GetRateLimiter())
  80. }
  81. // set the synchronization handler
  82. rq.syncHandler = rq.syncResourceQuotaFromKey
  83. // build the controller that observes quota
  84. rq.rqIndexer, rq.rqController = framework.NewIndexerInformer(
  85. &cache.ListWatch{
  86. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  87. return rq.kubeClient.Core().ResourceQuotas(api.NamespaceAll).List(options)
  88. },
  89. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  90. return rq.kubeClient.Core().ResourceQuotas(api.NamespaceAll).Watch(options)
  91. },
  92. },
  93. &api.ResourceQuota{},
  94. rq.resyncPeriod(),
  95. framework.ResourceEventHandlerFuncs{
  96. AddFunc: rq.addQuota,
  97. UpdateFunc: func(old, cur interface{}) {
  98. // We are only interested in observing updates to quota.spec to drive updates to quota.status.
  99. // We ignore all updates to quota.Status because they are all driven by this controller.
  100. // IMPORTANT:
  101. // We do not use this function to queue up a full quota recalculation. To do so, would require
  102. // us to enqueue all quota.Status updates, and since quota.Status updates involve additional queries
  103. // that cannot be backed by a cache and result in a full query of a namespace's content, we do not
  104. // want to pay the price on spurious status updates. As a result, we have a separate routine that is
  105. // responsible for enqueue of all resource quotas when doing a full resync (enqueueAll)
  106. oldResourceQuota := old.(*api.ResourceQuota)
  107. curResourceQuota := cur.(*api.ResourceQuota)
  108. if quota.Equals(curResourceQuota.Spec.Hard, oldResourceQuota.Spec.Hard) {
  109. return
  110. }
  111. rq.addQuota(curResourceQuota)
  112. },
  113. // This will enter the sync loop and no-op, because the controller has been deleted from the store.
  114. // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
  115. // way of achieving this is by performing a `stop` operation on the controller.
  116. DeleteFunc: rq.enqueueResourceQuota,
  117. },
  118. cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
  119. )
  120. for _, groupKindToReplenish := range options.GroupKindsToReplenish {
  121. controllerOptions := &ReplenishmentControllerOptions{
  122. GroupKind: groupKindToReplenish,
  123. ResyncPeriod: options.ReplenishmentResyncPeriod,
  124. ReplenishmentFunc: rq.replenishQuota,
  125. }
  126. replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)
  127. if err != nil {
  128. glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)
  129. } else {
  130. rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)
  131. }
  132. }
  133. return rq
  134. }
  135. // enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
  136. func (rq *ResourceQuotaController) enqueueAll() {
  137. defer glog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
  138. for _, k := range rq.rqIndexer.ListKeys() {
  139. rq.queue.Add(k)
  140. }
  141. }
  142. // obj could be an *api.ResourceQuota, or a DeletionFinalStateUnknown marker item.
  143. func (rq *ResourceQuotaController) enqueueResourceQuota(obj interface{}) {
  144. key, err := controller.KeyFunc(obj)
  145. if err != nil {
  146. glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
  147. return
  148. }
  149. rq.queue.Add(key)
  150. }
  151. func (rq *ResourceQuotaController) addQuota(obj interface{}) {
  152. key, err := controller.KeyFunc(obj)
  153. if err != nil {
  154. glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
  155. return
  156. }
  157. resourceQuota := obj.(*api.ResourceQuota)
  158. // if we declared an intent that is not yet captured in status (prioritize it)
  159. if !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard) {
  160. rq.missingUsageQueue.Add(key)
  161. return
  162. }
  163. // if we declared a constraint that has no usage (which this controller can calculate, prioritize it)
  164. for constraint := range resourceQuota.Status.Hard {
  165. if _, usageFound := resourceQuota.Status.Used[constraint]; !usageFound {
  166. matchedResources := []api.ResourceName{constraint}
  167. for _, evaluator := range rq.registry.Evaluators() {
  168. if intersection := quota.Intersection(evaluator.MatchesResources(), matchedResources); len(intersection) != 0 {
  169. rq.missingUsageQueue.Add(key)
  170. return
  171. }
  172. }
  173. }
  174. }
  175. // no special priority, go in normal recalc queue
  176. rq.queue.Add(key)
  177. }
  178. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  179. func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {
  180. workFunc := func() bool {
  181. key, quit := queue.Get()
  182. if quit {
  183. return true
  184. }
  185. defer queue.Done(key)
  186. err := rq.syncHandler(key.(string))
  187. if err == nil {
  188. queue.Forget(key)
  189. return false
  190. }
  191. utilruntime.HandleError(err)
  192. queue.AddRateLimited(key)
  193. return false
  194. }
  195. return func() {
  196. for {
  197. if quit := workFunc(); quit {
  198. glog.Infof("resource quota controller worker shutting down")
  199. return
  200. }
  201. }
  202. }
  203. }
  204. // Run begins quota controller using the specified number of workers
  205. func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
  206. defer utilruntime.HandleCrash()
  207. go rq.rqController.Run(stopCh)
  208. // the controllers that replenish other resources to respond rapidly to state changes
  209. for _, replenishmentController := range rq.replenishmentControllers {
  210. go replenishmentController.Run(stopCh)
  211. }
  212. // the workers that chug through the quota calculation backlog
  213. for i := 0; i < workers; i++ {
  214. go wait.Until(rq.worker(rq.queue), time.Second, stopCh)
  215. go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)
  216. }
  217. // the timer for how often we do a full recalculation across all quotas
  218. go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)
  219. <-stopCh
  220. glog.Infof("Shutting down ResourceQuotaController")
  221. rq.queue.ShutDown()
  222. }
  223. // syncResourceQuotaFromKey syncs a quota key
  224. func (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {
  225. startTime := time.Now()
  226. defer func() {
  227. glog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Now().Sub(startTime))
  228. }()
  229. obj, exists, err := rq.rqIndexer.GetByKey(key)
  230. if !exists {
  231. glog.Infof("Resource quota has been deleted %v", key)
  232. return nil
  233. }
  234. if err != nil {
  235. glog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
  236. rq.queue.Add(key)
  237. return err
  238. }
  239. quota := *obj.(*api.ResourceQuota)
  240. return rq.syncResourceQuota(quota)
  241. }
  242. // syncResourceQuota runs a complete sync of resource quota status across all known kinds
  243. func (rq *ResourceQuotaController) syncResourceQuota(resourceQuota api.ResourceQuota) (err error) {
  244. // quota is dirty if any part of spec hard limits differs from the status hard limits
  245. dirty := !api.Semantic.DeepEqual(resourceQuota.Spec.Hard, resourceQuota.Status.Hard)
  246. // dirty tracks if the usage status differs from the previous sync,
  247. // if so, we send a new usage with latest status
  248. // if this is our first sync, it will be dirty by default, since we need track usage
  249. dirty = dirty || (resourceQuota.Status.Hard == nil || resourceQuota.Status.Used == nil)
  250. used := api.ResourceList{}
  251. if resourceQuota.Status.Used != nil {
  252. used = quota.Add(api.ResourceList{}, resourceQuota.Status.Used)
  253. }
  254. hardLimits := quota.Add(api.ResourceList{}, resourceQuota.Spec.Hard)
  255. newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)
  256. if err != nil {
  257. return err
  258. }
  259. for key, value := range newUsage {
  260. used[key] = value
  261. }
  262. // ensure set of used values match those that have hard constraints
  263. hardResources := quota.ResourceNames(hardLimits)
  264. used = quota.Mask(used, hardResources)
  265. // Create a usage object that is based on the quota resource version that will handle updates
  266. // by default, we preserve the past usage observation, and set hard to the current spec
  267. usage := api.ResourceQuota{
  268. ObjectMeta: api.ObjectMeta{
  269. Name: resourceQuota.Name,
  270. Namespace: resourceQuota.Namespace,
  271. ResourceVersion: resourceQuota.ResourceVersion,
  272. Labels: resourceQuota.Labels,
  273. Annotations: resourceQuota.Annotations},
  274. Status: api.ResourceQuotaStatus{
  275. Hard: hardLimits,
  276. Used: used,
  277. },
  278. }
  279. dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)
  280. // there was a change observed by this controller that requires we update quota
  281. if dirty {
  282. _, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(&usage)
  283. return err
  284. }
  285. return nil
  286. }
  287. // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
  288. func (rq *ResourceQuotaController) replenishQuota(groupKind unversioned.GroupKind, namespace string, object runtime.Object) {
  289. // check if the quota controller can evaluate this kind, if not, ignore it altogether...
  290. evaluators := rq.registry.Evaluators()
  291. evaluator, found := evaluators[groupKind]
  292. if !found {
  293. return
  294. }
  295. // check if this namespace even has a quota...
  296. indexKey := &api.ResourceQuota{}
  297. indexKey.Namespace = namespace
  298. resourceQuotas, err := rq.rqIndexer.Index("namespace", indexKey)
  299. if err != nil {
  300. glog.Errorf("quota controller could not find ResourceQuota associated with namespace: %s, could take up to %v before a quota replenishes", namespace, rq.resyncPeriod())
  301. }
  302. if len(resourceQuotas) == 0 {
  303. return
  304. }
  305. // only queue those quotas that are tracking a resource associated with this kind.
  306. matchedResources := evaluator.MatchesResources()
  307. for i := range resourceQuotas {
  308. resourceQuota := resourceQuotas[i].(*api.ResourceQuota)
  309. resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
  310. if len(quota.Intersection(matchedResources, resourceQuotaResources)) > 0 {
  311. // TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
  312. rq.enqueueResourceQuota(resourceQuota)
  313. }
  314. }
  315. }