stop.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package kubectl
  14. import (
  15. "fmt"
  16. "strings"
  17. "time"
  18. "k8s.io/kubernetes/pkg/api"
  19. "k8s.io/kubernetes/pkg/api/errors"
  20. "k8s.io/kubernetes/pkg/api/meta"
  21. "k8s.io/kubernetes/pkg/api/unversioned"
  22. "k8s.io/kubernetes/pkg/apis/apps"
  23. "k8s.io/kubernetes/pkg/apis/batch"
  24. "k8s.io/kubernetes/pkg/apis/extensions"
  25. client "k8s.io/kubernetes/pkg/client/unversioned"
  26. deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
  27. "k8s.io/kubernetes/pkg/labels"
  28. "k8s.io/kubernetes/pkg/util"
  29. utilerrors "k8s.io/kubernetes/pkg/util/errors"
  30. "k8s.io/kubernetes/pkg/util/uuid"
  31. "k8s.io/kubernetes/pkg/util/wait"
  32. )
  33. const (
  34. Interval = time.Second * 1
  35. Timeout = time.Minute * 5
  36. )
  37. // A Reaper handles terminating an object as gracefully as possible.
  38. // timeout is how long we'll wait for the termination to be successful
  39. // gracePeriod is time given to an API object for it to delete itself cleanly,
  40. // e.g., pod shutdown. It may or may not be supported by the API object.
  41. type Reaper interface {
  42. Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error
  43. }
  44. type NoSuchReaperError struct {
  45. kind unversioned.GroupKind
  46. }
  47. func (n *NoSuchReaperError) Error() string {
  48. return fmt.Sprintf("no reaper has been implemented for %v", n.kind)
  49. }
  50. func IsNoSuchReaperError(err error) bool {
  51. _, ok := err.(*NoSuchReaperError)
  52. return ok
  53. }
  54. func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) {
  55. switch kind {
  56. case api.Kind("ReplicationController"):
  57. return &ReplicationControllerReaper{c, Interval, Timeout}, nil
  58. case extensions.Kind("ReplicaSet"):
  59. return &ReplicaSetReaper{c, Interval, Timeout}, nil
  60. case extensions.Kind("DaemonSet"):
  61. return &DaemonSetReaper{c, Interval, Timeout}, nil
  62. case api.Kind("Pod"):
  63. return &PodReaper{c}, nil
  64. case api.Kind("Service"):
  65. return &ServiceReaper{c}, nil
  66. case extensions.Kind("Job"), batch.Kind("Job"):
  67. return &JobReaper{c, Interval, Timeout}, nil
  68. case apps.Kind("PetSet"):
  69. return &PetSetReaper{c, Interval, Timeout}, nil
  70. case extensions.Kind("Deployment"):
  71. return &DeploymentReaper{c, Interval, Timeout}, nil
  72. }
  73. return nil, &NoSuchReaperError{kind}
  74. }
  75. func ReaperForReplicationController(c client.Interface, timeout time.Duration) (Reaper, error) {
  76. return &ReplicationControllerReaper{c, Interval, timeout}, nil
  77. }
  78. type ReplicationControllerReaper struct {
  79. client.Interface
  80. pollInterval, timeout time.Duration
  81. }
  82. type ReplicaSetReaper struct {
  83. client.Interface
  84. pollInterval, timeout time.Duration
  85. }
  86. type DaemonSetReaper struct {
  87. client.Interface
  88. pollInterval, timeout time.Duration
  89. }
  90. type JobReaper struct {
  91. client.Interface
  92. pollInterval, timeout time.Duration
  93. }
  94. type DeploymentReaper struct {
  95. client.Interface
  96. pollInterval, timeout time.Duration
  97. }
  98. type PodReaper struct {
  99. client.Interface
  100. }
  101. type ServiceReaper struct {
  102. client.Interface
  103. }
  104. type PetSetReaper struct {
  105. client.Interface
  106. pollInterval, timeout time.Duration
  107. }
  108. type objInterface interface {
  109. Delete(name string) error
  110. Get(name string) (meta.Object, error)
  111. }
  112. // getOverlappingControllers finds rcs that this controller overlaps, as well as rcs overlapping this controller.
  113. func getOverlappingControllers(c client.ReplicationControllerInterface, rc *api.ReplicationController) ([]api.ReplicationController, error) {
  114. rcs, err := c.List(api.ListOptions{})
  115. if err != nil {
  116. return nil, fmt.Errorf("error getting replication controllers: %v", err)
  117. }
  118. var matchingRCs []api.ReplicationController
  119. rcLabels := labels.Set(rc.Spec.Selector)
  120. for _, controller := range rcs.Items {
  121. newRCLabels := labels.Set(controller.Spec.Selector)
  122. if labels.SelectorFromSet(newRCLabels).Matches(rcLabels) || labels.SelectorFromSet(rcLabels).Matches(newRCLabels) {
  123. matchingRCs = append(matchingRCs, controller)
  124. }
  125. }
  126. return matchingRCs, nil
  127. }
  128. func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  129. rc := reaper.ReplicationControllers(namespace)
  130. scaler, err := ScalerFor(api.Kind("ReplicationController"), *reaper)
  131. if err != nil {
  132. return err
  133. }
  134. ctrl, err := rc.Get(name)
  135. if err != nil {
  136. return err
  137. }
  138. if timeout == 0 {
  139. timeout = Timeout + time.Duration(10*ctrl.Spec.Replicas)*time.Second
  140. }
  141. // The rc manager will try and detect all matching rcs for a pod's labels,
  142. // and only sync the oldest one. This means if we have a pod with labels
  143. // [(k1: v1), (k2: v2)] and two rcs: rc1 with selector [(k1=v1)], and rc2 with selector [(k1=v1),(k2=v2)],
  144. // the rc manager will sync the older of the two rcs.
  145. //
  146. // If there are rcs with a superset of labels, eg:
  147. // deleting: (k1=v1), superset: (k2=v2, k1=v1)
  148. // - It isn't safe to delete the rc because there could be a pod with labels
  149. // (k1=v1) that isn't managed by the superset rc. We can't scale it down
  150. // either, because there could be a pod (k2=v2, k1=v1) that it deletes
  151. // causing a fight with the superset rc.
  152. // If there are rcs with a subset of labels, eg:
  153. // deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
  154. // - Even if it's safe to delete this rc without a scale down because all it's pods
  155. // are being controlled by the subset rc the code returns an error.
  156. // In theory, creating overlapping controllers is user error, so the loop below
  157. // tries to account for this logic only in the common case, where we end up
  158. // with multiple rcs that have an exact match on selectors.
  159. overlappingCtrls, err := getOverlappingControllers(rc, ctrl)
  160. if err != nil {
  161. return fmt.Errorf("error getting replication controllers: %v", err)
  162. }
  163. exactMatchRCs := []api.ReplicationController{}
  164. overlapRCs := []string{}
  165. for _, overlappingRC := range overlappingCtrls {
  166. if len(overlappingRC.Spec.Selector) == len(ctrl.Spec.Selector) {
  167. exactMatchRCs = append(exactMatchRCs, overlappingRC)
  168. } else {
  169. overlapRCs = append(overlapRCs, overlappingRC.Name)
  170. }
  171. }
  172. if len(overlapRCs) > 0 {
  173. return fmt.Errorf(
  174. "Detected overlapping controllers for rc %v: %v, please manage deletion individually with --cascade=false.",
  175. ctrl.Name, strings.Join(overlapRCs, ","))
  176. }
  177. if len(exactMatchRCs) == 1 {
  178. // No overlapping controllers.
  179. retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
  180. waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
  181. if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
  182. return err
  183. }
  184. }
  185. falseVar := false
  186. deleteOptions := &api.DeleteOptions{OrphanDependents: &falseVar}
  187. return rc.Delete(name, deleteOptions)
  188. }
  189. // TODO(madhusudancs): Implement it when controllerRef is implemented - https://github.com/kubernetes/kubernetes/issues/2210
  190. // getOverlappingReplicaSets finds ReplicaSets that this ReplicaSet overlaps, as well as ReplicaSets overlapping this ReplicaSet.
  191. func getOverlappingReplicaSets(c client.ReplicaSetInterface, rs *extensions.ReplicaSet) ([]extensions.ReplicaSet, []extensions.ReplicaSet, error) {
  192. var overlappingRSs, exactMatchRSs []extensions.ReplicaSet
  193. return overlappingRSs, exactMatchRSs, nil
  194. }
  195. func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  196. rsc := reaper.Extensions().ReplicaSets(namespace)
  197. scaler, err := ScalerFor(extensions.Kind("ReplicaSet"), *reaper)
  198. if err != nil {
  199. return err
  200. }
  201. rs, err := rsc.Get(name)
  202. if err != nil {
  203. return err
  204. }
  205. if timeout == 0 {
  206. timeout = Timeout + time.Duration(10*rs.Spec.Replicas)*time.Second
  207. }
  208. // The ReplicaSet controller will try and detect all matching ReplicaSets
  209. // for a pod's labels, and only sync the oldest one. This means if we have
  210. // a pod with labels [(k1: v1), (k2: v2)] and two ReplicaSets: rs1 with
  211. // selector [(k1=v1)], and rs2 with selector [(k1=v1),(k2=v2)], the
  212. // ReplicaSet controller will sync the older of the two ReplicaSets.
  213. //
  214. // If there are ReplicaSets with a superset of labels, eg:
  215. // deleting: (k1=v1), superset: (k2=v2, k1=v1)
  216. // - It isn't safe to delete the ReplicaSet because there could be a pod
  217. // with labels (k1=v1) that isn't managed by the superset ReplicaSet.
  218. // We can't scale it down either, because there could be a pod
  219. // (k2=v2, k1=v1) that it deletes causing a fight with the superset
  220. // ReplicaSet.
  221. // If there are ReplicaSets with a subset of labels, eg:
  222. // deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
  223. // - Even if it's safe to delete this ReplicaSet without a scale down because
  224. // all it's pods are being controlled by the subset ReplicaSet the code
  225. // returns an error.
  226. // In theory, creating overlapping ReplicaSets is user error, so the loop below
  227. // tries to account for this logic only in the common case, where we end up
  228. // with multiple ReplicaSets that have an exact match on selectors.
  229. // TODO(madhusudancs): Re-evaluate again when controllerRef is implemented -
  230. // https://github.com/kubernetes/kubernetes/issues/2210
  231. overlappingRSs, exactMatchRSs, err := getOverlappingReplicaSets(rsc, rs)
  232. if err != nil {
  233. return fmt.Errorf("error getting ReplicaSets: %v", err)
  234. }
  235. if len(overlappingRSs) > 0 {
  236. var names []string
  237. for _, overlappingRS := range overlappingRSs {
  238. names = append(names, overlappingRS.Name)
  239. }
  240. return fmt.Errorf(
  241. "Detected overlapping ReplicaSets for ReplicaSet %v: %v, please manage deletion individually with --cascade=false.",
  242. rs.Name, strings.Join(names, ","))
  243. }
  244. if len(exactMatchRSs) == 0 {
  245. // No overlapping ReplicaSets.
  246. retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
  247. waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
  248. if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
  249. return err
  250. }
  251. }
  252. falseVar := false
  253. deleteOptions := &api.DeleteOptions{OrphanDependents: &falseVar}
  254. return rsc.Delete(name, deleteOptions)
  255. }
  256. func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  257. ds, err := reaper.Extensions().DaemonSets(namespace).Get(name)
  258. if err != nil {
  259. return err
  260. }
  261. // We set the nodeSelector to a random label. This label is nearly guaranteed
  262. // to not be set on any node so the DameonSetController will start deleting
  263. // daemon pods. Once it's done deleting the daemon pods, it's safe to delete
  264. // the DaemonSet.
  265. ds.Spec.Template.Spec.NodeSelector = map[string]string{
  266. string(uuid.NewUUID()): string(uuid.NewUUID()),
  267. }
  268. // force update to avoid version conflict
  269. ds.ResourceVersion = ""
  270. if ds, err = reaper.Extensions().DaemonSets(namespace).Update(ds); err != nil {
  271. return err
  272. }
  273. // Wait for the daemon set controller to kill all the daemon pods.
  274. if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) {
  275. updatedDS, err := reaper.Extensions().DaemonSets(namespace).Get(name)
  276. if err != nil {
  277. return false, nil
  278. }
  279. return updatedDS.Status.CurrentNumberScheduled+updatedDS.Status.NumberMisscheduled == 0, nil
  280. }); err != nil {
  281. return err
  282. }
  283. return reaper.Extensions().DaemonSets(namespace).Delete(name)
  284. }
  285. func (reaper *PetSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  286. petsets := reaper.Apps().PetSets(namespace)
  287. scaler, err := ScalerFor(apps.Kind("PetSet"), *reaper)
  288. if err != nil {
  289. return err
  290. }
  291. ps, err := petsets.Get(name)
  292. if err != nil {
  293. return err
  294. }
  295. if timeout == 0 {
  296. numPets := ps.Spec.Replicas
  297. timeout = Timeout + time.Duration(10*numPets)*time.Second
  298. }
  299. retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
  300. waitForPetSet := NewRetryParams(reaper.pollInterval, reaper.timeout)
  301. if err = scaler.Scale(namespace, name, 0, nil, retry, waitForPetSet); err != nil {
  302. return err
  303. }
  304. // TODO: This shouldn't be needed, see corresponding TODO in PetSetHasDesiredPets.
  305. // PetSet should track generation number.
  306. pods := reaper.Pods(namespace)
  307. selector, _ := unversioned.LabelSelectorAsSelector(ps.Spec.Selector)
  308. options := api.ListOptions{LabelSelector: selector}
  309. podList, err := pods.List(options)
  310. if err != nil {
  311. return err
  312. }
  313. errList := []error{}
  314. for _, pod := range podList.Items {
  315. if err := pods.Delete(pod.Name, gracePeriod); err != nil {
  316. if !errors.IsNotFound(err) {
  317. errList = append(errList, err)
  318. }
  319. }
  320. }
  321. if len(errList) > 0 {
  322. return utilerrors.NewAggregate(errList)
  323. }
  324. // TODO: Cleanup volumes? We don't want to accidentally delete volumes from
  325. // stop, so just leave this up to the the petset.
  326. return petsets.Delete(name, nil)
  327. }
  328. func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  329. jobs := reaper.Batch().Jobs(namespace)
  330. pods := reaper.Pods(namespace)
  331. scaler, err := ScalerFor(batch.Kind("Job"), *reaper)
  332. if err != nil {
  333. return err
  334. }
  335. job, err := jobs.Get(name)
  336. if err != nil {
  337. return err
  338. }
  339. if timeout == 0 {
  340. // we will never have more active pods than job.Spec.Parallelism
  341. parallelism := *job.Spec.Parallelism
  342. timeout = Timeout + time.Duration(10*parallelism)*time.Second
  343. }
  344. // TODO: handle overlapping jobs
  345. retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
  346. waitForJobs := NewRetryParams(reaper.pollInterval, timeout)
  347. if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
  348. return err
  349. }
  350. // at this point only dead pods are left, that should be removed
  351. selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
  352. options := api.ListOptions{LabelSelector: selector}
  353. podList, err := pods.List(options)
  354. if err != nil {
  355. return err
  356. }
  357. errList := []error{}
  358. for _, pod := range podList.Items {
  359. if err := pods.Delete(pod.Name, gracePeriod); err != nil {
  360. // ignores the error when the pod isn't found
  361. if !errors.IsNotFound(err) {
  362. errList = append(errList, err)
  363. }
  364. }
  365. }
  366. if len(errList) > 0 {
  367. return utilerrors.NewAggregate(errList)
  368. }
  369. // once we have all the pods removed we can safely remove the job itself
  370. return jobs.Delete(name, nil)
  371. }
  372. func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  373. deployments := reaper.Extensions().Deployments(namespace)
  374. replicaSets := reaper.Extensions().ReplicaSets(namespace)
  375. rsReaper, _ := ReaperFor(extensions.Kind("ReplicaSet"), reaper)
  376. deployment, err := reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
  377. // set deployment's history and scale to 0
  378. // TODO replace with patch when available: https://github.com/kubernetes/kubernetes/issues/20527
  379. d.Spec.RevisionHistoryLimit = util.Int32Ptr(0)
  380. d.Spec.Replicas = 0
  381. d.Spec.Paused = true
  382. })
  383. if err != nil {
  384. return err
  385. }
  386. // Use observedGeneration to determine if the deployment controller noticed the pause.
  387. if err := deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) {
  388. return deployments.Get(name)
  389. }, deployment.Generation, 1*time.Second, 1*time.Minute); err != nil {
  390. return err
  391. }
  392. // Stop all replica sets.
  393. selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
  394. if err != nil {
  395. return err
  396. }
  397. options := api.ListOptions{LabelSelector: selector}
  398. rsList, err := replicaSets.List(options)
  399. if err != nil {
  400. return err
  401. }
  402. errList := []error{}
  403. for _, rc := range rsList.Items {
  404. if err := rsReaper.Stop(rc.Namespace, rc.Name, timeout, gracePeriod); err != nil {
  405. scaleGetErr, ok := err.(ScaleError)
  406. if errors.IsNotFound(err) || (ok && errors.IsNotFound(scaleGetErr.ActualError)) {
  407. continue
  408. }
  409. errList = append(errList, err)
  410. }
  411. }
  412. if len(errList) > 0 {
  413. return utilerrors.NewAggregate(errList)
  414. }
  415. // Delete deployment at the end.
  416. // Note: We delete deployment at the end so that if removing RSs fails, we at least have the deployment to retry.
  417. return deployments.Delete(name, nil)
  418. }
  419. type updateDeploymentFunc func(d *extensions.Deployment)
  420. func (reaper *DeploymentReaper) updateDeploymentWithRetries(namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) {
  421. deployments := reaper.Extensions().Deployments(namespace)
  422. err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
  423. if deployment, err = deployments.Get(name); err != nil {
  424. return false, err
  425. }
  426. // Apply the update, then attempt to push it to the apiserver.
  427. applyUpdate(deployment)
  428. if deployment, err = deployments.Update(deployment); err == nil {
  429. return true, nil
  430. }
  431. // Retry only on update conflict.
  432. if errors.IsConflict(err) {
  433. return false, nil
  434. }
  435. return false, err
  436. })
  437. return deployment, err
  438. }
  439. func (reaper *PodReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  440. pods := reaper.Pods(namespace)
  441. _, err := pods.Get(name)
  442. if err != nil {
  443. return err
  444. }
  445. return pods.Delete(name, gracePeriod)
  446. }
  447. func (reaper *ServiceReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
  448. services := reaper.Services(namespace)
  449. _, err := services.Get(name)
  450. if err != nil {
  451. return err
  452. }
  453. return services.Delete(name)
  454. }