drain.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cmd
  14. import (
  15. "errors"
  16. "fmt"
  17. "io"
  18. "reflect"
  19. "strings"
  20. "github.com/renstrom/dedent"
  21. "github.com/spf13/cobra"
  22. "k8s.io/kubernetes/pkg/api"
  23. "k8s.io/kubernetes/pkg/api/meta"
  24. client "k8s.io/kubernetes/pkg/client/unversioned"
  25. "k8s.io/kubernetes/pkg/fields"
  26. cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
  27. "k8s.io/kubernetes/pkg/kubectl/resource"
  28. "k8s.io/kubernetes/pkg/kubelet/types"
  29. "k8s.io/kubernetes/pkg/runtime"
  30. )
  31. type DrainOptions struct {
  32. client *client.Client
  33. factory *cmdutil.Factory
  34. Force bool
  35. GracePeriodSeconds int
  36. IgnoreDaemonsets bool
  37. DeleteLocalData bool
  38. mapper meta.RESTMapper
  39. nodeInfo *resource.Info
  40. out io.Writer
  41. typer runtime.ObjectTyper
  42. }
  43. // Takes a pod and returns a bool indicating whether or not to operate on the
  44. // pod, an optional warning message, and an optional fatal error.
  45. type podFilter func(api.Pod) (include bool, w *warning, f *fatal)
  46. type warning struct {
  47. string
  48. }
  49. type fatal struct {
  50. string
  51. }
  52. const (
  53. kDaemonsetFatal = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
  54. kDaemonsetWarning = "Ignoring DaemonSet-managed pods"
  55. kLocalStorageFatal = "pods with local storage (use --delete-local-data to override)"
  56. kLocalStorageWarning = "Deleting pods with local storage"
  57. kUnmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet (use --force to override)"
  58. kUnmanagedWarning = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet"
  59. )
  60. var (
  61. cordon_long = dedent.Dedent(`
  62. Mark node as unschedulable.
  63. `)
  64. cordon_example = dedent.Dedent(`
  65. # Mark node "foo" as unschedulable.
  66. kubectl cordon foo
  67. `)
  68. )
  69. func NewCmdCordon(f *cmdutil.Factory, out io.Writer) *cobra.Command {
  70. options := &DrainOptions{factory: f, out: out}
  71. cmd := &cobra.Command{
  72. Use: "cordon NODE",
  73. Short: "Mark node as unschedulable",
  74. Long: cordon_long,
  75. Example: cordon_example,
  76. Run: func(cmd *cobra.Command, args []string) {
  77. cmdutil.CheckErr(options.SetupDrain(cmd, args))
  78. cmdutil.CheckErr(options.RunCordonOrUncordon(true))
  79. },
  80. }
  81. return cmd
  82. }
  83. var (
  84. uncordon_long = dedent.Dedent(`
  85. Mark node as schedulable.
  86. `)
  87. uncordon_example = dedent.Dedent(`
  88. # Mark node "foo" as schedulable.
  89. $ kubectl uncordon foo
  90. `)
  91. )
  92. func NewCmdUncordon(f *cmdutil.Factory, out io.Writer) *cobra.Command {
  93. options := &DrainOptions{factory: f, out: out}
  94. cmd := &cobra.Command{
  95. Use: "uncordon NODE",
  96. Short: "Mark node as schedulable",
  97. Long: uncordon_long,
  98. Example: uncordon_example,
  99. Run: func(cmd *cobra.Command, args []string) {
  100. cmdutil.CheckErr(options.SetupDrain(cmd, args))
  101. cmdutil.CheckErr(options.RunCordonOrUncordon(false))
  102. },
  103. }
  104. return cmd
  105. }
  106. var (
  107. drain_long = dedent.Dedent(`
  108. Drain node in preparation for maintenance.
  109. The given node will be marked unschedulable to prevent new pods from arriving.
  110. Then drain deletes all pods except mirror pods (which cannot be deleted through
  111. the API server). If there are DaemonSet-managed pods, drain will not proceed
  112. without --ignore-daemonsets, and regardless it will not delete any
  113. DaemonSet-managed pods, because those pods would be immediately replaced by the
  114. DaemonSet controller, which ignores unschedulable markings. If there are any
  115. pods that are neither mirror pods nor managed--by ReplicationController,
  116. ReplicaSet, DaemonSet or Job--, then drain will not delete any pods unless you
  117. use --force.
  118. When you are ready to put the node back into service, use kubectl uncordon, which
  119. will make the node schedulable again.
  120. ![Workflow](http://kubernetes.io/images/docs/kubectl_drain.svg)
  121. `)
  122. drain_example = dedent.Dedent(`
  123. # Drain node "foo", even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet on it.
  124. $ kubectl drain foo --force
  125. # As above, but abort if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet, and use a grace period of 15 minutes.
  126. $ kubectl drain foo --grace-period=900
  127. `)
  128. )
  129. func NewCmdDrain(f *cmdutil.Factory, out io.Writer) *cobra.Command {
  130. options := &DrainOptions{factory: f, out: out}
  131. cmd := &cobra.Command{
  132. Use: "drain NODE",
  133. Short: "Drain node in preparation for maintenance",
  134. Long: drain_long,
  135. Example: drain_example,
  136. Run: func(cmd *cobra.Command, args []string) {
  137. cmdutil.CheckErr(options.SetupDrain(cmd, args))
  138. cmdutil.CheckErr(options.RunDrain())
  139. },
  140. }
  141. cmd.Flags().BoolVar(&options.Force, "force", false, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.")
  142. cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
  143. cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
  144. cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
  145. return cmd
  146. }
  147. // SetupDrain populates some fields from the factory, grabs command line
  148. // arguments and looks up the node using Builder
  149. func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
  150. var err error
  151. if len(args) != 1 {
  152. return cmdutil.UsageError(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use))
  153. }
  154. if o.client, err = o.factory.Client(); err != nil {
  155. return err
  156. }
  157. o.mapper, o.typer = o.factory.Object(false)
  158. cmdNamespace, _, err := o.factory.DefaultNamespace()
  159. if err != nil {
  160. return err
  161. }
  162. r := o.factory.NewBuilder(cmdutil.GetIncludeThirdPartyAPIs(cmd)).
  163. NamespaceParam(cmdNamespace).DefaultNamespace().
  164. ResourceNames("node", args[0]).
  165. Do()
  166. if err = r.Err(); err != nil {
  167. return err
  168. }
  169. return r.Visit(func(info *resource.Info, err error) error {
  170. if err != nil {
  171. return err
  172. }
  173. o.nodeInfo = info
  174. return nil
  175. })
  176. }
  177. // RunDrain runs the 'drain' command
  178. func (o *DrainOptions) RunDrain() error {
  179. if err := o.RunCordonOrUncordon(true); err != nil {
  180. return err
  181. }
  182. pods, err := o.getPodsForDeletion()
  183. if err != nil {
  184. return err
  185. }
  186. if err = o.deletePods(pods); err != nil {
  187. return err
  188. }
  189. cmdutil.PrintSuccess(o.mapper, false, o.out, "node", o.nodeInfo.Name, "drained")
  190. return nil
  191. }
  192. func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
  193. switch sr.Reference.Kind {
  194. case "ReplicationController":
  195. return o.client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name)
  196. case "DaemonSet":
  197. return o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name)
  198. case "Job":
  199. return o.client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
  200. case "ReplicaSet":
  201. return o.client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
  202. }
  203. return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
  204. }
  205. func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) {
  206. creatorRef, found := pod.ObjectMeta.Annotations[api.CreatedByAnnotation]
  207. if !found {
  208. return nil, nil
  209. }
  210. // Now verify that the specified creator actually exists.
  211. sr := &api.SerializedReference{}
  212. if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
  213. return nil, err
  214. }
  215. // We assume the only reason for an error is because the controller is
  216. // gone/missing, not for any other cause. TODO(mml): something more
  217. // sophisticated than this
  218. _, err := o.getController(sr)
  219. if err != nil {
  220. return nil, err
  221. }
  222. return sr, nil
  223. }
  224. func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) {
  225. sr, err := o.getPodCreator(pod)
  226. if err != nil {
  227. return false, nil, &fatal{err.Error()}
  228. }
  229. if sr != nil {
  230. return true, nil, nil
  231. }
  232. if !o.Force {
  233. return false, nil, &fatal{kUnmanagedFatal}
  234. }
  235. return true, &warning{kUnmanagedWarning}, nil
  236. }
  237. func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) {
  238. // Note that we return false in all cases where the pod is DaemonSet managed,
  239. // regardless of flags. We never delete them, the only question is whether
  240. // their presence constitutes an error.
  241. sr, err := o.getPodCreator(pod)
  242. if err != nil {
  243. return false, nil, &fatal{err.Error()}
  244. }
  245. if sr == nil || sr.Reference.Kind != "DaemonSet" {
  246. return true, nil, nil
  247. }
  248. if _, err := o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name); err != nil {
  249. return false, nil, &fatal{err.Error()}
  250. }
  251. if !o.IgnoreDaemonsets {
  252. return false, nil, &fatal{kDaemonsetFatal}
  253. }
  254. return false, &warning{kDaemonsetWarning}, nil
  255. }
  256. func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) {
  257. if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found {
  258. return false, nil, nil
  259. }
  260. return true, nil, nil
  261. }
  262. func hasLocalStorage(pod api.Pod) bool {
  263. for _, volume := range pod.Spec.Volumes {
  264. if volume.EmptyDir != nil {
  265. return true
  266. }
  267. }
  268. return false
  269. }
  270. func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) {
  271. if !hasLocalStorage(pod) {
  272. return true, nil, nil
  273. }
  274. if !o.DeleteLocalData {
  275. return false, nil, &fatal{kLocalStorageFatal}
  276. }
  277. return true, &warning{kLocalStorageWarning}, nil
  278. }
  279. // Map of status message to a list of pod names having that status.
  280. type podStatuses map[string][]string
  281. func (ps podStatuses) Message() string {
  282. msgs := []string{}
  283. for key, pods := range ps {
  284. msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", ")))
  285. }
  286. return strings.Join(msgs, "; ")
  287. }
  288. // getPodsForDeletion returns all the pods we're going to delete. If there are
  289. // any pods preventing us from deleting, we return that list in an error.
  290. func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
  291. podList, err := o.client.Pods(api.NamespaceAll).List(api.ListOptions{
  292. FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})})
  293. if err != nil {
  294. return pods, err
  295. }
  296. ws := podStatuses{}
  297. fs := podStatuses{}
  298. for _, pod := range podList.Items {
  299. podOk := true
  300. for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
  301. filterOk, w, f := filt(pod)
  302. podOk = podOk && filterOk
  303. if w != nil {
  304. ws[w.string] = append(ws[w.string], pod.Name)
  305. }
  306. if f != nil {
  307. fs[f.string] = append(fs[f.string], pod.Name)
  308. }
  309. }
  310. if podOk {
  311. pods = append(pods, pod)
  312. }
  313. }
  314. if len(fs) > 0 {
  315. return []api.Pod{}, errors.New(fs.Message())
  316. }
  317. if len(ws) > 0 {
  318. fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message())
  319. }
  320. return pods, nil
  321. }
  322. // deletePods deletes the pods on the api server
  323. func (o *DrainOptions) deletePods(pods []api.Pod) error {
  324. deleteOptions := api.DeleteOptions{}
  325. if o.GracePeriodSeconds >= 0 {
  326. gracePeriodSeconds := int64(o.GracePeriodSeconds)
  327. deleteOptions.GracePeriodSeconds = &gracePeriodSeconds
  328. }
  329. for _, pod := range pods {
  330. err := o.client.Pods(pod.Namespace).Delete(pod.Name, &deleteOptions)
  331. if err != nil {
  332. return err
  333. }
  334. cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, "deleted")
  335. }
  336. return nil
  337. }
  338. // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
  339. // "Unschedulable" is passed as the first arg.
  340. func (o *DrainOptions) RunCordonOrUncordon(desired bool) error {
  341. cmdNamespace, _, err := o.factory.DefaultNamespace()
  342. if err != nil {
  343. return err
  344. }
  345. if o.nodeInfo.Mapping.GroupVersionKind.Kind == "Node" {
  346. unsched := reflect.ValueOf(o.nodeInfo.Object).Elem().FieldByName("Spec").FieldByName("Unschedulable")
  347. if unsched.Bool() == desired {
  348. cmdutil.PrintSuccess(o.mapper, false, o.out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, already(desired))
  349. } else {
  350. helper := resource.NewHelper(o.client, o.nodeInfo.Mapping)
  351. unsched.SetBool(desired)
  352. _, err := helper.Replace(cmdNamespace, o.nodeInfo.Name, true, o.nodeInfo.Object)
  353. if err != nil {
  354. return err
  355. }
  356. cmdutil.PrintSuccess(o.mapper, false, o.out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, changed(desired))
  357. }
  358. } else {
  359. cmdutil.PrintSuccess(o.mapper, false, o.out, o.nodeInfo.Mapping.Resource, o.nodeInfo.Name, "skipped")
  360. }
  361. return nil
  362. }
  363. // already() and changed() return suitable strings for {un,}cordoning
  364. func already(desired bool) string {
  365. if desired {
  366. return "already cordoned"
  367. }
  368. return "already uncordoned"
  369. }
  370. func changed(desired bool) string {
  371. if desired {
  372. return "cordoned"
  373. }
  374. return "uncordoned"
  375. }