namespace_controller.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package namespace
  14. import (
  15. "time"
  16. "k8s.io/kubernetes/pkg/api"
  17. "k8s.io/kubernetes/pkg/api/unversioned"
  18. "k8s.io/kubernetes/pkg/client/cache"
  19. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  20. "k8s.io/kubernetes/pkg/client/typed/dynamic"
  21. "k8s.io/kubernetes/pkg/controller"
  22. "k8s.io/kubernetes/pkg/controller/framework"
  23. "k8s.io/kubernetes/pkg/runtime"
  24. "k8s.io/kubernetes/pkg/util/metrics"
  25. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  26. "k8s.io/kubernetes/pkg/util/wait"
  27. "k8s.io/kubernetes/pkg/util/workqueue"
  28. "k8s.io/kubernetes/pkg/watch"
  29. "github.com/golang/glog"
  30. )
  31. // NamespaceController is responsible for performing actions dependent upon a namespace phase
  32. type NamespaceController struct {
  33. // client that purges namespace content, must have list/delete privileges on all content
  34. kubeClient clientset.Interface
  35. // clientPool manages a pool of dynamic clients
  36. clientPool dynamic.ClientPool
  37. // store that holds the namespaces
  38. store cache.Store
  39. // controller that observes the namespaces
  40. controller *framework.Controller
  41. // namespaces that have been queued up for processing by workers
  42. queue workqueue.RateLimitingInterface
  43. // list of preferred group versions and their corresponding resource set for namespace deletion
  44. groupVersionResources []unversioned.GroupVersionResource
  45. // opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
  46. opCache operationNotSupportedCache
  47. // finalizerToken is the finalizer token managed by this controller
  48. finalizerToken api.FinalizerName
  49. }
  50. // NewNamespaceController creates a new NamespaceController
  51. func NewNamespaceController(
  52. kubeClient clientset.Interface,
  53. clientPool dynamic.ClientPool,
  54. groupVersionResources []unversioned.GroupVersionResource,
  55. resyncPeriod time.Duration,
  56. finalizerToken api.FinalizerName) *NamespaceController {
  57. // create the controller so we can inject the enqueue function
  58. namespaceController := &NamespaceController{
  59. kubeClient: kubeClient,
  60. clientPool: clientPool,
  61. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
  62. groupVersionResources: groupVersionResources,
  63. opCache: operationNotSupportedCache{},
  64. finalizerToken: finalizerToken,
  65. }
  66. if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
  67. metrics.RegisterMetricAndTrackRateLimiterUsage("namespace_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
  68. }
  69. // configure the backing store/controller
  70. store, controller := framework.NewInformer(
  71. &cache.ListWatch{
  72. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  73. return kubeClient.Core().Namespaces().List(options)
  74. },
  75. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  76. return kubeClient.Core().Namespaces().Watch(options)
  77. },
  78. },
  79. &api.Namespace{},
  80. resyncPeriod,
  81. framework.ResourceEventHandlerFuncs{
  82. AddFunc: func(obj interface{}) {
  83. namespace := obj.(*api.Namespace)
  84. namespaceController.enqueueNamespace(namespace)
  85. },
  86. UpdateFunc: func(oldObj, newObj interface{}) {
  87. namespace := newObj.(*api.Namespace)
  88. namespaceController.enqueueNamespace(namespace)
  89. },
  90. },
  91. )
  92. namespaceController.store = store
  93. namespaceController.controller = controller
  94. return namespaceController
  95. }
  96. // enqueueNamespace adds an object to the controller work queue
  97. // obj could be an *api.Namespace, or a DeletionFinalStateUnknown item.
  98. func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
  99. key, err := controller.KeyFunc(obj)
  100. if err != nil {
  101. glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
  102. return
  103. }
  104. nm.queue.Add(key)
  105. }
  106. // worker processes the queue of namespace objects.
  107. // Each namespace can be in the queue at most once.
  108. // The system ensures that no two workers can process
  109. // the same namespace at the same time.
  110. func (nm *NamespaceController) worker() {
  111. workFunc := func() bool {
  112. key, quit := nm.queue.Get()
  113. if quit {
  114. return true
  115. }
  116. defer nm.queue.Done(key)
  117. err := nm.syncNamespaceFromKey(key.(string))
  118. if err == nil {
  119. // no error, forget this entry and return
  120. nm.queue.Forget(key)
  121. return false
  122. }
  123. if estimate, ok := err.(*contentRemainingError); ok {
  124. t := estimate.Estimate/2 + 1
  125. glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
  126. nm.queue.AddAfter(key, time.Duration(t)*time.Second)
  127. } else {
  128. // rather than wait for a full resync, re-add the namespace to the queue to be processed
  129. nm.queue.AddRateLimited(key)
  130. utilruntime.HandleError(err)
  131. }
  132. return false
  133. }
  134. for {
  135. quit := workFunc()
  136. if quit {
  137. return
  138. }
  139. }
  140. }
  141. // syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it
  142. func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
  143. startTime := time.Now()
  144. defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime))
  145. obj, exists, err := nm.store.GetByKey(key)
  146. if !exists {
  147. glog.Infof("Namespace has been deleted %v", key)
  148. return nil
  149. }
  150. if err != nil {
  151. glog.Infof("Unable to retrieve namespace %v from store: %v", key, err)
  152. nm.queue.Add(key)
  153. return err
  154. }
  155. namespace := obj.(*api.Namespace)
  156. return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken)
  157. }
  158. // Run starts observing the system with the specified number of workers.
  159. func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
  160. defer utilruntime.HandleCrash()
  161. go nm.controller.Run(stopCh)
  162. for i := 0; i < workers; i++ {
  163. go wait.Until(nm.worker, time.Second, stopCh)
  164. }
  165. <-stopCh
  166. glog.Infof("Shutting down NamespaceController")
  167. nm.queue.ShutDown()
  168. }