jobcontroller.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package job
  14. import (
  15. "reflect"
  16. "sort"
  17. "sync"
  18. "time"
  19. "github.com/golang/glog"
  20. "k8s.io/kubernetes/pkg/api"
  21. "k8s.io/kubernetes/pkg/api/unversioned"
  22. "k8s.io/kubernetes/pkg/apis/batch"
  23. "k8s.io/kubernetes/pkg/client/cache"
  24. clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
  25. unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
  26. "k8s.io/kubernetes/pkg/client/record"
  27. "k8s.io/kubernetes/pkg/controller"
  28. "k8s.io/kubernetes/pkg/controller/framework"
  29. "k8s.io/kubernetes/pkg/controller/framework/informers"
  30. replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
  31. "k8s.io/kubernetes/pkg/runtime"
  32. "k8s.io/kubernetes/pkg/util/metrics"
  33. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  34. "k8s.io/kubernetes/pkg/util/wait"
  35. "k8s.io/kubernetes/pkg/util/workqueue"
  36. "k8s.io/kubernetes/pkg/watch"
  37. )
  38. type JobController struct {
  39. kubeClient clientset.Interface
  40. podControl controller.PodControlInterface
  41. // internalPodInformer is used to hold a personal informer. If we're using
  42. // a normal shared informer, then the informer will be started for us. If
  43. // we have a personal informer, we must start it ourselves. If you start
  44. // the controller using NewJobController(passing SharedInformer), this
  45. // will be null
  46. internalPodInformer framework.SharedInformer
  47. // To allow injection of updateJobStatus for testing.
  48. updateHandler func(job *batch.Job) error
  49. syncHandler func(jobKey string) error
  50. // podStoreSynced returns true if the pod store has been synced at least once.
  51. // Added as a member to the struct to allow injection for testing.
  52. podStoreSynced func() bool
  53. // A TTLCache of pod creates/deletes each rc expects to see
  54. expectations controller.ControllerExpectationsInterface
  55. // A store of job, populated by the jobController
  56. jobStore cache.StoreToJobLister
  57. // Watches changes to all jobs
  58. jobController *framework.Controller
  59. // A store of pods, populated by the podController
  60. podStore cache.StoreToPodLister
  61. // Jobs that need to be updated
  62. queue *workqueue.Type
  63. recorder record.EventRecorder
  64. }
  65. func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface) *JobController {
  66. eventBroadcaster := record.NewBroadcaster()
  67. eventBroadcaster.StartLogging(glog.Infof)
  68. // TODO: remove the wrapper when every clients have moved to use the clientset.
  69. eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
  70. if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
  71. metrics.RegisterMetricAndTrackRateLimiterUsage("job_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
  72. }
  73. jm := &JobController{
  74. kubeClient: kubeClient,
  75. podControl: controller.RealPodControl{
  76. KubeClient: kubeClient,
  77. Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
  78. },
  79. expectations: controller.NewControllerExpectations(),
  80. queue: workqueue.NewNamed("job"),
  81. recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "job-controller"}),
  82. }
  83. jm.jobStore.Store, jm.jobController = framework.NewInformer(
  84. &cache.ListWatch{
  85. ListFunc: func(options api.ListOptions) (runtime.Object, error) {
  86. return jm.kubeClient.Batch().Jobs(api.NamespaceAll).List(options)
  87. },
  88. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  89. return jm.kubeClient.Batch().Jobs(api.NamespaceAll).Watch(options)
  90. },
  91. },
  92. &batch.Job{},
  93. // TODO: Can we have much longer period here?
  94. replicationcontroller.FullControllerResyncPeriod,
  95. framework.ResourceEventHandlerFuncs{
  96. AddFunc: jm.enqueueController,
  97. UpdateFunc: func(old, cur interface{}) {
  98. if job := cur.(*batch.Job); !IsJobFinished(job) {
  99. jm.enqueueController(job)
  100. }
  101. },
  102. DeleteFunc: jm.enqueueController,
  103. },
  104. )
  105. podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
  106. AddFunc: jm.addPod,
  107. UpdateFunc: jm.updatePod,
  108. DeleteFunc: jm.deletePod,
  109. })
  110. jm.podStore.Indexer = podInformer.GetIndexer()
  111. jm.podStoreSynced = podInformer.HasSynced
  112. jm.updateHandler = jm.updateJobStatus
  113. jm.syncHandler = jm.syncJob
  114. return jm
  115. }
  116. func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
  117. podInformer := informers.NewPodInformer(kubeClient, resyncPeriod())
  118. jm := NewJobController(podInformer, kubeClient)
  119. jm.internalPodInformer = podInformer
  120. return jm
  121. }
  122. // Run the main goroutine responsible for watching and syncing jobs.
  123. func (jm *JobController) Run(workers int, stopCh <-chan struct{}) {
  124. defer utilruntime.HandleCrash()
  125. go jm.jobController.Run(stopCh)
  126. for i := 0; i < workers; i++ {
  127. go wait.Until(jm.worker, time.Second, stopCh)
  128. }
  129. if jm.internalPodInformer != nil {
  130. go jm.internalPodInformer.Run(stopCh)
  131. }
  132. <-stopCh
  133. glog.Infof("Shutting down Job Manager")
  134. jm.queue.ShutDown()
  135. }
  136. // getPodJob returns the job managing the given pod.
  137. func (jm *JobController) getPodJob(pod *api.Pod) *batch.Job {
  138. jobs, err := jm.jobStore.GetPodJobs(pod)
  139. if err != nil {
  140. glog.V(4).Infof("No jobs found for pod %v, job controller will avoid syncing", pod.Name)
  141. return nil
  142. }
  143. if len(jobs) > 1 {
  144. glog.Errorf("user error! more than one job is selecting pods with labels: %+v", pod.Labels)
  145. sort.Sort(byCreationTimestamp(jobs))
  146. }
  147. return &jobs[0]
  148. }
  149. // When a pod is created, enqueue the controller that manages it and update it's expectations.
  150. func (jm *JobController) addPod(obj interface{}) {
  151. pod := obj.(*api.Pod)
  152. if pod.DeletionTimestamp != nil {
  153. // on a restart of the controller controller, it's possible a new pod shows up in a state that
  154. // is already pending deletion. Prevent the pod from being a creation observation.
  155. jm.deletePod(pod)
  156. return
  157. }
  158. if job := jm.getPodJob(pod); job != nil {
  159. jobKey, err := controller.KeyFunc(job)
  160. if err != nil {
  161. glog.Errorf("Couldn't get key for job %#v: %v", job, err)
  162. return
  163. }
  164. jm.expectations.CreationObserved(jobKey)
  165. jm.enqueueController(job)
  166. }
  167. }
  168. // When a pod is updated, figure out what job/s manage it and wake them up.
  169. // If the labels of the pod have changed we need to awaken both the old
  170. // and new job. old and cur must be *api.Pod types.
  171. func (jm *JobController) updatePod(old, cur interface{}) {
  172. curPod := cur.(*api.Pod)
  173. oldPod := old.(*api.Pod)
  174. if curPod.ResourceVersion == oldPod.ResourceVersion {
  175. // Periodic resync will send update events for all known pods.
  176. // Two different versions of the same pod will always have different RVs.
  177. return
  178. }
  179. if curPod.DeletionTimestamp != nil {
  180. // when a pod is deleted gracefully it's deletion timestamp is first modified to reflect a grace period,
  181. // and after such time has passed, the kubelet actually deletes it from the store. We receive an update
  182. // for modification of the deletion timestamp and expect an job to create more pods asap, not wait
  183. // until the kubelet actually deletes the pod.
  184. jm.deletePod(curPod)
  185. return
  186. }
  187. if job := jm.getPodJob(curPod); job != nil {
  188. jm.enqueueController(job)
  189. }
  190. // Only need to get the old job if the labels changed.
  191. if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
  192. // If the old and new job are the same, the first one that syncs
  193. // will set expectations preventing any damage from the second.
  194. if oldJob := jm.getPodJob(oldPod); oldJob != nil {
  195. jm.enqueueController(oldJob)
  196. }
  197. }
  198. }
  199. // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
  200. // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
  201. func (jm *JobController) deletePod(obj interface{}) {
  202. pod, ok := obj.(*api.Pod)
  203. // When a delete is dropped, the relist will notice a pod in the store not
  204. // in the list, leading to the insertion of a tombstone object which contains
  205. // the deleted key/value. Note that this value might be stale. If the pod
  206. // changed labels the new job will not be woken up till the periodic resync.
  207. if !ok {
  208. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  209. if !ok {
  210. glog.Errorf("Couldn't get object from tombstone %+v", obj)
  211. return
  212. }
  213. pod, ok = tombstone.Obj.(*api.Pod)
  214. if !ok {
  215. glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
  216. return
  217. }
  218. }
  219. if job := jm.getPodJob(pod); job != nil {
  220. jobKey, err := controller.KeyFunc(job)
  221. if err != nil {
  222. glog.Errorf("Couldn't get key for job %#v: %v", job, err)
  223. return
  224. }
  225. jm.expectations.DeletionObserved(jobKey)
  226. jm.enqueueController(job)
  227. }
  228. }
  229. // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
  230. func (jm *JobController) enqueueController(obj interface{}) {
  231. key, err := controller.KeyFunc(obj)
  232. if err != nil {
  233. glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
  234. return
  235. }
  236. // TODO: Handle overlapping controllers better. Either disallow them at admission time or
  237. // deterministically avoid syncing controllers that fight over pods. Currently, we only
  238. // ensure that the same controller is synced for a given pod. When we periodically relist
  239. // all controllers there will still be some replica instability. One way to handle this is
  240. // by querying the store for all controllers that this rc overlaps, as well as all
  241. // controllers that overlap this rc, and sorting them.
  242. jm.queue.Add(key)
  243. }
  244. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  245. // It enforces that the syncHandler is never invoked concurrently with the same key.
  246. func (jm *JobController) worker() {
  247. for {
  248. func() {
  249. key, quit := jm.queue.Get()
  250. if quit {
  251. return
  252. }
  253. defer jm.queue.Done(key)
  254. err := jm.syncHandler(key.(string))
  255. if err != nil {
  256. glog.Errorf("Error syncing job: %v", err)
  257. }
  258. }()
  259. }
  260. }
  261. // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
  262. // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
  263. // concurrently with the same key.
  264. func (jm *JobController) syncJob(key string) error {
  265. startTime := time.Now()
  266. defer func() {
  267. glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
  268. }()
  269. if !jm.podStoreSynced() {
  270. // Sleep so we give the pod reflector goroutine a chance to run.
  271. time.Sleep(replicationcontroller.PodStoreSyncedPollPeriod)
  272. glog.V(4).Infof("Waiting for pods controller to sync, requeuing job %v", key)
  273. jm.queue.Add(key)
  274. return nil
  275. }
  276. obj, exists, err := jm.jobStore.Store.GetByKey(key)
  277. if !exists {
  278. glog.V(4).Infof("Job has been deleted: %v", key)
  279. jm.expectations.DeleteExpectations(key)
  280. return nil
  281. }
  282. if err != nil {
  283. glog.Errorf("Unable to retrieve job %v from store: %v", key, err)
  284. jm.queue.Add(key)
  285. return err
  286. }
  287. job := *obj.(*batch.Job)
  288. // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
  289. // and update the expectations after we've retrieved active pods from the store. If a new pod enters
  290. // the store after we've checked the expectation, the job sync is just deferred till the next relist.
  291. jobKey, err := controller.KeyFunc(&job)
  292. if err != nil {
  293. glog.Errorf("Couldn't get key for job %#v: %v", job, err)
  294. return err
  295. }
  296. jobNeedsSync := jm.expectations.SatisfiedExpectations(jobKey)
  297. selector, _ := unversioned.LabelSelectorAsSelector(job.Spec.Selector)
  298. pods, err := jm.podStore.Pods(job.Namespace).List(selector)
  299. if err != nil {
  300. glog.Errorf("Error getting pods for job %q: %v", key, err)
  301. jm.queue.Add(key)
  302. return err
  303. }
  304. activePods := controller.FilterActivePods(pods)
  305. active := int32(len(activePods))
  306. succeeded, failed := getStatus(pods)
  307. conditions := len(job.Status.Conditions)
  308. if job.Status.StartTime == nil {
  309. now := unversioned.Now()
  310. job.Status.StartTime = &now
  311. }
  312. // if job was finished previously, we don't want to redo the termination
  313. if IsJobFinished(&job) {
  314. return nil
  315. }
  316. if pastActiveDeadline(&job) {
  317. // TODO: below code should be replaced with pod termination resulting in
  318. // pod failures, rather than killing pods. Unfortunately none such solution
  319. // exists ATM. There's an open discussion in the topic in
  320. // https://github.com/kubernetes/kubernetes/issues/14602 which might give
  321. // some sort of solution to above problem.
  322. // kill remaining active pods
  323. wait := sync.WaitGroup{}
  324. wait.Add(int(active))
  325. for i := int32(0); i < active; i++ {
  326. go func(ix int32) {
  327. defer wait.Done()
  328. if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
  329. defer utilruntime.HandleError(err)
  330. }
  331. }(i)
  332. }
  333. wait.Wait()
  334. // update status values accordingly
  335. failed += active
  336. active = 0
  337. job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
  338. jm.recorder.Event(&job, api.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
  339. } else {
  340. if jobNeedsSync && job.DeletionTimestamp == nil {
  341. active = jm.manageJob(activePods, succeeded, &job)
  342. }
  343. completions := succeeded
  344. complete := false
  345. if job.Spec.Completions == nil {
  346. // This type of job is complete when any pod exits with success.
  347. // Each pod is capable of
  348. // determining whether or not the entire Job is done. Subsequent pods are
  349. // not expected to fail, but if they do, the failure is ignored. Once any
  350. // pod succeeds, the controller waits for remaining pods to finish, and
  351. // then the job is complete.
  352. if succeeded > 0 && active == 0 {
  353. complete = true
  354. }
  355. } else {
  356. // Job specifies a number of completions. This type of job signals
  357. // success by having that number of successes. Since we do not
  358. // start more pods than there are remaining completions, there should
  359. // not be any remaining active pods once this count is reached.
  360. if completions >= *job.Spec.Completions {
  361. complete = true
  362. if active > 0 {
  363. jm.recorder.Event(&job, api.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
  364. }
  365. if completions > *job.Spec.Completions {
  366. jm.recorder.Event(&job, api.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
  367. }
  368. }
  369. }
  370. if complete {
  371. job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
  372. now := unversioned.Now()
  373. job.Status.CompletionTime = &now
  374. }
  375. }
  376. // no need to update the job if the status hasn't changed since last time
  377. if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
  378. job.Status.Active = active
  379. job.Status.Succeeded = succeeded
  380. job.Status.Failed = failed
  381. if err := jm.updateHandler(&job); err != nil {
  382. glog.Errorf("Failed to update job %v, requeuing. Error: %v", job.Name, err)
  383. jm.enqueueController(&job)
  384. }
  385. }
  386. return nil
  387. }
  388. // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
  389. func pastActiveDeadline(job *batch.Job) bool {
  390. if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
  391. return false
  392. }
  393. now := unversioned.Now()
  394. start := job.Status.StartTime.Time
  395. duration := now.Time.Sub(start)
  396. allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
  397. return duration >= allowedDuration
  398. }
  399. func newCondition(conditionType batch.JobConditionType, reason, message string) batch.JobCondition {
  400. return batch.JobCondition{
  401. Type: conditionType,
  402. Status: api.ConditionTrue,
  403. LastProbeTime: unversioned.Now(),
  404. LastTransitionTime: unversioned.Now(),
  405. Reason: reason,
  406. Message: message,
  407. }
  408. }
  409. // getStatus returns no of succeeded and failed pods running a job
  410. func getStatus(pods []*api.Pod) (succeeded, failed int32) {
  411. succeeded = int32(filterPods(pods, api.PodSucceeded))
  412. failed = int32(filterPods(pods, api.PodFailed))
  413. return
  414. }
  415. // manageJob is the core method responsible for managing the number of running
  416. // pods according to what is specified in the job.Spec.
  417. // Does NOT modify <activePods>.
  418. func (jm *JobController) manageJob(activePods []*api.Pod, succeeded int32, job *batch.Job) int32 {
  419. var activeLock sync.Mutex
  420. active := int32(len(activePods))
  421. parallelism := *job.Spec.Parallelism
  422. jobKey, err := controller.KeyFunc(job)
  423. if err != nil {
  424. glog.Errorf("Couldn't get key for job %#v: %v", job, err)
  425. return 0
  426. }
  427. if active > parallelism {
  428. diff := active - parallelism
  429. jm.expectations.ExpectDeletions(jobKey, int(diff))
  430. glog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
  431. // Sort the pods in the order such that not-ready < ready, unscheduled
  432. // < scheduled, and pending < running. This ensures that we delete pods
  433. // in the earlier stages whenever possible.
  434. sort.Sort(controller.ActivePods(activePods))
  435. active -= diff
  436. wait := sync.WaitGroup{}
  437. wait.Add(int(diff))
  438. for i := int32(0); i < diff; i++ {
  439. go func(ix int32) {
  440. defer wait.Done()
  441. if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
  442. defer utilruntime.HandleError(err)
  443. // Decrement the expected number of deletes because the informer won't observe this deletion
  444. jm.expectations.DeletionObserved(jobKey)
  445. activeLock.Lock()
  446. active++
  447. activeLock.Unlock()
  448. }
  449. }(i)
  450. }
  451. wait.Wait()
  452. } else if active < parallelism {
  453. wantActive := int32(0)
  454. if job.Spec.Completions == nil {
  455. // Job does not specify a number of completions. Therefore, number active
  456. // should be equal to parallelism, unless the job has seen at least
  457. // once success, in which leave whatever is running, running.
  458. if succeeded > 0 {
  459. wantActive = active
  460. } else {
  461. wantActive = parallelism
  462. }
  463. } else {
  464. // Job specifies a specific number of completions. Therefore, number
  465. // active should not ever exceed number of remaining completions.
  466. wantActive = *job.Spec.Completions - succeeded
  467. if wantActive > parallelism {
  468. wantActive = parallelism
  469. }
  470. }
  471. diff := wantActive - active
  472. if diff < 0 {
  473. glog.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)
  474. diff = 0
  475. }
  476. jm.expectations.ExpectCreations(jobKey, int(diff))
  477. glog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)
  478. active += diff
  479. wait := sync.WaitGroup{}
  480. wait.Add(int(diff))
  481. for i := int32(0); i < diff; i++ {
  482. go func() {
  483. defer wait.Done()
  484. if err := jm.podControl.CreatePods(job.Namespace, &job.Spec.Template, job); err != nil {
  485. defer utilruntime.HandleError(err)
  486. // Decrement the expected number of creates because the informer won't observe this pod
  487. jm.expectations.CreationObserved(jobKey)
  488. activeLock.Lock()
  489. active--
  490. activeLock.Unlock()
  491. }
  492. }()
  493. }
  494. wait.Wait()
  495. }
  496. return active
  497. }
  498. func (jm *JobController) updateJobStatus(job *batch.Job) error {
  499. _, err := jm.kubeClient.Batch().Jobs(job.Namespace).UpdateStatus(job)
  500. return err
  501. }
  502. // filterPods returns pods based on their phase.
  503. func filterPods(pods []*api.Pod, phase api.PodPhase) int {
  504. result := 0
  505. for i := range pods {
  506. if phase == pods[i].Status.Phase {
  507. result++
  508. }
  509. }
  510. return result
  511. }
  512. // byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
  513. type byCreationTimestamp []batch.Job
  514. func (o byCreationTimestamp) Len() int { return len(o) }
  515. func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
  516. func (o byCreationTimestamp) Less(i, j int) bool {
  517. if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
  518. return o[i].Name < o[j].Name
  519. }
  520. return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
  521. }