replenishment_controller.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package resourcequota
  14. import (
  15. "fmt"
  16. "github.com/golang/glog"
  17. "k8s.io/kubernetes/pkg/api"
  18. "k8s.io/kubernetes/pkg/api/meta"
  19. "k8s.io/kubernetes/pkg/api/unversioned"
  20. "k8s.io/kubernetes/pkg/client/cache"
  21. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  22. "k8s.io/kubernetes/pkg/controller"
  23. "k8s.io/kubernetes/pkg/controller/framework"
  24. "k8s.io/kubernetes/pkg/controller/framework/informers"
  25. "k8s.io/kubernetes/pkg/quota/evaluator/core"
  26. "k8s.io/kubernetes/pkg/runtime"
  27. "k8s.io/kubernetes/pkg/util/metrics"
  28. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  29. "k8s.io/kubernetes/pkg/watch"
  30. )
  31. // ReplenishmentFunc is a function that is invoked when controller sees a change
  32. // that may require a quota to be replenished (i.e. object deletion, or object moved to terminal state)
  33. type ReplenishmentFunc func(groupKind unversioned.GroupKind, namespace string, object runtime.Object)
  34. // ReplenishmentControllerOptions is an options struct that tells a factory
  35. // how to configure a controller that can inform the quota system it should
  36. // replenish quota
  37. type ReplenishmentControllerOptions struct {
  38. // The kind monitored for replenishment
  39. GroupKind unversioned.GroupKind
  40. // The period that should be used to re-sync the monitored resource
  41. ResyncPeriod controller.ResyncPeriodFunc
  42. // The function to invoke when a change is observed that should trigger
  43. // replenishment
  44. ReplenishmentFunc ReplenishmentFunc
  45. }
  46. // PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is not
  47. func PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
  48. return func(oldObj, newObj interface{}) {
  49. oldPod := oldObj.(*api.Pod)
  50. newPod := newObj.(*api.Pod)
  51. if core.QuotaPod(oldPod) && !core.QuotaPod(newPod) {
  52. options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)
  53. }
  54. }
  55. }
  56. // ObjectReplenenishmentDeleteFunc will replenish on every delete
  57. func ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {
  58. return func(obj interface{}) {
  59. metaObject, err := meta.Accessor(obj)
  60. if err != nil {
  61. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  62. if !ok {
  63. glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())
  64. utilruntime.HandleError(err)
  65. return
  66. }
  67. metaObject, err = meta.Accessor(tombstone.Obj)
  68. if err != nil {
  69. glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())
  70. utilruntime.HandleError(err)
  71. return
  72. }
  73. }
  74. options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)
  75. }
  76. }
  77. // ReplenishmentControllerFactory knows how to build replenishment controllers
  78. type ReplenishmentControllerFactory interface {
  79. // NewController returns a controller configured with the specified options.
  80. // This method is NOT thread-safe.
  81. NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error)
  82. }
  83. // replenishmentControllerFactory implements ReplenishmentControllerFactory
  84. type replenishmentControllerFactory struct {
  85. kubeClient clientset.Interface
  86. podInformer framework.SharedInformer
  87. }
  88. // NewReplenishmentControllerFactory returns a factory that knows how to build controllers
  89. // to replenish resources when updated or deleted
  90. func NewReplenishmentControllerFactory(podInformer framework.SharedInformer, kubeClient clientset.Interface) ReplenishmentControllerFactory {
  91. return &replenishmentControllerFactory{
  92. kubeClient: kubeClient,
  93. podInformer: podInformer,
  94. }
  95. }
  96. func NewReplenishmentControllerFactoryFromClient(kubeClient clientset.Interface) ReplenishmentControllerFactory {
  97. return NewReplenishmentControllerFactory(nil, kubeClient)
  98. }
  99. func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) {
  100. var result framework.ControllerInterface
  101. if r.kubeClient != nil && r.kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
  102. metrics.RegisterMetricAndTrackRateLimiterUsage("replenishment_controller", r.kubeClient.Core().GetRESTClient().GetRateLimiter())
  103. }
  104. switch options.GroupKind {
  105. case api.Kind("Pod"):
  106. if r.podInformer != nil {
  107. r.podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
  108. UpdateFunc: PodReplenishmentUpdateFunc(options),
  109. DeleteFunc: ObjectReplenishmentDeleteFunc(options),
  110. })
  111. result = r.podInformer.GetController()
  112. break
  113. }
  114. r.podInformer = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())
  115. result = r.podInformer
  116. case api.Kind("Service"):
  117. _, result = framework.NewInformer(
  118. &cache.ListWatch{
  119. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  120. return r.kubeClient.Core().Services(api.NamespaceAll).List(options)
  121. },
  122. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  123. return r.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
  124. },
  125. },
  126. &api.Service{},
  127. options.ResyncPeriod(),
  128. framework.ResourceEventHandlerFuncs{
  129. UpdateFunc: ServiceReplenishmentUpdateFunc(options),
  130. DeleteFunc: ObjectReplenishmentDeleteFunc(options),
  131. },
  132. )
  133. case api.Kind("ReplicationController"):
  134. _, result = framework.NewInformer(
  135. &cache.ListWatch{
  136. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  137. return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options)
  138. },
  139. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  140. return r.kubeClient.Core().ReplicationControllers(api.NamespaceAll).Watch(options)
  141. },
  142. },
  143. &api.ReplicationController{},
  144. options.ResyncPeriod(),
  145. framework.ResourceEventHandlerFuncs{
  146. DeleteFunc: ObjectReplenishmentDeleteFunc(options),
  147. },
  148. )
  149. case api.Kind("PersistentVolumeClaim"):
  150. _, result = framework.NewInformer(
  151. &cache.ListWatch{
  152. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  153. return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).List(options)
  154. },
  155. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  156. return r.kubeClient.Core().PersistentVolumeClaims(api.NamespaceAll).Watch(options)
  157. },
  158. },
  159. &api.PersistentVolumeClaim{},
  160. options.ResyncPeriod(),
  161. framework.ResourceEventHandlerFuncs{
  162. DeleteFunc: ObjectReplenishmentDeleteFunc(options),
  163. },
  164. )
  165. case api.Kind("Secret"):
  166. _, result = framework.NewInformer(
  167. &cache.ListWatch{
  168. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  169. return r.kubeClient.Core().Secrets(api.NamespaceAll).List(options)
  170. },
  171. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  172. return r.kubeClient.Core().Secrets(api.NamespaceAll).Watch(options)
  173. },
  174. },
  175. &api.Secret{},
  176. options.ResyncPeriod(),
  177. framework.ResourceEventHandlerFuncs{
  178. DeleteFunc: ObjectReplenishmentDeleteFunc(options),
  179. },
  180. )
  181. case api.Kind("ConfigMap"):
  182. _, result = framework.NewInformer(
  183. &cache.ListWatch{
  184. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  185. return r.kubeClient.Core().ConfigMaps(api.NamespaceAll).List(options)
  186. },
  187. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  188. return r.kubeClient.Core().ConfigMaps(api.NamespaceAll).Watch(options)
  189. },
  190. },
  191. &api.ConfigMap{},
  192. options.ResyncPeriod(),
  193. framework.ResourceEventHandlerFuncs{
  194. DeleteFunc: ObjectReplenishmentDeleteFunc(options),
  195. },
  196. )
  197. default:
  198. return nil, NewUnhandledGroupKindError(options.GroupKind)
  199. }
  200. return result, nil
  201. }
  202. // ServiceReplenishmentUpdateFunc will replenish if the service was quota tracked has changed service type
  203. func ServiceReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {
  204. return func(oldObj, newObj interface{}) {
  205. oldService := oldObj.(*api.Service)
  206. newService := newObj.(*api.Service)
  207. if core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) {
  208. options.ReplenishmentFunc(options.GroupKind, newService.Namespace, nil)
  209. }
  210. }
  211. }
  212. type unhandledKindErr struct {
  213. kind unversioned.GroupKind
  214. }
  215. func (e unhandledKindErr) Error() string {
  216. return fmt.Sprintf("no replenishment controller available for %s", e.kind)
  217. }
  218. func NewUnhandledGroupKindError(kind unversioned.GroupKind) error {
  219. return unhandledKindErr{kind: kind}
  220. }
  221. func IsUnhandledGroupKindError(err error) bool {
  222. if err == nil {
  223. return false
  224. }
  225. _, ok := err.(unhandledKindErr)
  226. return ok
  227. }
  228. // UnionReplenishmentControllerFactory iterates through its constituent factories ignoring, UnhandledGroupKindErrors
  229. // returning the first success or failure it hits. If there are no hits either way, it return an UnhandledGroupKind error
  230. type UnionReplenishmentControllerFactory []ReplenishmentControllerFactory
  231. func (f UnionReplenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (framework.ControllerInterface, error) {
  232. for _, factory := range f {
  233. controller, err := factory.NewController(options)
  234. if !IsUnhandledGroupKindError(err) {
  235. return controller, err
  236. }
  237. }
  238. return nil, NewUnhandledGroupKindError(options.GroupKind)
  239. }