daemoncontroller.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package daemon

import (
    "fmt"
    "reflect"
    "sort"
    "sync"
    "time"

    "github.com/golang/glog"
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
    unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/framework"
    "k8s.io/kubernetes/pkg/controller/framework/informers"
    "k8s.io/kubernetes/pkg/labels"
    "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/util/metrics"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/util/wait"
    "k8s.io/kubernetes/pkg/util/workqueue"
    "k8s.io/kubernetes/pkg/watch"
    "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

const (
    // Daemon sets will periodically check that their daemon pods are running as expected.
    FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.

    // Realistic value of the burstReplica field for the replication manager based on
    // performance requirements for kubernetes 1.0.
    BurstReplicas = 500

    // We must avoid counting pods until the pod store has synced. If it hasn't synced, to
    // avoid a hot loop, we'll wait this long between checks.
    PodStoreSyncedPollPeriod = 100 * time.Millisecond

    // If sending a status update to the API server fails, we retry a finite number of times.
    StatusUpdateRetries = 1
)

// DaemonSetsController is responsible for synchronizing DaemonSet objects stored
// in the system with actual running pods.
type DaemonSetsController struct {
    kubeClient    clientset.Interface
    eventRecorder record.EventRecorder
    podControl    controller.PodControlInterface

    // internalPodInformer is used to hold a personal informer. If we're using
    // a normal shared informer, then the informer will be started for us. If
    // we have a personal informer, we must start it ourselves. If you start
    // the controller using NewDaemonSetsController (passing a SharedInformer),
    // this will be nil.
    internalPodInformer framework.SharedInformer

    // A DaemonSetsController is temporarily suspended after creating/deleting
    // this many replicas. It resumes normal action after observing the watch
    // events for them.
    burstReplicas int
    // To allow injection of syncDaemonSet for testing.
    syncHandler func(dsKey string) error
    // A TTLCache of pod creates/deletes each ds expects to see.
    expectations controller.ControllerExpectationsInterface
    // A store of daemon sets.
    dsStore cache.StoreToDaemonSetLister
    // A store of pods.
    podStore cache.StoreToPodLister
    // A store of nodes.
    nodeStore cache.StoreToNodeLister
    // Watches changes to all daemon sets.
    dsController *framework.Controller
    // Watches changes to all pods.
    podController framework.ControllerInterface
    // Watches changes to all nodes.
    nodeController *framework.Controller
    // podStoreSynced returns true if the pod store has been synced at least once.
    // Added as a member to the struct to allow injection for testing.
    podStoreSynced func() bool

    lookupCache *controller.MatchingCache

    // Daemon sets that need to be synced.
    queue *workqueue.Type
}

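// NewDaemonSetsController creates a DaemonSetsController wired to the given pod
// informer and client: it sets up watches for DaemonSets and nodes, registers
// pod event handlers on podInformer, and returns the controller. The caller is
// responsible for starting a shared pod informer passed in here.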
func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    // TODO: remove the wrapper when all clients have moved to use the clientset.
    eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})

    if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
    }
    dsc := &DaemonSetsController{
        kubeClient:    kubeClient,
        eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemonset-controller"}),
        podControl: controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}),
        },
        burstReplicas: BurstReplicas,
        expectations:  controller.NewControllerExpectations(),
        queue:         workqueue.NewNamed("daemonset"),
    }
    // Manage addition/update of daemon sets.
    dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
        &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return dsc.kubeClient.Extensions().DaemonSets(api.NamespaceAll).Watch(options)
            },
        },
        &extensions.DaemonSet{},
        // TODO: Can we have much longer period here?
        FullDaemonSetResyncPeriod,
        framework.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) {
                ds := obj.(*extensions.DaemonSet)
                glog.V(4).Infof("Adding daemon set %s", ds.Name)
                dsc.enqueueDaemonSet(ds)
            },
            UpdateFunc: func(old, cur interface{}) {
                oldDS := old.(*extensions.DaemonSet)
                curDS := cur.(*extensions.DaemonSet)
                // We should invalidate the whole lookup cache if a DS's selector has been updated.
                //
                // Imagine that you have two DaemonSets:
                // * old DS1
                // * new DS2
                // You also have a pod that is attached to DS2 (because it doesn't match DS1's selector).
                // Now imagine that you change DS1's selector so that it now matches that pod; in that
                // case we must invalidate the whole cache so that the pod can be adopted by DS1.
                //
                // This makes the lookup cache less helpful, but selector updates do not happen often,
                // so it's not a big problem.
                if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) {
                    dsc.lookupCache.InvalidateAll()
                }
                glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
                dsc.enqueueDaemonSet(curDS)
            },
            DeleteFunc: dsc.deleteDaemonset,
        },
    )

    // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
    // more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
    podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
        AddFunc:    dsc.addPod,
        UpdateFunc: dsc.updatePod,
        DeleteFunc: dsc.deletePod,
    })
    dsc.podStore.Indexer = podInformer.GetIndexer()
    dsc.podController = podInformer.GetController()
    dsc.podStoreSynced = podInformer.HasSynced

    // Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change.
    dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
        &cache.ListWatch{
            ListFunc: func(options api.ListOptions) (runtime.Object, error) {
                return dsc.kubeClient.Core().Nodes().List(options)
            },
            WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
                return dsc.kubeClient.Core().Nodes().Watch(options)
            },
        },
        &api.Node{},
        resyncPeriod(),
        framework.ResourceEventHandlerFuncs{
            AddFunc:    dsc.addNode,
            UpdateFunc: dsc.updateNode,
        },
    )
    dsc.syncHandler = dsc.syncDaemonSet
    dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
    return dsc
}

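// NewDaemonSetsControllerFromClient creates a DaemonSetsController with its own
// (internal) pod informer; Run starts that informer along with the other watches.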
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
    podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
    dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
    dsc.internalPodInformer = podInformer
    return dsc
}

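// A minimal wiring sketch (illustrative only: restConfig, the worker count, and
// the use of clientset.NewForConfigOrDie / controller.NoResyncPeriodFunc are
// assumptions, not part of this file):
//
//    kubeClient := clientset.NewForConfigOrDie(restConfig)
//    dsc := NewDaemonSetsControllerFromClient(kubeClient, controller.NoResyncPeriodFunc, 0)
//    stopCh := make(chan struct{})
//    go dsc.Run(5, stopCh) // Run blocks until stopCh is closed.
//    // ...
//    close(stopCh)
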
func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
    ds, ok := obj.(*extensions.DaemonSet)
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            glog.Errorf("Couldn't get object from tombstone %#v", obj)
            return
        }
        ds, ok = tombstone.Obj.(*extensions.DaemonSet)
        if !ok {
            glog.Errorf("Tombstone contained object that is not a DaemonSet %#v", obj)
            return
        }
    }
    glog.V(4).Infof("Deleting daemon set %s", ds.Name)
    dsc.enqueueDaemonSet(ds)
}

// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    glog.Infof("Starting Daemon Sets controller manager")
    go dsc.dsController.Run(stopCh)
    go dsc.podController.Run(stopCh)
    go dsc.nodeController.Run(stopCh)
    for i := 0; i < workers; i++ {
        go wait.Until(dsc.runWorker, time.Second, stopCh)
    }

    if dsc.internalPodInformer != nil {
        go dsc.internalPodInformer.Run(stopCh)
    }

    <-stopCh
    glog.Infof("Shutting down Daemon Set Controller")
    dsc.queue.ShutDown()
}

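// runWorker pulls DaemonSet keys off the work queue and hands them to
// syncHandler until the queue is shut down.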
func (dsc *DaemonSetsController) runWorker() {
    for {
        dsKey, quit := dsc.queue.Get()
        if quit {
            return
        }
        err := dsc.syncHandler(dsKey.(string))
        if err != nil {
            glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err)
        }
        dsc.queue.Done(dsKey)
    }
}

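// enqueueDaemonSet adds the given DaemonSet's key to the work queue.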
func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
    key, err := controller.KeyFunc(ds)
    if err != nil {
        glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
        return
    }

    // TODO: Handle overlapping controllers better. See comment in ReplicationManager.
    dsc.queue.Add(key)
}

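// getPodDaemonSet returns the DaemonSet that manages the given pod, consulting
// the lookup cache first and falling back to the DaemonSet store; if several
// DaemonSets match, the oldest one (by creation timestamp) wins.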
func (dsc *DaemonSetsController) getPodDaemonSet(pod *api.Pod) *extensions.DaemonSet {
    // Look up in the cache; if cached and the cache is valid, just return the cached value.
    if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached {
        ds, ok := obj.(*extensions.DaemonSet)
        if !ok {
            // This should not happen.
            glog.Errorf("lookup cache does not return a DaemonSet object")
            return nil
        }
        if dsc.isCacheValid(pod, ds) {
            return ds
        }
    }
    sets, err := dsc.dsStore.GetPodDaemonSets(pod)
    if err != nil {
        glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name)
        return nil
    }
    if len(sets) > 1 {
        // More than one item in this list indicates user error. If two daemon
        // sets overlap, sort by creation timestamp, subsort by name, then pick
        // the first.
        glog.Errorf("user error! more than one daemon set is selecting pods with labels: %+v", pod.Labels)
        sort.Sort(byCreationTimestamp(sets))
    }

    // Update the lookup cache.
    dsc.lookupCache.Update(pod, &sets[0])

    return &sets[0]
}

// isCacheValid checks if the cached DaemonSet is still valid for the given pod.
func (dsc *DaemonSetsController) isCacheValid(pod *api.Pod, cachedDS *extensions.DaemonSet) bool {
    _, exists, err := dsc.dsStore.Get(cachedDS)
    // The ds has been deleted or updated; the cache is invalid.
    if err != nil || !exists || !isDaemonSetMatch(pod, cachedDS) {
        return false
    }
    return true
}

// isDaemonSetMatch takes a Pod and a DaemonSet and returns whether they match.
// TODO(mqliang): This logic is a copy from GetPodDaemonSets(), remove the duplication.
func isDaemonSetMatch(pod *api.Pod, ds *extensions.DaemonSet) bool {
    if ds.Namespace != pod.Namespace {
        return false
    }
    selector, err := unversioned.LabelSelectorAsSelector(ds.Spec.Selector)
    if err != nil {
        err = fmt.Errorf("invalid selector: %v", err)
        return false
    }

    // If a DaemonSet with a nil or empty selector creeps in, it should match nothing, not everything.
    if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
        return false
    }
    return true
}

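// addPod records an observed pod creation against the owning DaemonSet's
// expectations and enqueues that DaemonSet.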
func (dsc *DaemonSetsController) addPod(obj interface{}) {
    pod := obj.(*api.Pod)
    glog.V(4).Infof("Pod %s added.", pod.Name)
    if ds := dsc.getPodDaemonSet(pod); ds != nil {
        dsKey, err := controller.KeyFunc(ds)
        if err != nil {
            glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
            return
        }
        dsc.expectations.CreationObserved(dsKey)
        dsc.enqueueDaemonSet(ds)
    }
}

// When a pod is updated, figure out what sets manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new set. old and cur must be *api.Pod types.
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
    curPod := cur.(*api.Pod)
    oldPod := old.(*api.Pod)
    if curPod.ResourceVersion == oldPod.ResourceVersion {
        // Periodic resync will send update events for all known pods.
        // Two different versions of the same pod will always have different RVs.
        return
    }
    glog.V(4).Infof("Pod %s updated.", curPod.Name)
    if curDS := dsc.getPodDaemonSet(curPod); curDS != nil {
        dsc.enqueueDaemonSet(curDS)
    }
    // If the labels have not changed, then the daemon set responsible for
    // the pod is the same as it was before. In that case we have enqueued the daemon
    // set above, and do not have to enqueue the set again.
    if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
        // It's ok if both oldDS and curDS are the same, because curDS will set
        // the expectations on its run so oldDS will have no effect.
        if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil {
            dsc.enqueueDaemonSet(oldDS)
        }
    }
}

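// deletePod records an observed pod deletion against the owning DaemonSet's
// expectations and enqueues that DaemonSet.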
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
    pod, ok := obj.(*api.Pod)
    // When a delete is dropped, the relist will notice a pod in the store not
    // in the list, leading to the insertion of a tombstone object which contains
    // the deleted key/value. Note that this value might be stale. If the pod
    // changed labels the new daemonset will not be woken up till the periodic
    // resync.
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            glog.Errorf("Couldn't get object from tombstone %#v", obj)
            return
        }
        pod, ok = tombstone.Obj.(*api.Pod)
        if !ok {
            glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
            return
        }
    }
    glog.V(4).Infof("Pod %s deleted.", pod.Name)
    if ds := dsc.getPodDaemonSet(pod); ds != nil {
        dsKey, err := controller.KeyFunc(ds)
        if err != nil {
            glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
            return
        }
        dsc.expectations.DeletionObserved(dsKey)
        dsc.enqueueDaemonSet(ds)
    }
}

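// addNode enqueues every DaemonSet that should run a daemon pod on the newly added node.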
func (dsc *DaemonSetsController) addNode(obj interface{}) {
    // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
    dsList, err := dsc.dsStore.List()
    if err != nil {
        glog.V(4).Infof("Error enqueueing daemon sets: %v", err)
        return
    }
    node := obj.(*api.Node)
    for i := range dsList.Items {
        ds := &dsList.Items[i]
        shouldEnqueue := dsc.nodeShouldRunDaemonPod(node, ds)
        if shouldEnqueue {
            dsc.enqueueDaemonSet(ds)
        }
    }
}

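// updateNode enqueues every DaemonSet whose eligibility to run on the node
// changes as a result of a node label update.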
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
    oldNode := old.(*api.Node)
    curNode := cur.(*api.Node)
    if reflect.DeepEqual(oldNode.Labels, curNode.Labels) {
        // If node labels didn't change, we can ignore this update.
        return
    }
    dsList, err := dsc.dsStore.List()
    if err != nil {
        glog.V(4).Infof("Error enqueueing daemon sets: %v", err)
        return
    }
    for i := range dsList.Items {
        ds := &dsList.Items[i]
        shouldEnqueue := (dsc.nodeShouldRunDaemonPod(oldNode, ds) != dsc.nodeShouldRunDaemonPod(curNode, ds))
        if shouldEnqueue {
            dsc.enqueueDaemonSet(ds)
        }
    }
    // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the updated node (unless it has other work to do, too).
}

// getNodesToDaemonPods returns a map from nodes to the daemon pods (corresponding to ds) running on them.
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*api.Pod, error) {
    nodeToDaemonPods := make(map[string][]*api.Pod)
    selector, err := unversioned.LabelSelectorAsSelector(ds.Spec.Selector)
    if err != nil {
        return nil, err
    }
    daemonPods, err := dsc.podStore.Pods(ds.Namespace).List(selector)
    if err != nil {
        return nodeToDaemonPods, err
    }
    for i := range daemonPods {
        // TODO: Do we need to copy here?
        daemonPod := &(*daemonPods[i])
        nodeName := daemonPod.Spec.NodeName
        nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], daemonPod)
    }
    return nodeToDaemonPods, nil
}

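// manage brings the daemon pods for ds in line with the set of nodes that should
// run them: it creates daemon pods on nodes that are missing one and deletes
// excess or misplaced daemon pods, capped at burstReplicas creations and
// deletions per sync.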
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
    // Find out which nodes are running the daemon pods selected by ds.
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
    }

    // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
    // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
    nodeList, err := dsc.nodeStore.List()
    if err != nil {
        glog.Errorf("Couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
    }
    var nodesNeedingDaemonPods, podsToDelete []string
    for _, node := range nodeList.Items {
        shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)

        daemonPods, isRunning := nodeToDaemonPods[node.Name]

        switch {
        case shouldRun && !isRunning:
            // If daemon pod is supposed to be running on node, but isn't, create daemon pod.
            nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
        case shouldRun && len(daemonPods) > 1:
            // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
            // Sort the daemon pods by creation time, so that the oldest is preserved.
            sort.Sort(podByCreationTimestamp(daemonPods))
            for i := 1; i < len(daemonPods); i++ {
                podsToDelete = append(podsToDelete, daemonPods[i].Name)
            }
        case !shouldRun && isRunning:
            // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
            for i := range daemonPods {
                podsToDelete = append(podsToDelete, daemonPods[i].Name)
            }
        }
    }

    // We need to set expectations before creating/deleting pods to avoid race conditions.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
        return
    }

    createDiff := len(nodesNeedingDaemonPods)
    deleteDiff := len(podsToDelete)

    if createDiff > dsc.burstReplicas {
        createDiff = dsc.burstReplicas
    }
    if deleteDiff > dsc.burstReplicas {
        deleteDiff = dsc.burstReplicas
    }

    dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)

    glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
    createWait := sync.WaitGroup{}
    createWait.Add(createDiff)
    for i := 0; i < createDiff; i++ {
        go func(ix int) {
            defer createWait.Done()
            if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil {
                glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                dsc.expectations.CreationObserved(dsKey)
                utilruntime.HandleError(err)
            }
        }(i)
    }
    createWait.Wait()

    glog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
    deleteWait := sync.WaitGroup{}
    deleteWait.Add(deleteDiff)
    for i := 0; i < deleteDiff; i++ {
        go func(ix int) {
            defer deleteWait.Done()
            if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
                glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
                dsc.expectations.DeletionObserved(dsKey)
                utilruntime.HandleError(err)
            }
        }(i)
    }
    deleteWait.Wait()
}

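// storeDaemonSetStatus writes the given scheduling counts to the DaemonSet's
// status if they differ from what is already recorded, refreshing the object
// and retrying a bounded number of times (StatusUpdateRetries) on update failure.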
func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
    if int(ds.Status.DesiredNumberScheduled) == desiredNumberScheduled &&
        int(ds.Status.CurrentNumberScheduled) == currentNumberScheduled &&
        int(ds.Status.NumberMisscheduled) == numberMisscheduled {
        return nil
    }

    var updateErr, getErr error
    for i := 0; i < StatusUpdateRetries; i++ {
        ds.Status.DesiredNumberScheduled = int32(desiredNumberScheduled)
        ds.Status.CurrentNumberScheduled = int32(currentNumberScheduled)
        ds.Status.NumberMisscheduled = int32(numberMisscheduled)

        if _, updateErr = dsClient.UpdateStatus(ds); updateErr == nil {
            return nil
        }

        // Update the set with the latest resource version for the next poll.
        if ds, getErr = dsClient.Get(ds.Name); getErr != nil {
            // If the GET fails we can't trust status.Replicas anymore. This error
            // is bound to be more interesting than the update failure.
            return getErr
        }
    }
    return updateErr
}

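// updateDaemonSetStatus recomputes the desired, current, and misscheduled pod
// counts for ds across all nodes and persists them via storeDaemonSetStatus.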
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) {
    glog.V(4).Infof("Updating daemon set status")
    nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
    if err != nil {
        glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
        return
    }

    nodeList, err := dsc.nodeStore.List()
    if err != nil {
        glog.Errorf("Couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
        return
    }

    var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
    for _, node := range nodeList.Items {
        shouldRun := dsc.nodeShouldRunDaemonPod(&node, ds)
        scheduled := len(nodeToDaemonPods[node.Name]) > 0

        if shouldRun {
            desiredNumberScheduled++
            if scheduled {
                currentNumberScheduled++
            }
        } else {
            if scheduled {
                numberMisscheduled++
            }
        }
    }

    err = storeDaemonSetStatus(dsc.kubeClient.Extensions().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
    if err != nil {
        glog.Errorf("Error storing status for daemon set %#v: %v", ds, err)
    }
}

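// syncDaemonSet is the reconciliation entry point for a single DaemonSet key:
// it waits for the pod store to sync, loads the DaemonSet, and, once previous
// create/delete expectations are satisfied, runs manage and updates the status.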
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
    startTime := time.Now()
    defer func() {
        glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime))
    }()

    if !dsc.podStoreSynced() {
        // Sleep so we give the pod reflector goroutine a chance to run.
        time.Sleep(PodStoreSyncedPollPeriod)
        glog.Infof("Waiting for pods controller to sync, requeuing ds %v", key)
        dsc.queue.Add(key)
        return nil
    }

    obj, exists, err := dsc.dsStore.Store.GetByKey(key)
    if err != nil {
        glog.Infof("Unable to retrieve ds %v from store: %v", key, err)
        dsc.queue.Add(key)
        return err
    }
    if !exists {
        glog.V(3).Infof("daemon set has been deleted %v", key)
        dsc.expectations.DeleteExpectations(key)
        return nil
    }
    ds := obj.(*extensions.DaemonSet)

    everything := unversioned.LabelSelector{}
    if reflect.DeepEqual(ds.Spec.Selector, &everything) {
        dsc.eventRecorder.Eventf(ds, api.EventTypeWarning, "SelectingAll", "This daemon set is selecting all pods. A non-empty selector is required.")
        return nil
    }

    // Don't process a daemon set until all its creations and deletions have been processed.
    // For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
    // then we do not want to call manage on foo until the daemon pods have been created.
    dsKey, err := controller.KeyFunc(ds)
    if err != nil {
        glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
        return err
    }
    dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
    if dsNeedsSync && ds.DeletionTimestamp == nil {
        dsc.manage(ds)
    }

    dsc.updateDaemonSetStatus(ds)
    return nil
}

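// nodeShouldRunDaemonPod returns true if the node should run a daemon pod for
// ds: the template's node name (if set) must match the node, the node must not
// be out of disk, and a simulated daemon pod must pass the scheduler's
// GeneralPredicates and tolerate the node's taints.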
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
    // If the daemon set specifies a node name, check that it matches with node.Name.
    if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
        return false
    }

    // TODO: Move it to the predicates.
    for _, c := range node.Status.Conditions {
        if c.Type == api.NodeOutOfDisk && c.Status == api.ConditionTrue {
            return false
        }
    }

    newPod := &api.Pod{Spec: ds.Spec.Template.Spec, ObjectMeta: ds.Spec.Template.ObjectMeta}
    newPod.Spec.NodeName = node.Name

    pods := []*api.Pod{}

    for _, m := range dsc.podStore.Indexer.List() {
        pod := m.(*api.Pod)
        if pod.Spec.NodeName != node.Name {
            continue
        }
        if pod.Status.Phase == api.PodSucceeded || pod.Status.Phase == api.PodFailed {
            continue
        }
        // Ignore pods that belong to the daemon set when taking into account whether
        // a daemon set should bind to a node.
        if pds := dsc.getPodDaemonSet(pod); pds != nil && ds.Name == pds.Name {
            continue
        }
        pods = append(pods, pod)
    }

    nodeInfo := schedulercache.NewNodeInfo(pods...)
    nodeInfo.SetNode(node)
    fit, reasons, err := predicates.GeneralPredicates(newPod, nil, nodeInfo)
    if err != nil {
        glog.Warningf("GeneralPredicates failed on pod %s due to unexpected error: %v", newPod.Name, err)
    }
    for _, r := range reasons {
        glog.V(2).Infof("GeneralPredicates failed on pod %s for reason: %v", newPod.Name, r.GetReason())
    }
    if !fit {
        return false
    }

    fit, reasons, err = predicates.PodToleratesNodeTaints(newPod, predicates.PredicateMetadata(newPod, nil), nodeInfo)
    if err != nil {
        glog.Warningf("PodToleratesNodeTaints failed on pod %s due to unexpected error: %v", newPod.Name, err)
    }
    for _, r := range reasons {
        glog.V(2).Infof("PodToleratesNodeTaints failed on pod %s for reason: %v", newPod.Name, r.GetReason())
    }
    return fit
}

// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []extensions.DaemonSet

func (o byCreationTimestamp) Len() int      { return len(o) }
func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o byCreationTimestamp) Less(i, j int) bool {
    if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
        return o[i].Name < o[j].Name
    }
    return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

type podByCreationTimestamp []*api.Pod

func (o podByCreationTimestamp) Len() int      { return len(o) }
func (o podByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }

func (o podByCreationTimestamp) Less(i, j int) bool {
    if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
        return o[i].Name < o[j].Name
    }
    return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}