fifo.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "sync"
  17. "k8s.io/apimachinery/pkg/util/sets"
  18. )
  19. // PopProcessFunc is passed to Pop() method of Queue interface.
  20. // It is supposed to process the accumulator popped from the queue.
  21. type PopProcessFunc func(interface{}) error
  22. // ErrRequeue may be returned by a PopProcessFunc to safely requeue
  23. // the current item. The value of Err will be returned from Pop.
  24. type ErrRequeue struct {
  25. // Err is returned by the Pop function
  26. Err error
  27. }
  28. // ErrFIFOClosed used when FIFO is closed
  29. var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
  30. func (e ErrRequeue) Error() string {
  31. if e.Err == nil {
  32. return "the popped item should be requeued without returning an error"
  33. }
  34. return e.Err.Error()
  35. }
  36. // Queue extends Store with a collection of Store keys to "process".
  37. // Every Add, Update, or Delete may put the object's key in that collection.
  38. // A Queue has a way to derive the corresponding key given an accumulator.
  39. // A Queue can be accessed concurrently from multiple goroutines.
  40. // A Queue can be "closed", after which Pop operations return an error.
  41. type Queue interface {
  42. Store
  43. // Pop blocks until there is at least one key to process or the
  44. // Queue is closed. In the latter case Pop returns with an error.
  45. // In the former case Pop atomically picks one key to process,
  46. // removes that (key, accumulator) association from the Store, and
  47. // processes the accumulator. Pop returns the accumulator that
  48. // was processed and the result of processing. The PopProcessFunc
  49. // may return an ErrRequeue{inner} and in this case Pop will (a)
  50. // return that (key, accumulator) association to the Queue as part
  51. // of the atomic processing and (b) return the inner error from
  52. // Pop.
  53. Pop(PopProcessFunc) (interface{}, error)
  54. // AddIfNotPresent puts the given accumulator into the Queue (in
  55. // association with the accumulator's key) if and only if that key
  56. // is not already associated with a non-empty accumulator.
  57. AddIfNotPresent(interface{}) error
  58. // HasSynced returns true if the first batch of keys have all been
  59. // popped. The first batch of keys are those of the first Replace
  60. // operation if that happened before any Add, AddIfNotPresent,
  61. // Update, or Delete; otherwise the first batch is empty.
  62. HasSynced() bool
  63. // Close the queue
  64. Close()
  65. }
  66. // Pop is helper function for popping from Queue.
  67. // WARNING: Do NOT use this function in non-test code to avoid races
  68. // unless you really really really really know what you are doing.
  69. func Pop(queue Queue) interface{} {
  70. var result interface{}
  71. queue.Pop(func(obj interface{}) error {
  72. result = obj
  73. return nil
  74. })
  75. return result
  76. }
  77. // FIFO is a Queue in which (a) each accumulator is simply the most
  78. // recently provided object and (b) the collection of keys to process
  79. // is a FIFO. The accumulators all start out empty, and deleting an
  80. // object from its accumulator empties the accumulator. The Resync
  81. // operation is a no-op.
  82. //
  83. // Thus: if multiple adds/updates of a single object happen while that
  84. // object's key is in the queue before it has been processed then it
  85. // will only be processed once, and when it is processed the most
  86. // recent version will be processed. This can't be done with a channel
  87. //
  88. // FIFO solves this use case:
  89. // * You want to process every object (exactly) once.
  90. // * You want to process the most recent version of the object when you process it.
  91. // * You do not want to process deleted objects, they should be removed from the queue.
  92. // * You do not want to periodically reprocess objects.
  93. // Compare with DeltaFIFO for other use cases.
  94. type FIFO struct {
  95. lock sync.RWMutex
  96. cond sync.Cond
  97. // We depend on the property that every key in `items` is also in `queue`
  98. items map[string]interface{}
  99. queue []string
  100. // populated is true if the first batch of items inserted by Replace() has been populated
  101. // or Delete/Add/Update was called first.
  102. populated bool
  103. // initialPopulationCount is the number of items inserted by the first call of Replace()
  104. initialPopulationCount int
  105. // keyFunc is used to make the key used for queued item insertion and retrieval, and
  106. // should be deterministic.
  107. keyFunc KeyFunc
  108. // Indication the queue is closed.
  109. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  110. // Currently, not used to gate any of CRED operations.
  111. closed bool
  112. }
  113. var (
  114. _ = Queue(&FIFO{}) // FIFO is a Queue
  115. )
  116. // Close the queue.
  117. func (f *FIFO) Close() {
  118. f.lock.Lock()
  119. defer f.lock.Unlock()
  120. f.closed = true
  121. f.cond.Broadcast()
  122. }
  123. // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
  124. // or the first batch of items inserted by Replace() has been popped.
  125. func (f *FIFO) HasSynced() bool {
  126. f.lock.Lock()
  127. defer f.lock.Unlock()
  128. return f.populated && f.initialPopulationCount == 0
  129. }
  130. // Add inserts an item, and puts it in the queue. The item is only enqueued
  131. // if it doesn't already exist in the set.
  132. func (f *FIFO) Add(obj interface{}) error {
  133. id, err := f.keyFunc(obj)
  134. if err != nil {
  135. return KeyError{obj, err}
  136. }
  137. f.lock.Lock()
  138. defer f.lock.Unlock()
  139. f.populated = true
  140. if _, exists := f.items[id]; !exists {
  141. f.queue = append(f.queue, id)
  142. }
  143. f.items[id] = obj
  144. f.cond.Broadcast()
  145. return nil
  146. }
  147. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  148. // present in the set, it is neither enqueued nor added to the set.
  149. //
  150. // This is useful in a single producer/consumer scenario so that the consumer can
  151. // safely retry items without contending with the producer and potentially enqueueing
  152. // stale items.
  153. func (f *FIFO) AddIfNotPresent(obj interface{}) error {
  154. id, err := f.keyFunc(obj)
  155. if err != nil {
  156. return KeyError{obj, err}
  157. }
  158. f.lock.Lock()
  159. defer f.lock.Unlock()
  160. f.addIfNotPresent(id, obj)
  161. return nil
  162. }
  163. // addIfNotPresent assumes the fifo lock is already held and adds the provided
  164. // item to the queue under id if it does not already exist.
  165. func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
  166. f.populated = true
  167. if _, exists := f.items[id]; exists {
  168. return
  169. }
  170. f.queue = append(f.queue, id)
  171. f.items[id] = obj
  172. f.cond.Broadcast()
  173. }
  174. // Update is the same as Add in this implementation.
  175. func (f *FIFO) Update(obj interface{}) error {
  176. return f.Add(obj)
  177. }
  178. // Delete removes an item. It doesn't add it to the queue, because
  179. // this implementation assumes the consumer only cares about the objects,
  180. // not the order in which they were created/added.
  181. func (f *FIFO) Delete(obj interface{}) error {
  182. id, err := f.keyFunc(obj)
  183. if err != nil {
  184. return KeyError{obj, err}
  185. }
  186. f.lock.Lock()
  187. defer f.lock.Unlock()
  188. f.populated = true
  189. delete(f.items, id)
  190. return err
  191. }
  192. // List returns a list of all the items.
  193. func (f *FIFO) List() []interface{} {
  194. f.lock.RLock()
  195. defer f.lock.RUnlock()
  196. list := make([]interface{}, 0, len(f.items))
  197. for _, item := range f.items {
  198. list = append(list, item)
  199. }
  200. return list
  201. }
  202. // ListKeys returns a list of all the keys of the objects currently
  203. // in the FIFO.
  204. func (f *FIFO) ListKeys() []string {
  205. f.lock.RLock()
  206. defer f.lock.RUnlock()
  207. list := make([]string, 0, len(f.items))
  208. for key := range f.items {
  209. list = append(list, key)
  210. }
  211. return list
  212. }
  213. // Get returns the requested item, or sets exists=false.
  214. func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  215. key, err := f.keyFunc(obj)
  216. if err != nil {
  217. return nil, false, KeyError{obj, err}
  218. }
  219. return f.GetByKey(key)
  220. }
  221. // GetByKey returns the requested item, or sets exists=false.
  222. func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  223. f.lock.RLock()
  224. defer f.lock.RUnlock()
  225. item, exists = f.items[key]
  226. return item, exists, nil
  227. }
  228. // IsClosed checks if the queue is closed
  229. func (f *FIFO) IsClosed() bool {
  230. f.lock.Lock()
  231. defer f.lock.Unlock()
  232. if f.closed {
  233. return true
  234. }
  235. return false
  236. }
  237. // Pop waits until an item is ready and processes it. If multiple items are
  238. // ready, they are returned in the order in which they were added/updated.
  239. // The item is removed from the queue (and the store) before it is processed,
  240. // so if you don't successfully process it, it should be added back with
  241. // AddIfNotPresent(). process function is called under lock, so it is safe
  242. // update data structures in it that need to be in sync with the queue.
  243. func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
  244. f.lock.Lock()
  245. defer f.lock.Unlock()
  246. for {
  247. for len(f.queue) == 0 {
  248. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  249. // When Close() is called, the f.closed is set and the condition is broadcasted.
  250. // Which causes this loop to continue and return from the Pop().
  251. if f.closed {
  252. return nil, ErrFIFOClosed
  253. }
  254. f.cond.Wait()
  255. }
  256. id := f.queue[0]
  257. f.queue = f.queue[1:]
  258. if f.initialPopulationCount > 0 {
  259. f.initialPopulationCount--
  260. }
  261. item, ok := f.items[id]
  262. if !ok {
  263. // Item may have been deleted subsequently.
  264. continue
  265. }
  266. delete(f.items, id)
  267. err := process(item)
  268. if e, ok := err.(ErrRequeue); ok {
  269. f.addIfNotPresent(id, item)
  270. err = e.Err
  271. }
  272. return item, err
  273. }
  274. }
  275. // Replace will delete the contents of 'f', using instead the given map.
  276. // 'f' takes ownership of the map, you should not reference the map again
  277. // after calling this function. f's queue is reset, too; upon return, it
  278. // will contain the items in the map, in no particular order.
  279. func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
  280. items := make(map[string]interface{}, len(list))
  281. for _, item := range list {
  282. key, err := f.keyFunc(item)
  283. if err != nil {
  284. return KeyError{item, err}
  285. }
  286. items[key] = item
  287. }
  288. f.lock.Lock()
  289. defer f.lock.Unlock()
  290. if !f.populated {
  291. f.populated = true
  292. f.initialPopulationCount = len(items)
  293. }
  294. f.items = items
  295. f.queue = f.queue[:0]
  296. for id := range items {
  297. f.queue = append(f.queue, id)
  298. }
  299. if len(f.queue) > 0 {
  300. f.cond.Broadcast()
  301. }
  302. return nil
  303. }
  304. // Resync will ensure that every object in the Store has its key in the queue.
  305. // This should be a no-op, because that property is maintained by all operations.
  306. func (f *FIFO) Resync() error {
  307. f.lock.Lock()
  308. defer f.lock.Unlock()
  309. inQueue := sets.NewString()
  310. for _, id := range f.queue {
  311. inQueue.Insert(id)
  312. }
  313. for id := range f.items {
  314. if !inQueue.Has(id) {
  315. f.queue = append(f.queue, id)
  316. }
  317. }
  318. if len(f.queue) > 0 {
  319. f.cond.Broadcast()
  320. }
  321. return nil
  322. }
  323. // NewFIFO returns a Store which can be used to queue up items to
  324. // process.
  325. func NewFIFO(keyFunc KeyFunc) *FIFO {
  326. f := &FIFO{
  327. items: map[string]interface{}{},
  328. queue: []string{},
  329. keyFunc: keyFunc,
  330. }
  331. f.cond.L = &f.lock
  332. return f
  333. }