iterator.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package petset
  14. import (
  15. "fmt"
  16. "sort"
  17. "github.com/golang/glog"
  18. "k8s.io/kubernetes/pkg/api"
  19. "k8s.io/kubernetes/pkg/apis/apps"
  20. "k8s.io/kubernetes/pkg/controller"
  21. )
  22. // newPCB generates a new PCB using the id string as a unique qualifier
  23. func newPCB(id string, ps *apps.PetSet) (*pcb, error) {
  24. petPod, err := controller.GetPodFromTemplate(&ps.Spec.Template, ps, nil)
  25. if err != nil {
  26. return nil, err
  27. }
  28. for _, im := range newIdentityMappers(ps) {
  29. im.SetIdentity(id, petPod)
  30. }
  31. petPVCs := []api.PersistentVolumeClaim{}
  32. vMapper := &VolumeIdentityMapper{ps}
  33. for _, c := range vMapper.GetClaims(id) {
  34. petPVCs = append(petPVCs, c)
  35. }
  36. // TODO: Replace id field with IdentityHash, since id is more than just an index.
  37. return &pcb{pod: petPod, pvcs: petPVCs, id: id, parent: ps}, nil
  38. }
  39. // petQueue is a custom datastructure that's resembles a queue of pets.
  40. type petQueue struct {
  41. pets []*pcb
  42. idMapper identityMapper
  43. }
  44. // enqueue enqueues the given pet, evicting any pets with the same id
  45. func (pt *petQueue) enqueue(p *pcb) {
  46. if p == nil {
  47. pt.pets = append(pt.pets, nil)
  48. return
  49. }
  50. // Pop an existing pet from the know list, append the new pet to the end.
  51. petList := []*pcb{}
  52. petID := pt.idMapper.Identity(p.pod)
  53. for i := range pt.pets {
  54. if petID != pt.idMapper.Identity(pt.pets[i].pod) {
  55. petList = append(petList, pt.pets[i])
  56. }
  57. }
  58. pt.pets = petList
  59. p.event = syncPet
  60. pt.pets = append(pt.pets, p)
  61. }
  62. // dequeue returns the last element of the queue
  63. func (pt *petQueue) dequeue() *pcb {
  64. if pt.empty() {
  65. glog.Warningf("Dequeue invoked on an empty queue")
  66. return nil
  67. }
  68. l := len(pt.pets) - 1
  69. pet := pt.pets[l]
  70. pt.pets = pt.pets[:l]
  71. return pet
  72. }
  73. // empty returns true if the pet queue is empty.
  74. func (pt *petQueue) empty() bool {
  75. return len(pt.pets) == 0
  76. }
  77. // NewPetQueue returns a queue for tracking pets
  78. func NewPetQueue(ps *apps.PetSet, podList []*api.Pod) *petQueue {
  79. pt := petQueue{pets: []*pcb{}, idMapper: &NameIdentityMapper{ps}}
  80. // Seed the queue with existing pets. Assume all pets are scheduled for
  81. // deletion, enqueuing a pet will "undelete" it. We always want to delete
  82. // from the higher ids, so sort by creation timestamp.
  83. sort.Sort(PodsByCreationTimestamp(podList))
  84. vMapper := VolumeIdentityMapper{ps}
  85. for i := range podList {
  86. pod := podList[i]
  87. pt.pets = append(pt.pets, &pcb{pod: pod, pvcs: vMapper.GetClaimsForPet(pod), parent: ps, event: deletePet, id: fmt.Sprintf("%v", i)})
  88. }
  89. return &pt
  90. }
  91. // petsetIterator implements a simple iterator over pets in the given petset.
  92. type petSetIterator struct {
  93. // ps is the petset for this iterator.
  94. ps *apps.PetSet
  95. // queue contains the elements to iterate over.
  96. queue *petQueue
  97. // errs is a list because we always want the iterator to drain.
  98. errs []error
  99. // petCount is the number of pets iterated over.
  100. petCount int
  101. }
  102. // Next returns true for as long as there are elements in the underlying queue.
  103. func (pi *petSetIterator) Next() bool {
  104. var pet *pcb
  105. var err error
  106. if pi.petCount < pi.ps.Spec.Replicas {
  107. pet, err = newPCB(fmt.Sprintf("%d", pi.petCount), pi.ps)
  108. if err != nil {
  109. pi.errs = append(pi.errs, err)
  110. // Don't stop iterating over the set on errors. Caller handles nil.
  111. pet = nil
  112. }
  113. pi.queue.enqueue(pet)
  114. pi.petCount++
  115. }
  116. // Keep the iterator running till we've deleted pets in the queue.
  117. return !pi.queue.empty()
  118. }
  119. // Value dequeues an element from the queue.
  120. func (pi *petSetIterator) Value() *pcb {
  121. return pi.queue.dequeue()
  122. }
  123. // NewPetSetIterator returns a new iterator. All pods in the given podList
  124. // are used to seed the queue of the iterator.
  125. func NewPetSetIterator(ps *apps.PetSet, podList []*api.Pod) *petSetIterator {
  126. pi := &petSetIterator{
  127. ps: ps,
  128. queue: NewPetQueue(ps, podList),
  129. errs: []error{},
  130. petCount: 0,
  131. }
  132. return pi
  133. }
  134. // PodsByCreationTimestamp sorts a list of Pods by creation timestamp, using their names as a tie breaker.
  135. type PodsByCreationTimestamp []*api.Pod
  136. func (o PodsByCreationTimestamp) Len() int { return len(o) }
  137. func (o PodsByCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
  138. func (o PodsByCreationTimestamp) Less(i, j int) bool {
  139. if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
  140. return o[i].Name < o[j].Name
  141. }
  142. return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
  143. }