controller_utils.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "fmt"
  16. "strings"
  17. "time"
  18. "k8s.io/kubernetes/pkg/api"
  19. "k8s.io/kubernetes/pkg/client/cache"
  20. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  21. "k8s.io/kubernetes/pkg/client/record"
  22. "k8s.io/kubernetes/pkg/cloudprovider"
  23. "k8s.io/kubernetes/pkg/fields"
  24. "k8s.io/kubernetes/pkg/kubelet/util/format"
  25. "k8s.io/kubernetes/pkg/types"
  26. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  27. "k8s.io/kubernetes/pkg/version"
  28. "github.com/golang/glog"
  29. )
  30. const (
  31. // Number of Nodes that needs to be in the cluster for it to be treated as "large"
  32. LargeClusterThreshold = 20
  33. )
  34. // cleanupOrphanedPods deletes pods that are bound to nodes that don't
  35. // exist.
  36. func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
  37. for _, pod := range pods {
  38. if pod.Spec.NodeName == "" {
  39. continue
  40. }
  41. if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
  42. continue
  43. }
  44. if err := forcefulDeletePodFunc(pod); err != nil {
  45. utilruntime.HandleError(err)
  46. }
  47. }
  48. }
  49. // deletePods will delete all pods from master running on given node, and return true
  50. // if any pods were deleted, or were found pending deletion.
  51. func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
  52. remaining := false
  53. selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
  54. options := api.ListOptions{FieldSelector: selector}
  55. pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
  56. if err != nil {
  57. return remaining, err
  58. }
  59. if len(pods.Items) > 0 {
  60. recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
  61. }
  62. for _, pod := range pods.Items {
  63. // Defensive check, also needed for tests.
  64. if pod.Spec.NodeName != nodeName {
  65. continue
  66. }
  67. // if the pod has already been marked for deletion, we still return true that there are remaining pods.
  68. if pod.DeletionGracePeriodSeconds != nil {
  69. remaining = true
  70. continue
  71. }
  72. // if the pod is managed by a daemonset, ignore it
  73. _, err := daemonStore.GetPodDaemonSets(&pod)
  74. if err == nil { // No error means at least one daemonset was found
  75. continue
  76. }
  77. glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
  78. recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
  79. if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
  80. return false, err
  81. }
  82. remaining = true
  83. }
  84. return remaining, nil
  85. }
  86. func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
  87. var zero int64
  88. err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
  89. if err == nil {
  90. glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
  91. }
  92. return err
  93. }
  94. // forcefullyDeleteNode immediately deletes all pods on the node, and then
  95. // deletes the node itself.
  96. func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
  97. selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
  98. options := api.ListOptions{FieldSelector: selector}
  99. pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
  100. if err != nil {
  101. return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
  102. }
  103. for _, pod := range pods.Items {
  104. if pod.Spec.NodeName != nodeName {
  105. continue
  106. }
  107. if err := forcefulDeletePodFunc(&pod); err != nil {
  108. return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
  109. }
  110. }
  111. if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
  112. return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
  113. }
  114. return nil
  115. }
  116. // maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
  117. // that should not be gracefully terminated.
  118. func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
  119. pod, ok := obj.(*api.Pod)
  120. if !ok {
  121. return
  122. }
  123. // consider only terminating pods
  124. if pod.DeletionTimestamp == nil {
  125. return
  126. }
  127. // delete terminating pods that have not yet been scheduled
  128. if len(pod.Spec.NodeName) == 0 {
  129. utilruntime.HandleError(nc.forcefullyDeletePod(pod))
  130. return
  131. }
  132. nodeObj, found, err := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName)
  133. if err != nil {
  134. // this can only happen if the Store.KeyFunc has a problem creating
  135. // a key for the pod. If it happens once, it will happen again so
  136. // don't bother requeuing the pod.
  137. utilruntime.HandleError(err)
  138. return
  139. }
  140. // delete terminating pods that have been scheduled on
  141. // nonexistent nodes
  142. if !found {
  143. glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
  144. utilruntime.HandleError(nc.forcefullyDeletePod(pod))
  145. return
  146. }
  147. // delete terminating pods that have been scheduled on
  148. // nodes that do not support graceful termination
  149. // TODO(mikedanese): this can be removed when we no longer
  150. // guarantee backwards compatibility of master API to kubelets with
  151. // versions less than 1.1.0
  152. node := nodeObj.(*api.Node)
  153. v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
  154. if err != nil {
  155. glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
  156. utilruntime.HandleError(nc.forcefullyDeletePod(pod))
  157. return
  158. }
  159. if gracefulDeletionVersion.GT(v) {
  160. utilruntime.HandleError(nc.forcefullyDeletePod(pod))
  161. return
  162. }
  163. }
  164. // update ready status of all pods running on given node from master
  165. // return true if success
  166. func markAllPodsNotReady(kubeClient clientset.Interface, node *api.Node) error {
  167. // Don't set pods to NotReady if the kubelet is running a version that
  168. // doesn't understand how to correct readiness.
  169. // TODO: Remove this check when we no longer guarantee backward compatibility
  170. // with node versions < 1.2.0.
  171. if nodeRunningOutdatedKubelet(node) {
  172. return nil
  173. }
  174. nodeName := node.Name
  175. glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
  176. opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
  177. pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(opts)
  178. if err != nil {
  179. return err
  180. }
  181. errMsg := []string{}
  182. for _, pod := range pods.Items {
  183. // Defensive check, also needed for tests.
  184. if pod.Spec.NodeName != nodeName {
  185. continue
  186. }
  187. for i, cond := range pod.Status.Conditions {
  188. if cond.Type == api.PodReady {
  189. pod.Status.Conditions[i].Status = api.ConditionFalse
  190. glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
  191. _, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
  192. if err != nil {
  193. glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
  194. errMsg = append(errMsg, fmt.Sprintf("%v", err))
  195. }
  196. break
  197. }
  198. }
  199. }
  200. if len(errMsg) == 0 {
  201. return nil
  202. }
  203. return fmt.Errorf("%v", strings.Join(errMsg, "; "))
  204. }
  205. // nodeRunningOutdatedKubelet returns true if the kubeletVersion reported
  206. // in the nodeInfo of the given node is "outdated", meaning < 1.2.0.
  207. // Older versions were inflexible and modifying pod.Status directly through
  208. // the apiserver would result in unexpected outcomes.
  209. func nodeRunningOutdatedKubelet(node *api.Node) bool {
  210. v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
  211. if err != nil {
  212. glog.Errorf("couldn't parse version %q of node %v", node.Status.NodeInfo.KubeletVersion, err)
  213. return true
  214. }
  215. if podStatusReconciliationVersion.GT(v) {
  216. glog.Infof("Node %v running kubelet at (%v) which is less than the minimum version that allows nodecontroller to mark pods NotReady (%v).", node.Name, v, podStatusReconciliationVersion)
  217. return true
  218. }
  219. return false
  220. }
  221. func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
  222. instances, ok := cloud.Instances()
  223. if !ok {
  224. return false, fmt.Errorf("%v", ErrCloudInstance)
  225. }
  226. if _, err := instances.ExternalID(nodeName); err != nil {
  227. if err == cloudprovider.InstanceNotFound {
  228. return false, nil
  229. }
  230. return false, err
  231. }
  232. return true, nil
  233. }
  234. func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
  235. ref := &api.ObjectReference{
  236. Kind: "Node",
  237. Name: nodeName,
  238. UID: types.UID(nodeUID),
  239. Namespace: "",
  240. }
  241. glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
  242. recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
  243. }
  244. func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
  245. ref := &api.ObjectReference{
  246. Kind: "Node",
  247. Name: node.Name,
  248. UID: node.UID,
  249. Namespace: "",
  250. }
  251. glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
  252. // TODO: This requires a transaction, either both node status is updated
  253. // and event is recorded or neither should happen, see issue #6055.
  254. recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
  255. }
  256. // terminatePods will ensure all pods on the given node that are in terminating state are eventually
  257. // cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
  258. // long before we should check again (the next deadline for a pod to complete), or an error.
  259. func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, nodeUID string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
  260. // the time before we should try again
  261. nextAttempt := time.Duration(0)
  262. // have we deleted all pods
  263. complete := true
  264. selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
  265. options := api.ListOptions{FieldSelector: selector}
  266. pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
  267. if err != nil {
  268. return false, nextAttempt, err
  269. }
  270. now := time.Now()
  271. elapsed := now.Sub(since)
  272. for _, pod := range pods.Items {
  273. // Defensive check, also needed for tests.
  274. if pod.Spec.NodeName != nodeName {
  275. continue
  276. }
  277. // only clean terminated pods
  278. if pod.DeletionGracePeriodSeconds == nil {
  279. continue
  280. }
  281. // the user's requested grace period
  282. grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
  283. if grace > maxGracePeriod {
  284. grace = maxGracePeriod
  285. }
  286. // the time remaining before the pod should have been deleted
  287. remaining := grace - elapsed
  288. if remaining < 0 {
  289. remaining = 0
  290. glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
  291. recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
  292. if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
  293. glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
  294. complete = false
  295. }
  296. } else {
  297. glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
  298. complete = false
  299. }
  300. if nextAttempt < remaining {
  301. nextAttempt = remaining
  302. }
  303. }
  304. return complete, nextAttempt, nil
  305. }