delaying_queue.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package workqueue
import (
	"sort"
	"sync"
	"time"

	"k8s.io/kubernetes/pkg/util/clock"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed.
	// The implementation in this file adds the item immediately when duration is not positive.
	AddAfter(item interface{}, duration time.Duration)
}
  27. // NewDelayingQueue constructs a new workqueue with delayed queuing ability
  28. func NewDelayingQueue() DelayingInterface {
  29. return newDelayingQueue(clock.RealClock{}, "")
  30. }
  31. func NewNamedDelayingQueue(name string) DelayingInterface {
  32. return newDelayingQueue(clock.RealClock{}, name)
  33. }
  34. func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
  35. ret := &delayingType{
  36. Interface: NewNamed(name),
  37. clock: clock,
  38. heartbeat: clock.Tick(maxWait),
  39. stopCh: make(chan struct{}),
  40. waitingTimeByEntry: map[t]time.Time{},
  41. waitingForAddCh: make(chan waitFor, 1000),
  42. metrics: newRetryMetrics(name),
  43. }
  44. go ret.waitingLoop()
  45. return ret
  46. }
  47. // delayingType wraps an Interface and provides delayed re-enquing
  48. type delayingType struct {
  49. Interface
  50. // clock tracks time for delayed firing
  51. clock clock.Clock
  52. // stopCh lets us signal a shutdown to the waiting loop
  53. stopCh chan struct{}
  54. // heartbeat ensures we wait no more than maxWait before firing
  55. heartbeat <-chan time.Time
  56. // waitingForAdd is an ordered slice of items to be added to the contained work queue
  57. waitingForAdd []waitFor
  58. // waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
  59. waitingTimeByEntry map[t]time.Time
  60. // waitingForAddCh is a buffered channel that feeds waitingForAdd
  61. waitingForAddCh chan waitFor
  62. // metrics counts the number of retries
  63. metrics retryMetrics
  64. }
  65. // waitFor holds the data to add and the time it should be added
  66. type waitFor struct {
  67. data t
  68. readyAt time.Time
  69. }
  70. // ShutDown gives a way to shut off this queue
  71. func (q *delayingType) ShutDown() {
  72. q.Interface.ShutDown()
  73. close(q.stopCh)
  74. }
  75. // AddAfter adds the given item to the work queue after the given delay
  76. func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
  77. // don't add if we're already shutting down
  78. if q.ShuttingDown() {
  79. return
  80. }
  81. q.metrics.retry()
  82. // immediately add things with no delay
  83. if duration <= 0 {
  84. q.Add(item)
  85. return
  86. }
  87. select {
  88. case <-q.stopCh:
  89. // unblock if ShutDown() is called
  90. case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
  91. }
  92. }
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
// It is the period passed to clock.Tick for the waiting loop's heartbeat channel.
const maxWait = 10 * time.Second
  97. // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
  98. func (q *delayingType) waitingLoop() {
  99. defer utilruntime.HandleCrash()
  100. // Make a placeholder channel to use when there are no items in our list
  101. never := make(<-chan time.Time)
  102. for {
  103. if q.Interface.ShuttingDown() {
  104. // discard waiting entries
  105. q.waitingForAdd = nil
  106. q.waitingTimeByEntry = nil
  107. return
  108. }
  109. now := q.clock.Now()
  110. // Add ready entries
  111. readyEntries := 0
  112. for _, entry := range q.waitingForAdd {
  113. if entry.readyAt.After(now) {
  114. break
  115. }
  116. q.Add(entry.data)
  117. delete(q.waitingTimeByEntry, entry.data)
  118. readyEntries++
  119. }
  120. q.waitingForAdd = q.waitingForAdd[readyEntries:]
  121. // Set up a wait for the first item's readyAt (if one exists)
  122. nextReadyAt := never
  123. if len(q.waitingForAdd) > 0 {
  124. nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
  125. }
  126. select {
  127. case <-q.stopCh:
  128. return
  129. case <-q.heartbeat:
  130. // continue the loop, which will add ready items
  131. case <-nextReadyAt:
  132. // continue the loop, which will add ready items
  133. case waitEntry := <-q.waitingForAddCh:
  134. if waitEntry.readyAt.After(q.clock.Now()) {
  135. q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
  136. } else {
  137. q.Add(waitEntry.data)
  138. }
  139. drained := false
  140. for !drained {
  141. select {
  142. case waitEntry := <-q.waitingForAddCh:
  143. if waitEntry.readyAt.After(q.clock.Now()) {
  144. q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
  145. } else {
  146. q.Add(waitEntry.data)
  147. }
  148. default:
  149. drained = true
  150. }
  151. }
  152. }
  153. }
  154. }
  155. // inserts the given entry into the sorted entries list
  156. // same semantics as append()... the given slice may be modified,
  157. // and the returned value should be used
  158. func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
  159. // if the entry is already in our retry list and the existing time is before the new one, just skip it
  160. existingTime, exists := knownEntries[entry.data]
  161. if exists && existingTime.Before(entry.readyAt) {
  162. return entries
  163. }
  164. // if the entry exists and is scheduled for later, go ahead and remove the entry
  165. if exists {
  166. if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
  167. entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
  168. }
  169. }
  170. insertionIndex := sort.Search(len(entries), func(i int) bool {
  171. return entry.readyAt.Before(entries[i].readyAt)
  172. })
  173. // grow by 1
  174. entries = append(entries, waitFor{})
  175. // shift items from the insertion point to the end
  176. copy(entries[insertionIndex+1:], entries[insertionIndex:])
  177. // insert the record
  178. entries[insertionIndex] = entry
  179. knownEntries[entry.data] = entry.readyAt
  180. return entries
  181. }
  182. // findEntryIndex returns the index for an existing entry
  183. func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
  184. index := sort.Search(len(entries), func(i int) bool {
  185. return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
  186. })
  187. // we know this is the earliest possible index, but there could be multiple with the same time
  188. // iterate from here to find the dupe
  189. for ; index < len(entries); index++ {
  190. if entries[index].data == data {
  191. break
  192. }
  193. }
  194. return index
  195. }