fifo.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "sync"
  17. "k8s.io/apimachinery/pkg/util/sets"
  18. )
  // PopProcessFunc is the callback handed to the Pop() method of a Queue.
  // It receives the element that was just popped and returns the outcome of
  // processing that element.
  type PopProcessFunc func(obj interface{}) error
  // ErrRequeue is an error a PopProcessFunc can return to ask that the item
  // it was given be put back on the queue. Pop hands the wrapped Err back to
  // its own caller.
  type ErrRequeue struct {
  	// Err is the error that Pop returns; it may be nil.
  	Err error
  }

  // Error implements the error interface. It yields the wrapped error's
  // message, or a fixed explanatory message when no error is wrapped.
  func (e ErrRequeue) Error() string {
  	if e.Err != nil {
  		return e.Err.Error()
  	}
  	return "the popped item should be requeued without returning an error"
  }

  // FIFOClosedError is the error returned when an operation is attempted on
  // a queue that has been closed.
  var FIFOClosedError = errors.New("DeltaFIFO: manipulating with closed queue")
  35. // Queue is exactly like a Store, but has a Pop() method too.
  36. type Queue interface {
  37. Store
  38. // Pop blocks until it has something to process.
  39. // It returns the object that was process and the result of processing.
  40. // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
  41. // should be requeued before releasing the lock on the queue.
  42. Pop(PopProcessFunc) (interface{}, error)
  43. // AddIfNotPresent adds a value previously
  44. // returned by Pop back into the queue as long
  45. // as nothing else (presumably more recent)
  46. // has since been added.
  47. AddIfNotPresent(interface{}) error
  48. // Return true if the first batch of items has been popped
  49. HasSynced() bool
  50. // Close queue
  51. Close()
  52. }
  53. // Helper function for popping from Queue.
  54. // WARNING: Do NOT use this function in non-test code to avoid races
  55. // unless you really really really really know what you are doing.
  56. func Pop(queue Queue) interface{} {
  57. var result interface{}
  58. queue.Pop(func(obj interface{}) error {
  59. result = obj
  60. return nil
  61. })
  62. return result
  63. }
  64. // FIFO receives adds and updates from a Reflector, and puts them in a queue for
  65. // FIFO order processing. If multiple adds/updates of a single item happen while
  66. // an item is in the queue before it has been processed, it will only be
  67. // processed once, and when it is processed, the most recent version will be
  68. // processed. This can't be done with a channel.
  69. //
  70. // FIFO solves this use case:
  71. // * You want to process every object (exactly) once.
  72. // * You want to process the most recent version of the object when you process it.
  73. // * You do not want to process deleted objects, they should be removed from the queue.
  74. // * You do not want to periodically reprocess objects.
  75. // Compare with DeltaFIFO for other use cases.
  76. type FIFO struct {
  77. lock sync.RWMutex
  78. cond sync.Cond
  79. // We depend on the property that items in the set are in the queue and vice versa.
  80. items map[string]interface{}
  81. queue []string
  82. // populated is true if the first batch of items inserted by Replace() has been populated
  83. // or Delete/Add/Update was called first.
  84. populated bool
  85. // initialPopulationCount is the number of items inserted by the first call of Replace()
  86. initialPopulationCount int
  87. // keyFunc is used to make the key used for queued item insertion and retrieval, and
  88. // should be deterministic.
  89. keyFunc KeyFunc
  90. // Indication the queue is closed.
  91. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  92. // Currently, not used to gate any of CRED operations.
  93. closed bool
  94. closedLock sync.Mutex
  95. }
  96. var (
  97. _ = Queue(&FIFO{}) // FIFO is a Queue
  98. )
  99. // Close the queue.
  100. func (f *FIFO) Close() {
  101. f.closedLock.Lock()
  102. defer f.closedLock.Unlock()
  103. f.closed = true
  104. f.cond.Broadcast()
  105. }
  106. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  107. // or an Update called first but the first batch of items inserted by Replace() has been popped
  108. func (f *FIFO) HasSynced() bool {
  109. f.lock.Lock()
  110. defer f.lock.Unlock()
  111. return f.populated && f.initialPopulationCount == 0
  112. }
  113. // Add inserts an item, and puts it in the queue. The item is only enqueued
  114. // if it doesn't already exist in the set.
  115. func (f *FIFO) Add(obj interface{}) error {
  116. id, err := f.keyFunc(obj)
  117. if err != nil {
  118. return KeyError{obj, err}
  119. }
  120. f.lock.Lock()
  121. defer f.lock.Unlock()
  122. f.populated = true
  123. if _, exists := f.items[id]; !exists {
  124. f.queue = append(f.queue, id)
  125. }
  126. f.items[id] = obj
  127. f.cond.Broadcast()
  128. return nil
  129. }
  130. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  131. // present in the set, it is neither enqueued nor added to the set.
  132. //
  133. // This is useful in a single producer/consumer scenario so that the consumer can
  134. // safely retry items without contending with the producer and potentially enqueueing
  135. // stale items.
  136. func (f *FIFO) AddIfNotPresent(obj interface{}) error {
  137. id, err := f.keyFunc(obj)
  138. if err != nil {
  139. return KeyError{obj, err}
  140. }
  141. f.lock.Lock()
  142. defer f.lock.Unlock()
  143. f.addIfNotPresent(id, obj)
  144. return nil
  145. }
  146. // addIfNotPresent assumes the fifo lock is already held and adds the the provided
  147. // item to the queue under id if it does not already exist.
  148. func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
  149. f.populated = true
  150. if _, exists := f.items[id]; exists {
  151. return
  152. }
  153. f.queue = append(f.queue, id)
  154. f.items[id] = obj
  155. f.cond.Broadcast()
  156. }
  157. // Update is the same as Add in this implementation.
  158. func (f *FIFO) Update(obj interface{}) error {
  159. return f.Add(obj)
  160. }
  161. // Delete removes an item. It doesn't add it to the queue, because
  162. // this implementation assumes the consumer only cares about the objects,
  163. // not the order in which they were created/added.
  164. func (f *FIFO) Delete(obj interface{}) error {
  165. id, err := f.keyFunc(obj)
  166. if err != nil {
  167. return KeyError{obj, err}
  168. }
  169. f.lock.Lock()
  170. defer f.lock.Unlock()
  171. f.populated = true
  172. delete(f.items, id)
  173. return err
  174. }
  175. // List returns a list of all the items.
  176. func (f *FIFO) List() []interface{} {
  177. f.lock.RLock()
  178. defer f.lock.RUnlock()
  179. list := make([]interface{}, 0, len(f.items))
  180. for _, item := range f.items {
  181. list = append(list, item)
  182. }
  183. return list
  184. }
  185. // ListKeys returns a list of all the keys of the objects currently
  186. // in the FIFO.
  187. func (f *FIFO) ListKeys() []string {
  188. f.lock.RLock()
  189. defer f.lock.RUnlock()
  190. list := make([]string, 0, len(f.items))
  191. for key := range f.items {
  192. list = append(list, key)
  193. }
  194. return list
  195. }
  196. // Get returns the requested item, or sets exists=false.
  197. func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  198. key, err := f.keyFunc(obj)
  199. if err != nil {
  200. return nil, false, KeyError{obj, err}
  201. }
  202. return f.GetByKey(key)
  203. }
  204. // GetByKey returns the requested item, or sets exists=false.
  205. func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  206. f.lock.RLock()
  207. defer f.lock.RUnlock()
  208. item, exists = f.items[key]
  209. return item, exists, nil
  210. }
  211. // Checks if the queue is closed
  212. func (f *FIFO) IsClosed() bool {
  213. f.closedLock.Lock()
  214. defer f.closedLock.Unlock()
  215. if f.closed {
  216. return true
  217. }
  218. return false
  219. }
  220. // Pop waits until an item is ready and processes it. If multiple items are
  221. // ready, they are returned in the order in which they were added/updated.
  222. // The item is removed from the queue (and the store) before it is processed,
  223. // so if you don't successfully process it, it should be added back with
  224. // AddIfNotPresent(). process function is called under lock, so it is safe
  225. // update data structures in it that need to be in sync with the queue.
  226. func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
  227. f.lock.Lock()
  228. defer f.lock.Unlock()
  229. for {
  230. for len(f.queue) == 0 {
  231. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  232. // When Close() is called, the f.closed is set and the condition is broadcasted.
  233. // Which causes this loop to continue and return from the Pop().
  234. if f.IsClosed() {
  235. return nil, FIFOClosedError
  236. }
  237. f.cond.Wait()
  238. }
  239. id := f.queue[0]
  240. f.queue = f.queue[1:]
  241. if f.initialPopulationCount > 0 {
  242. f.initialPopulationCount--
  243. }
  244. item, ok := f.items[id]
  245. if !ok {
  246. // Item may have been deleted subsequently.
  247. continue
  248. }
  249. delete(f.items, id)
  250. err := process(item)
  251. if e, ok := err.(ErrRequeue); ok {
  252. f.addIfNotPresent(id, item)
  253. err = e.Err
  254. }
  255. return item, err
  256. }
  257. }
  258. // Replace will delete the contents of 'f', using instead the given map.
  259. // 'f' takes ownership of the map, you should not reference the map again
  260. // after calling this function. f's queue is reset, too; upon return, it
  261. // will contain the items in the map, in no particular order.
  262. func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
  263. items := map[string]interface{}{}
  264. for _, item := range list {
  265. key, err := f.keyFunc(item)
  266. if err != nil {
  267. return KeyError{item, err}
  268. }
  269. items[key] = item
  270. }
  271. f.lock.Lock()
  272. defer f.lock.Unlock()
  273. if !f.populated {
  274. f.populated = true
  275. f.initialPopulationCount = len(items)
  276. }
  277. f.items = items
  278. f.queue = f.queue[:0]
  279. for id := range items {
  280. f.queue = append(f.queue, id)
  281. }
  282. if len(f.queue) > 0 {
  283. f.cond.Broadcast()
  284. }
  285. return nil
  286. }
  287. // Resync will touch all objects to put them into the processing queue
  288. func (f *FIFO) Resync() error {
  289. f.lock.Lock()
  290. defer f.lock.Unlock()
  291. inQueue := sets.NewString()
  292. for _, id := range f.queue {
  293. inQueue.Insert(id)
  294. }
  295. for id := range f.items {
  296. if !inQueue.Has(id) {
  297. f.queue = append(f.queue, id)
  298. }
  299. }
  300. if len(f.queue) > 0 {
  301. f.cond.Broadcast()
  302. }
  303. return nil
  304. }
  305. // NewFIFO returns a Store which can be used to queue up items to
  306. // process.
  307. func NewFIFO(keyFunc KeyFunc) *FIFO {
  308. f := &FIFO{
  309. items: map[string]interface{}{},
  310. queue: []string{},
  311. keyFunc: keyFunc,
  312. }
  313. f.cond.L = &f.lock
  314. return f
  315. }