rate_limited_queue.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package node
  14. import (
  15. "container/heap"
  16. "sync"
  17. "time"
  18. "k8s.io/kubernetes/pkg/util/flowcontrol"
  19. "k8s.io/kubernetes/pkg/util/sets"
  20. "github.com/golang/glog"
  21. )
  22. // TimedValue is a value that should be processed at a designated time.
  23. type TimedValue struct {
  24. Value string
  25. // UID could be anything that helps identify the value
  26. UID interface{}
  27. AddedAt time.Time
  28. ProcessAt time.Time
  29. }
  30. // now is used to test time
  31. var now func() time.Time = time.Now
  32. // TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
  33. type TimedQueue []*TimedValue
  34. func (h TimedQueue) Len() int { return len(h) }
  35. func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
  36. func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  37. func (h *TimedQueue) Push(x interface{}) {
  38. *h = append(*h, x.(*TimedValue))
  39. }
  40. func (h *TimedQueue) Pop() interface{} {
  41. old := *h
  42. n := len(old)
  43. x := old[n-1]
  44. *h = old[0 : n-1]
  45. return x
  46. }
  47. // A FIFO queue which additionally guarantees that any element can be added only once until
  48. // it is removed.
  49. type UniqueQueue struct {
  50. lock sync.Mutex
  51. queue TimedQueue
  52. set sets.String
  53. }
  54. // Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
  55. // Remove call. Returns true if new value was added.
  56. func (q *UniqueQueue) Add(value TimedValue) bool {
  57. q.lock.Lock()
  58. defer q.lock.Unlock()
  59. if q.set.Has(value.Value) {
  60. return false
  61. }
  62. heap.Push(&q.queue, &value)
  63. q.set.Insert(value.Value)
  64. return true
  65. }
  66. // Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
  67. // Returns true if the item was found.
  68. func (q *UniqueQueue) Replace(value TimedValue) bool {
  69. q.lock.Lock()
  70. defer q.lock.Unlock()
  71. for i := range q.queue {
  72. if q.queue[i].Value != value.Value {
  73. continue
  74. }
  75. heap.Remove(&q.queue, i)
  76. heap.Push(&q.queue, &value)
  77. return true
  78. }
  79. return false
  80. }
  81. // Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
  82. // of the given value. If the value is not present does nothing and returns false.
  83. func (q *UniqueQueue) Remove(value string) bool {
  84. q.lock.Lock()
  85. defer q.lock.Unlock()
  86. q.set.Delete(value)
  87. for i, val := range q.queue {
  88. if val.Value == value {
  89. heap.Remove(&q.queue, i)
  90. return true
  91. }
  92. }
  93. return false
  94. }
  95. // Returns the oldest added value that wasn't returned yet.
  96. func (q *UniqueQueue) Get() (TimedValue, bool) {
  97. q.lock.Lock()
  98. defer q.lock.Unlock()
  99. if len(q.queue) == 0 {
  100. return TimedValue{}, false
  101. }
  102. result := heap.Pop(&q.queue).(*TimedValue)
  103. q.set.Delete(result.Value)
  104. return *result, true
  105. }
  106. // Head returns the oldest added value that wasn't returned yet without removing it.
  107. func (q *UniqueQueue) Head() (TimedValue, bool) {
  108. q.lock.Lock()
  109. defer q.lock.Unlock()
  110. if len(q.queue) == 0 {
  111. return TimedValue{}, false
  112. }
  113. result := q.queue[0]
  114. return *result, true
  115. }
  116. // Clear removes all items from the queue and duplication preventing set.
  117. func (q *UniqueQueue) Clear() {
  118. q.lock.Lock()
  119. defer q.lock.Unlock()
  120. if q.queue.Len() > 0 {
  121. q.queue = make(TimedQueue, 0)
  122. }
  123. if len(q.set) > 0 {
  124. q.set = sets.NewString()
  125. }
  126. }
  127. // RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
  128. // of execution. It is also rate limited.
  129. type RateLimitedTimedQueue struct {
  130. queue UniqueQueue
  131. limiterLock sync.Mutex
  132. limiter flowcontrol.RateLimiter
  133. }
  134. // Creates new queue which will use given RateLimiter to oversee execution.
  135. func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
  136. return &RateLimitedTimedQueue{
  137. queue: UniqueQueue{
  138. queue: TimedQueue{},
  139. set: sets.NewString(),
  140. },
  141. limiter: limiter,
  142. }
  143. }
  144. // ActionFunc takes a timed value and returns false if the item must be retried, with an optional
  145. // time.Duration if some minimum wait interval should be used.
  146. type ActionFunc func(TimedValue) (bool, time.Duration)
  147. // Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
  148. // Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
  149. // otherwise it is added back to the queue. The returned remaining is used to identify the minimum
  150. // time to execute the next item in the queue.
  151. func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
  152. val, ok := q.queue.Head()
  153. q.limiterLock.Lock()
  154. defer q.limiterLock.Unlock()
  155. for ok {
  156. // rate limit the queue checking
  157. if !q.limiter.TryAccept() {
  158. glog.V(10).Infof("Try rate limited for value: %v", val)
  159. // Try again later
  160. break
  161. }
  162. now := now()
  163. if now.Before(val.ProcessAt) {
  164. break
  165. }
  166. if ok, wait := fn(val); !ok {
  167. val.ProcessAt = now.Add(wait + 1)
  168. q.queue.Replace(val)
  169. } else {
  170. q.queue.Remove(val.Value)
  171. }
  172. val, ok = q.queue.Head()
  173. }
  174. }
  175. // Adds value to the queue to be processed. Won't add the same value(comparsion by value) a second time
  176. // if it was already added and not removed.
  177. func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
  178. now := now()
  179. return q.queue.Add(TimedValue{
  180. Value: value,
  181. UID: uid,
  182. AddedAt: now,
  183. ProcessAt: now,
  184. })
  185. }
  186. // Removes Node from the Evictor. The Node won't be processed until added again.
  187. func (q *RateLimitedTimedQueue) Remove(value string) bool {
  188. return q.queue.Remove(value)
  189. }
  190. // Removes all items from the queue
  191. func (q *RateLimitedTimedQueue) Clear() {
  192. q.queue.Clear()
  193. }
  194. // SwapLimiter safely swaps current limiter for this queue with the passed one if capacities or qps's differ.
  195. func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
  196. q.limiterLock.Lock()
  197. defer q.limiterLock.Unlock()
  198. if q.limiter.QPS() == newQPS {
  199. return
  200. }
  201. var newLimiter flowcontrol.RateLimiter
  202. if newQPS <= 0 {
  203. newLimiter = flowcontrol.NewFakeNeverRateLimiter()
  204. } else {
  205. newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, evictionRateLimiterBurst)
  206. }
  207. // If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
  208. // TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
  209. // - saturation (percentage of used tokens)
  210. // - number of used tokens
  211. // - number of available tokens
  212. // - something else
  213. for q.limiter.Saturation() > newLimiter.Saturation() {
  214. // Check if we're not using fake limiter
  215. previousSaturation := newLimiter.Saturation()
  216. newLimiter.TryAccept()
  217. // It's a fake limiter
  218. if newLimiter.Saturation() == previousSaturation {
  219. break
  220. }
  221. }
  222. q.limiter.Stop()
  223. q.limiter = newLimiter
  224. }