namespace_controller_utils.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package namespace
  14. import (
  15. "fmt"
  16. "time"
  17. "k8s.io/kubernetes/pkg/api"
  18. "k8s.io/kubernetes/pkg/api/errors"
  19. "k8s.io/kubernetes/pkg/api/unversioned"
  20. "k8s.io/kubernetes/pkg/api/v1"
  21. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  22. "k8s.io/kubernetes/pkg/client/typed/dynamic"
  23. "k8s.io/kubernetes/pkg/runtime"
  24. "k8s.io/kubernetes/pkg/util/sets"
  25. "github.com/golang/glog"
  26. )
  27. // contentRemainingError is used to inform the caller that content is not fully removed from the namespace
  28. type contentRemainingError struct {
  29. Estimate int64
  30. }
  31. func (e *contentRemainingError) Error() string {
  32. return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
  33. }
  34. // operation is used for caching if an operation is supported on a dynamic client.
  35. type operation string
  36. const (
  37. operationDeleteCollection operation = "deleteCollection"
  38. operationList operation = "list"
  39. )
  40. // operationKey is an entry in a cache.
  41. type operationKey struct {
  42. op operation
  43. gvr unversioned.GroupVersionResource
  44. }
  45. // operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource.
  46. // if the operationKey maps to true, it means the operation is not supported.
  47. type operationNotSupportedCache map[operationKey]bool
  48. // isSupported returns true if the operation is supported
  49. func (o operationNotSupportedCache) isSupported(key operationKey) bool {
  50. return !o[key]
  51. }
  52. // updateNamespaceFunc is a function that makes an update to a namespace
  53. type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error)
  54. // retryOnConflictError retries the specified fn if there was a conflict error
  55. // TODO RetryOnConflict should be a generic concept in client code
  56. func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) {
  57. latestNamespace := namespace
  58. for {
  59. result, err = fn(kubeClient, latestNamespace)
  60. if err == nil {
  61. return result, nil
  62. }
  63. if !errors.IsConflict(err) {
  64. return nil, err
  65. }
  66. latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name)
  67. if err != nil {
  68. return nil, err
  69. }
  70. }
  71. }
  72. // updateNamespaceStatusFunc will verify that the status of the namespace is correct
  73. func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
  74. if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating {
  75. return namespace, nil
  76. }
  77. newNamespace := api.Namespace{}
  78. newNamespace.ObjectMeta = namespace.ObjectMeta
  79. newNamespace.Status = namespace.Status
  80. newNamespace.Status.Phase = api.NamespaceTerminating
  81. return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace)
  82. }
  83. // finalized returns true if the namespace.Spec.Finalizers is an empty list
  84. func finalized(namespace *api.Namespace) bool {
  85. return len(namespace.Spec.Finalizers) == 0
  86. }
  87. // finalizeNamespaceFunc returns a function that knows how to finalize a namespace for specified token.
  88. func finalizeNamespaceFunc(finalizerToken api.FinalizerName) updateNamespaceFunc {
  89. return func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
  90. return finalizeNamespace(kubeClient, namespace, finalizerToken)
  91. }
  92. }
  93. // finalizeNamespace removes the specified finalizerToken and finalizes the namespace
  94. func finalizeNamespace(kubeClient clientset.Interface, namespace *api.Namespace, finalizerToken api.FinalizerName) (*api.Namespace, error) {
  95. namespaceFinalize := api.Namespace{}
  96. namespaceFinalize.ObjectMeta = namespace.ObjectMeta
  97. namespaceFinalize.Spec = namespace.Spec
  98. finalizerSet := sets.NewString()
  99. for i := range namespace.Spec.Finalizers {
  100. if namespace.Spec.Finalizers[i] != finalizerToken {
  101. finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
  102. }
  103. }
  104. namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, 0, len(finalizerSet))
  105. for _, value := range finalizerSet.List() {
  106. namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value))
  107. }
  108. namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize)
  109. if err != nil {
  110. // it was removed already, so life is good
  111. if errors.IsNotFound(err) {
  112. return namespace, nil
  113. }
  114. }
  115. return namespace, err
  116. }
  117. // deleteCollection is a helper function that will delete the collection of resources
  118. // it returns true if the operation was supported on the server.
  119. // it returns an error if the operation was supported on the server but was unable to complete.
  120. func deleteCollection(
  121. dynamicClient *dynamic.Client,
  122. opCache operationNotSupportedCache,
  123. gvr unversioned.GroupVersionResource,
  124. namespace string,
  125. ) (bool, error) {
  126. glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
  127. key := operationKey{op: operationDeleteCollection, gvr: gvr}
  128. if !opCache.isSupported(key) {
  129. glog.V(5).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
  130. return false, nil
  131. }
  132. apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
  133. err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(nil, &v1.ListOptions{})
  134. if err == nil {
  135. return true, nil
  136. }
  137. // this is strange, but we need to special case for both MethodNotSupported and NotFound errors
  138. // TODO: https://github.com/kubernetes/kubernetes/issues/22413
  139. // we have a resource returned in the discovery API that supports no top-level verbs:
  140. // /apis/extensions/v1beta1/namespaces/default/replicationcontrollers
  141. // when working with this resource type, we will get a literal not found error rather than expected method not supported
  142. // remember next time that this resource does not support delete collection...
  143. if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
  144. glog.V(5).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
  145. opCache[key] = true
  146. return false, nil
  147. }
  148. glog.V(5).Infof("namespace controller - deleteCollection unexpected error - namespace: %s, gvr: %v, error: %v", namespace, gvr, err)
  149. return true, err
  150. }
  151. // listCollection will list the items in the specified namespace
  152. // it returns the following:
  153. // the list of items in the collection (if found)
  154. // a boolean if the operation is supported
  155. // an error if the operation is supported but could not be completed.
  156. func listCollection(
  157. dynamicClient *dynamic.Client,
  158. opCache operationNotSupportedCache,
  159. gvr unversioned.GroupVersionResource,
  160. namespace string,
  161. ) (*runtime.UnstructuredList, bool, error) {
  162. glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
  163. key := operationKey{op: operationList, gvr: gvr}
  164. if !opCache.isSupported(key) {
  165. glog.V(5).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
  166. return nil, false, nil
  167. }
  168. apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
  169. obj, err := dynamicClient.Resource(&apiResource, namespace).List(&v1.ListOptions{})
  170. if err == nil {
  171. unstructuredList, ok := obj.(*runtime.UnstructuredList)
  172. if !ok {
  173. return nil, false, fmt.Errorf("resource: %s, expected *runtime.UnstructuredList, got %#v", apiResource.Name, obj)
  174. }
  175. return unstructuredList, true, nil
  176. }
  177. // this is strange, but we need to special case for both MethodNotSupported and NotFound errors
  178. // TODO: https://github.com/kubernetes/kubernetes/issues/22413
  179. // we have a resource returned in the discovery API that supports no top-level verbs:
  180. // /apis/extensions/v1beta1/namespaces/default/replicationcontrollers
  181. // when working with this resource type, we will get a literal not found error rather than expected method not supported
  182. // remember next time that this resource does not support delete collection...
  183. if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
  184. glog.V(5).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
  185. opCache[key] = true
  186. return nil, false, nil
  187. }
  188. return nil, true, err
  189. }
  190. // deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
  191. func deleteEachItem(
  192. dynamicClient *dynamic.Client,
  193. opCache operationNotSupportedCache,
  194. gvr unversioned.GroupVersionResource,
  195. namespace string,
  196. ) error {
  197. glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)
  198. unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
  199. if err != nil {
  200. return err
  201. }
  202. if !listSupported {
  203. return nil
  204. }
  205. apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
  206. for _, item := range unstructuredList.Items {
  207. if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.GetName(), nil); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
  208. return err
  209. }
  210. }
  211. return nil
  212. }
  213. // deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr.
  214. // It returns an estimate of the time remaining before the remaining resources are deleted.
  215. // If estimate > 0, not all resources are guaranteed to be gone.
  216. func deleteAllContentForGroupVersionResource(
  217. kubeClient clientset.Interface,
  218. clientPool dynamic.ClientPool,
  219. opCache operationNotSupportedCache,
  220. gvr unversioned.GroupVersionResource,
  221. namespace string,
  222. namespaceDeletedAt unversioned.Time,
  223. ) (int64, error) {
  224. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
  225. // estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete)
  226. estimate, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt)
  227. if err != nil {
  228. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
  229. return estimate, err
  230. }
  231. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
  232. // get a client for this group version...
  233. dynamicClient, err := clientPool.ClientForGroupVersion(gvr.GroupVersion())
  234. if err != nil {
  235. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
  236. return estimate, err
  237. }
  238. // first try to delete the entire collection
  239. deleteCollectionSupported, err := deleteCollection(dynamicClient, opCache, gvr, namespace)
  240. if err != nil {
  241. return estimate, err
  242. }
  243. // delete collection was not supported, so we list and delete each item...
  244. if !deleteCollectionSupported {
  245. err = deleteEachItem(dynamicClient, opCache, gvr, namespace)
  246. if err != nil {
  247. return estimate, err
  248. }
  249. }
  250. // verify there are no more remaining items
  251. // it is not an error condition for there to be remaining items if local estimate is non-zero
  252. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
  253. unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
  254. if err != nil {
  255. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
  256. return estimate, err
  257. }
  258. if !listSupported {
  259. return estimate, nil
  260. }
  261. glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(unstructuredList.Items))
  262. if len(unstructuredList.Items) != 0 && estimate == int64(0) {
  263. return estimate, fmt.Errorf("unexpected items still remain in namespace: %s for gvr: %v", namespace, gvr)
  264. }
  265. return estimate, nil
  266. }
  267. // deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
  268. // It returns an estimate of the time remaining before the remaining resources are deleted.
  269. // If estimate > 0, not all resources are guaranteed to be gone.
  270. func deleteAllContent(
  271. kubeClient clientset.Interface,
  272. clientPool dynamic.ClientPool,
  273. opCache operationNotSupportedCache,
  274. groupVersionResources []unversioned.GroupVersionResource,
  275. namespace string,
  276. namespaceDeletedAt unversioned.Time,
  277. ) (int64, error) {
  278. estimate := int64(0)
  279. glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources)
  280. // iterate over each group version, and attempt to delete all of its resources
  281. for _, gvr := range groupVersionResources {
  282. gvrEstimate, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, gvr, namespace, namespaceDeletedAt)
  283. if err != nil {
  284. return estimate, err
  285. }
  286. if gvrEstimate > estimate {
  287. estimate = gvrEstimate
  288. }
  289. }
  290. glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, estimate)
  291. return estimate, nil
  292. }
  293. // syncNamespace orchestrates deletion of a Namespace and its associated content.
  294. func syncNamespace(
  295. kubeClient clientset.Interface,
  296. clientPool dynamic.ClientPool,
  297. opCache operationNotSupportedCache,
  298. groupVersionResources []unversioned.GroupVersionResource,
  299. namespace *api.Namespace,
  300. finalizerToken api.FinalizerName,
  301. ) error {
  302. if namespace.DeletionTimestamp == nil {
  303. return nil
  304. }
  305. // multiple controllers may edit a namespace during termination
  306. // first get the latest state of the namespace before proceeding
  307. // if the namespace was deleted already, don't do anything
  308. namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name)
  309. if err != nil {
  310. if errors.IsNotFound(err) {
  311. return nil
  312. }
  313. return err
  314. }
  315. glog.V(5).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken)
  316. // ensure that the status is up to date on the namespace
  317. // if we get a not found error, we assume the namespace is truly gone
  318. namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
  319. if err != nil {
  320. if errors.IsNotFound(err) {
  321. return nil
  322. }
  323. return err
  324. }
  325. // if the namespace is already finalized, delete it
  326. if finalized(namespace) {
  327. err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
  328. if err != nil && !errors.IsNotFound(err) {
  329. return err
  330. }
  331. return nil
  332. }
  333. // there may still be content for us to remove
  334. estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
  335. if err != nil {
  336. return err
  337. }
  338. if estimate > 0 {
  339. return &contentRemainingError{estimate}
  340. }
  341. // we have removed content, so mark it finalized by us
  342. result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken))
  343. if err != nil {
  344. // in normal practice, this should not be possible, but if a deployment is running
  345. // two controllers to do namespace deletion that share a common finalizer token it's
  346. // possible that a not found could occur since the other controller would have finished the delete.
  347. if errors.IsNotFound(err) {
  348. return nil
  349. }
  350. return err
  351. }
  352. // now check if all finalizers have reported that we delete now
  353. if finalized(result) {
  354. err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
  355. if err != nil && !errors.IsNotFound(err) {
  356. return err
  357. }
  358. }
  359. return nil
  360. }
  361. // estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace
  362. func estimateGracefulTermination(kubeClient clientset.Interface, groupVersionResource unversioned.GroupVersionResource, ns string, namespaceDeletedAt unversioned.Time) (int64, error) {
  363. groupResource := groupVersionResource.GroupResource()
  364. glog.V(5).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource)
  365. estimate := int64(0)
  366. var err error
  367. switch groupResource {
  368. case unversioned.GroupResource{Group: "", Resource: "pods"}:
  369. estimate, err = estimateGracefulTerminationForPods(kubeClient, ns)
  370. }
  371. if err != nil {
  372. return estimate, err
  373. }
  374. // determine if the estimate is greater than the deletion timestamp
  375. duration := time.Since(namespaceDeletedAt.Time)
  376. allowedEstimate := time.Duration(estimate) * time.Second
  377. if duration >= allowedEstimate {
  378. estimate = int64(0)
  379. }
  380. return estimate, nil
  381. }
  382. // estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace
  383. func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns string) (int64, error) {
  384. glog.V(5).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns)
  385. estimate := int64(0)
  386. items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{})
  387. if err != nil {
  388. return estimate, err
  389. }
  390. for i := range items.Items {
  391. // filter out terminal pods
  392. phase := items.Items[i].Status.Phase
  393. if api.PodSucceeded == phase || api.PodFailed == phase {
  394. continue
  395. }
  396. if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
  397. grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
  398. if grace > estimate {
  399. estimate = grace
  400. }
  401. }
  402. }
  403. return estimate, nil
  404. }