replication_controller.go

/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// If you make changes to this file, you should also make the corresponding change in ReplicaSet.

package replication

import (
	"reflect"
	"sort"
	"sync"
	"time"

	"github.com/golang/glog"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/errors"
	"k8s.io/kubernetes/pkg/api/unversioned"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/client/cache"
	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/controller/framework"
	"k8s.io/kubernetes/pkg/controller/framework/informers"
	"k8s.io/kubernetes/pkg/labels"
	"k8s.io/kubernetes/pkg/runtime"
	"k8s.io/kubernetes/pkg/util"
	utilerrors "k8s.io/kubernetes/pkg/util/errors"
	"k8s.io/kubernetes/pkg/util/metrics"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
	"k8s.io/kubernetes/pkg/watch"
)
const (
	// We'll attempt to recompute the required replicas of all replication controllers
	// that have fulfilled their expectations at least this often. This recomputation
	// happens based on contents in local pod storage.
	// Full Resync shouldn't be needed at all in a healthy system. This is a protection
	// against disappearing objects and watch notifications, which we believe should not
	// happen at all.
	// TODO: We should get rid of it completely in the fullness of time.
	FullControllerResyncPeriod = 10 * time.Minute

	// Realistic value of the burstReplica field for the replication manager based off
	// performance requirements for kubernetes 1.0.
	BurstReplicas = 500

	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
	// avoid a hot loop, we'll wait this long between checks.
	PodStoreSyncedPollPeriod = 100 * time.Millisecond

	// The number of times we retry updating a replication controller's status.
	statusUpdateRetries = 1
)
func getRCKind() unversioned.GroupVersionKind {
	return v1.SchemeGroupVersion.WithKind("ReplicationController")
}

// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// TODO: this really should be called ReplicationController. The only reason why it's a Manager
// is to distinguish this type from the API object "ReplicationController". We should fix this.
type ReplicationManager struct {
	kubeClient clientset.Interface
	podControl controller.PodControlInterface

	// internalPodInformer is used to hold a personal informer. If we're using
	// a normal shared informer, then the informer will be started for us. If
	// we have a personal informer, we must start it ourselves. If you start
	// the controller using NewReplicationManager (passing a SharedInformer), this
	// will be nil.
	internalPodInformer framework.SharedIndexInformer

	// An rc is temporarily suspended after creating/deleting these many replicas.
	// It resumes normal action after observing the watch events for them.
	burstReplicas int
	// To allow injection of syncReplicationController for testing.
	syncHandler func(rcKey string) error

	// A TTLCache of pod creates/deletes each rc expects to see.
	expectations *controller.UIDTrackingControllerExpectations

	// A store of replication controllers, populated by the rcController
	rcStore cache.StoreToReplicationControllerLister
	// Watches changes to all replication controllers
	rcController *framework.Controller
	// A store of pods, populated by the podController
	podStore cache.StoreToPodLister
	// Watches changes to all pods
	podController framework.ControllerInterface
	// podStoreSynced returns true if the pod store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	podStoreSynced func() bool

	lookupCache *controller.MatchingCache

	// Controllers that need to be synced
	queue workqueue.RateLimitingInterface

	// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
	// manager behaves differently if GC is enabled.
	garbageCollectorEnabled bool
}
// NewReplicationManager creates a replication manager.
func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
	return newReplicationManager(
		eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
		podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}

// newReplicationManager configures a replication manager with the specified event recorder.
func newReplicationManager(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
	if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
	}

	rm := &ReplicationManager{
		kubeClient: kubeClient,
		podControl: controller.RealPodControl{
			KubeClient: kubeClient,
			Recorder:   eventRecorder,
		},
		burstReplicas:           burstReplicas,
		expectations:            controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
		queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "replicationmanager"),
		garbageCollectorEnabled: garbageCollectorEnabled,
	}
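
	// Watch replication controllers in all namespaces and index them by namespace;
	// add/update/delete events simply enqueue the affected rc for a sync.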
	rm.rcStore.Indexer, rm.rcController = framework.NewIndexerInformer(
		&cache.ListWatch{
			ListFunc: func(options api.ListOptions) (runtime.Object, error) {
				return rm.kubeClient.Core().ReplicationControllers(api.NamespaceAll).List(options)
			},
			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
				return rm.kubeClient.Core().ReplicationControllers(api.NamespaceAll).Watch(options)
			},
		},
		&api.ReplicationController{},
		// TODO: Can we have much longer period here?
		FullControllerResyncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc:    rm.enqueueController,
			UpdateFunc: rm.updateRC,
			// This will enter the sync loop and no-op, because the controller has been deleted from the store.
			// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
			// way of achieving this is by performing a `stop` operation on the controller.
			DeleteFunc: rm.enqueueController,
		},
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)

	podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
		AddFunc: rm.addPod,
		// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill,
		// the most frequent pod update is status, and the associated rc will only list from local storage, so
		// it should be ok.
		UpdateFunc: rm.updatePod,
		DeleteFunc: rm.deletePod,
	})
	rm.podStore.Indexer = podInformer.GetIndexer()
	rm.podController = podInformer.GetController()

	rm.syncHandler = rm.syncReplicationController
	rm.podStoreSynced = rm.podController.HasSynced
	rm.lookupCache = controller.NewMatchingCache(lookupCacheSize)
	return rm
}
// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer.
// It disables event recording for use in integration tests.
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
	garbageCollectorEnabled := false
	rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
	rm.internalPodInformer = podInformer
	return rm
}

// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
	podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
	garbageCollectorEnabled := false
	rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
	rm.internalPodInformer = podInformer
	return rm
}

// SetEventRecorder replaces the event recorder used by the replication manager
// with the given recorder. Only used for testing.
func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) {
	// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
	// need to pass in a fake.
	rm.podControl = controller.RealPodControl{KubeClient: rm.kubeClient, Recorder: recorder}
}

// Run begins watching and syncing.
func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	glog.Infof("Starting RC Manager")
	go rm.rcController.Run(stopCh)
	go rm.podController.Run(stopCh)
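	// Start the requested number of workers; each loops pulling rc keys off the
	// work queue until the stop channel is closed.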
	for i := 0; i < workers; i++ {
		go wait.Until(rm.worker, time.Second, stopCh)
	}
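	// internalPodInformer is only set when this manager owns a private pod informer
	// (see NewReplicationManagerFromClient); a shared informer is started by its owner.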
	if rm.internalPodInformer != nil {
		go rm.internalPodInformer.Run(stopCh)
	}

	<-stopCh
	glog.Infof("Shutting down RC Manager")
	rm.queue.ShutDown()
}
// getPodController returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
// TODO: use ownerReference.Controller to determine if the rc controls the pod.
func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationController {
	// Look up in the cache; if cached and the cache is valid, just return the cached value.
	if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached {
		controller, ok := obj.(*api.ReplicationController)
		if !ok {
			// This should not happen
			glog.Errorf("lookup cache does not return a ReplicationController object")
			return nil
		}
		if cached && rm.isCacheValid(pod, controller) {
			return controller
		}
	}

	// If not cached, or the cached value is invalid, search all the rcs to find the matching one, and update the cache.
	controllers, err := rm.rcStore.GetPodControllers(pod)
	if err != nil {
		glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name)
		return nil
	}
	// In theory, overlapping controllers is user error. This sorting will not prevent
	// oscillation of replicas in all cases, eg:
	// rc1 (older rc): [(k1=v1)], replicas=1 rc2: [(k2=v2)], replicas=2
	// pod: [(k1:v1), (k2:v2)] will wake both rc1 and rc2, and we will sync rc1.
	// pod: [(k2:v2)] will wake rc2 which creates a new replica.
	if len(controllers) > 1 {
		// More than one item in this list indicates user error. If two replication
		// controllers overlap, sort by creation timestamp, subsort by name, then pick
		// the first.
		glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
		sort.Sort(OverlappingControllers(controllers))
	}

	// update lookup cache
	rm.lookupCache.Update(pod, &controllers[0])

	return &controllers[0]
}
// isCacheValid checks if the cache is valid.
func (rm *ReplicationManager) isCacheValid(pod *api.Pod, cachedRC *api.ReplicationController) bool {
	exists, err := rm.rcStore.Exists(cachedRC)
	// rc has been deleted or updated, cache is invalid
	if err != nil || !exists || !isControllerMatch(pod, cachedRC) {
		return false
	}
	return true
}

// isControllerMatch takes a Pod and a ReplicationController and returns whether the Pod matches the ReplicationController.
// TODO(mqliang): This logic is a copy from GetPodControllers(), remove the duplication
func isControllerMatch(pod *api.Pod, rc *api.ReplicationController) bool {
	if rc.Namespace != pod.Namespace {
		return false
	}
	selector := labels.Set(rc.Spec.Selector).AsSelectorPreValidated()

	// If an rc with a nil or empty selector creeps in, it should match nothing, not everything.
	if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
		return false
	}
	return true
}
// callback when RC is updated
func (rm *ReplicationManager) updateRC(old, cur interface{}) {
	oldRC := old.(*api.ReplicationController)
	curRC := cur.(*api.ReplicationController)

	// We should invalidate the whole lookup cache if an RC's selector has been updated.
	//
	// Imagine that you have two RCs:
	// * old RC1
	// * new RC2
	// You also have a pod that is attached to RC2 (because it doesn't match RC1's selector).
	// Now imagine that you are changing RC1's selector so that it now matches that pod.
	// In such a case, we must invalidate the whole cache so that the pod can be adopted by RC1.
	//
	// This makes the lookup cache less helpful, but selector updates do not happen often,
	// so it's not a big problem.
	if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) {
		rm.lookupCache.InvalidateAll()
	}

	// You might imagine that we only really need to enqueue the
	// controller when Spec changes, but it is safer to sync any
	// time this function is triggered. That way a full informer
	// resync can requeue any controllers that don't yet have pods
	// but whose last attempts at creating a pod have failed (since
	// we don't block on creation of pods) instead of those
	// controllers stalling indefinitely. Enqueueing every time
	// does result in some spurious syncs (like when Status.Replicas
	// is updated and the watch notification from it retriggers
	// this function), but in general extra resyncs shouldn't be
	// that bad as rcs that haven't met expectations yet won't
	// sync, and all the listing is done using local stores.
	if oldRC.Status.Replicas != curRC.Status.Replicas {
		glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
	}
	rm.enqueueController(cur)
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
	pod := obj.(*api.Pod)

	rc := rm.getPodController(pod)
	if rc == nil {
		return
	}
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return
	}

	if pod.DeletionTimestamp != nil {
		// On a restart of the controller manager, it's possible a new pod shows up in a state that
		// is already pending deletion. Prevent the pod from being a creation observation.
		rm.deletePod(pod)
		return
	}
	rm.expectations.CreationObserved(rcKey)
	rm.enqueueController(rc)
}
// When a pod is updated, figure out what controller/s manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new controller. old and cur must be *api.Pod types.
func (rm *ReplicationManager) updatePod(old, cur interface{}) {
	curPod := cur.(*api.Pod)
	oldPod := old.(*api.Pod)
	if curPod.ResourceVersion == oldPod.ResourceVersion {
		// Periodic resync will send update events for all known pods.
		// Two different versions of the same pod will always have different RVs.
		return
	}
	glog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
	if curPod.DeletionTimestamp != nil {
		// When a pod is deleted gracefully, its deletion timestamp is first modified to reflect a grace period,
		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
		// for modification of the deletion timestamp and expect an rc to create more replicas asap, not wait
		// until the kubelet actually deletes the pod. This is different from the Phase of a pod changing, because
		// an rc never initiates a phase change, and so is never asleep waiting for the same.
		rm.deletePod(curPod)
		if labelChanged {
			// we don't need to check the oldPod.DeletionTimestamp because DeletionTimestamp cannot be unset.
			rm.deletePod(oldPod)
		}
		return
	}

	// Only need to get the old controller if the labels changed.
	// Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod.
	if labelChanged {
		// If the old and new rc are the same, the first one that syncs
		// will set expectations preventing any damage from the second.
		if oldRC := rm.getPodController(oldPod); oldRC != nil {
			rm.enqueueController(oldRC)
		}
	}

	if curRC := rm.getPodController(curPod); curRC != nil {
		rm.enqueueController(curRC)
	}
}
// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) {
	pod, ok := obj.(*api.Pod)

	// When a delete is dropped, the relist will notice a pod in the store not
	// in the list, leading to the insertion of a tombstone object which contains
	// the deleted key/value. Note that this value might be stale. If the pod
	// changed labels the new rc will not be woken up till the periodic resync.
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			glog.Errorf("Couldn't get object from tombstone %#v", obj)
			return
		}
		pod, ok = tombstone.Obj.(*api.Pod)
		if !ok {
			glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
			return
		}
	}
	glog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v, labels %+v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod.Labels)
	if rc := rm.getPodController(pod); rc != nil {
		rcKey, err := controller.KeyFunc(rc)
		if err != nil {
			glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
			return
		}
		rm.expectations.DeletionObserved(rcKey, controller.PodKey(pod))
		rm.enqueueController(rc)
	}
}
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) enqueueController(obj interface{}) {
	key, err := controller.KeyFunc(obj)
	if err != nil {
		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
	// deterministically avoid syncing controllers that fight over pods. Currently, we only
	// ensure that the same controller is synced for a given pod. When we periodically relist
	// all controllers there will still be some replica instability. One way to handle this is
	// by querying the store for all controllers that this rc overlaps, as well as all
	// controllers that overlap this rc, and sorting them.
	rm.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (rm *ReplicationManager) worker() {
	workFunc := func() bool {
		key, quit := rm.queue.Get()
		if quit {
			return true
		}
		defer rm.queue.Done(key)

		err := rm.syncHandler(key.(string))
		if err == nil {
			rm.queue.Forget(key)
			return false
		}
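		// The sync failed; requeue the key with rate limiting so repeated failures
		// back off instead of hot-looping on the same controller.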
		rm.queue.AddRateLimited(key)
		utilruntime.HandleError(err)
		return false
	}
	for {
		if quit := workFunc(); quit {
			glog.Infof("replication controller worker shutting down")
			return
		}
	}
}
// manageReplicas checks and updates replicas for the given replication controller.
// Does NOT modify <filteredPods>.
func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.ReplicationController) error {
	diff := len(filteredPods) - int(rc.Spec.Replicas)
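	// A negative diff means we are running fewer pods than rc.Spec.Replicas and need to
	// create more; a positive diff means we have too many and need to delete the excess.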
	rcKey, err := controller.KeyFunc(rc)
	if err != nil {
		return err
	}
	if diff == 0 {
		return nil
	}

	if diff < 0 {
		diff *= -1
		if diff > rm.burstReplicas {
			diff = rm.burstReplicas
		}
		// TODO: Track UIDs of creates just like deletes. The problem currently
		// is we'd need to wait on the result of a create to record the pod's
		// UID, which would require locking *across* the create, which will turn
		// into a performance bottleneck. We should generate a UID for the pod
		// beforehand and store it via ExpectCreations.
		errCh := make(chan error, diff)
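		// Record up front how many creations we expect to observe, then issue the
		// creates in parallel; a failed create decrements the expectation below.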
		rm.expectations.ExpectCreations(rcKey, diff)
		var wg sync.WaitGroup
		wg.Add(diff)
		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wg.Done()
				var err error
				if rm.garbageCollectorEnabled {
					var trueVar = true
					controllerRef := &api.OwnerReference{
						APIVersion: getRCKind().GroupVersion().String(),
						Kind:       getRCKind().Kind,
						Name:       rc.Name,
						UID:        rc.UID,
						Controller: &trueVar,
					}
					err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
				} else {
					err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
				}
				if err != nil {
					// Decrement the expected number of creates because the informer won't observe this pod
					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
					rm.expectations.CreationObserved(rcKey)
					errCh <- err
					utilruntime.HandleError(err)
				}
			}()
		}
		wg.Wait()

		select {
		case err := <-errCh:
			// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
			if err != nil {
				return err
			}
		default:
		}

		return nil
	}
	if diff > rm.burstReplicas {
		diff = rm.burstReplicas
	}
	glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", rc.Namespace, rc.Name, rc.Spec.Replicas, diff)
	// No need to sort pods if we are about to delete all of them
	if rc.Spec.Replicas != 0 {
		// Sort the pods in the order such that not-ready < ready, unscheduled
		// < scheduled, and pending < running. This ensures that we delete pods
		// in the earlier stages whenever possible.
		sort.Sort(controller.ActivePods(filteredPods))
	}
	// Snapshot the UIDs (ns/name) of the pods we're expecting to see
	// deleted, so we know to record their expectations exactly once either
	// when we see it as an update of the deletion timestamp, or as a delete.
	// Note that if the labels on a pod/rc change in a way that the pod gets
	// orphaned, the rc will only wake up after the expectations have
	// expired even if other pods are deleted.
	deletedPodKeys := []string{}
	for i := 0; i < diff; i++ {
		deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
	}
	// We use pod namespace/name as a UID to wait for deletions, so if the
	// labels on a pod/rc change in a way that the pod gets orphaned, the
	// rc will only wake up after the expectation has expired.
	errCh := make(chan error, diff)
	rm.expectations.ExpectDeletions(rcKey, deletedPodKeys)
	var wg sync.WaitGroup
	wg.Add(diff)
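	// Delete the selected pods in parallel, mirroring the create path above; a failed
	// delete decrements the corresponding expectation so the rc does not wait on it.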
	for i := 0; i < diff; i++ {
		go func(ix int) {
			defer wg.Done()
			if err := rm.podControl.DeletePod(rc.Namespace, filteredPods[ix].Name, rc); err != nil {
				// Decrement the expected number of deletes because the informer won't observe this deletion
				podKey := controller.PodKey(filteredPods[ix])
				glog.V(2).Infof("Failed to delete %v due to %v, decrementing expectations for controller %q/%q", podKey, err, rc.Namespace, rc.Name)
				rm.expectations.DeletionObserved(rcKey, podKey)
				errCh <- err
				utilruntime.HandleError(err)
			}
		}(i)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		// all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.
		if err != nil {
			return err
		}
	default:
	}

	return nil
}
// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (rm *ReplicationManager) syncReplicationController(key string) error {
	trace := util.NewTrace("syncReplicationController: " + key)
	defer trace.LogIfLong(250 * time.Millisecond)

	startTime := time.Now()
	defer func() {
		glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime))
	}()

	if !rm.podStoreSynced() {
		// Sleep so we give the pod reflector goroutine a chance to run.
		time.Sleep(PodStoreSyncedPollPeriod)
		glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
		rm.queue.Add(key)
		return nil
	}

	obj, exists, err := rm.rcStore.Indexer.GetByKey(key)
	if !exists {
		glog.Infof("Replication Controller has been deleted %v", key)
		rm.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rc := *obj.(*api.ReplicationController)

	// Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in
	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
	// the store after we've checked the expectation, the rc sync is just deferred till the next relist.
	rcKey, err := controller.KeyFunc(&rc)
	if err != nil {
		glog.Errorf("Couldn't get key for replication controller %#v: %v", rc, err)
		return err
	}
	trace.Step("ReplicationController restored")
	rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
	trace.Step("Expectations restored")

	// NOTE: filteredPods are pointing to objects from cache - if you need to
	// modify them, you need to copy it first.
	// TODO: Do the List and Filter in a single pass, or use an index.
	var filteredPods []*api.Pod
	if rm.garbageCollectorEnabled {
		// List all pods to include the pods that don't match the rc's selector
		// anymore but still have the stale controller ref.
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelectorPreValidated(), getRCKind())
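		// Classify partitions the pods into three sets: pods that match the selector and are
		// already controlled by this rc, pods that match but still need to be adopted, and
		// pods controlled by this rc whose labels no longer match its selector.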
		matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(pods)
		for _, pod := range matchesNeedsController {
			err := cm.AdoptPod(pod)
			// continue to next pod if adoption fails.
			if err != nil {
				// If the pod no longer exists, don't even log the error.
				if !errors.IsNotFound(err) {
					utilruntime.HandleError(err)
				}
			} else {
				matchesAndControlled = append(matchesAndControlled, pod)
			}
		}
		filteredPods = matchesAndControlled
		// remove the controllerRef for the pods that no longer have matching labels
		var errlist []error
		for _, pod := range controlledDoesNotMatch {
			err := cm.ReleasePod(pod)
			if err != nil {
				errlist = append(errlist, err)
			}
		}
		if len(errlist) != 0 {
			aggregate := utilerrors.NewAggregate(errlist)
			// push the RC into work queue again. We need to try to free the
			// pods again, otherwise they will be stuck with the stale
			// controllerRef.
			rm.queue.Add(key)
			return aggregate
		}
	} else {
		pods, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelectorPreValidated())
		if err != nil {
			glog.Errorf("Error getting pods for rc %q: %v", key, err)
			rm.queue.Add(key)
			return err
		}
		filteredPods = controller.FilterActivePods(pods)
	}

	var manageReplicasErr error
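	// Only create or delete pods when expectations are satisfied and the rc itself is
	// not in the middle of being deleted.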
	if rcNeedsSync && rc.DeletionTimestamp == nil {
		manageReplicasErr = rm.manageReplicas(filteredPods, &rc)
	}
	trace.Step("manageReplicas done")

	// Count the number of pods that have labels matching the labels of the pod
	// template of the replication controller. The matching pods may have more
	// labels than are in the template. Because the labels of the podTemplateSpec
	// are a superset of the replication controller's selector, the matching pods
	// must be a subset of filteredPods.
	fullyLabeledReplicasCount := 0
	readyReplicasCount := 0
	templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated()
	for _, pod := range filteredPods {
		if templateLabel.Matches(labels.Set(pod.Labels)) {
			fullyLabeledReplicasCount++
		}
		if api.IsPodReady(pod) {
			readyReplicasCount++
		}
	}

	// Always update the status as pods come up or die.
	if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount, readyReplicasCount); err != nil {
		// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
		return err
	}

	return manageReplicasErr
}