fifo.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "sync"
  16. "k8s.io/kubernetes/pkg/util/sets"
  17. )
  18. // PopProcessFunc is passed to Pop() method of Queue interface.
  19. // It is supposed to process the element popped from the queue.
  20. type PopProcessFunc func(interface{}) error
  21. // ErrRequeue may be returned by a PopProcessFunc to safely requeue
  22. // the current item. The value of Err will be returned from Pop.
  23. type ErrRequeue struct {
  24. // Err is returned by the Pop function
  25. Err error
  26. }
  27. func (e ErrRequeue) Error() string {
  28. if e.Err == nil {
  29. return "the popped item should be requeued without returning an error"
  30. }
  31. return e.Err.Error()
  32. }
  33. // Queue is exactly like a Store, but has a Pop() method too.
  34. type Queue interface {
  35. Store
  36. // Pop blocks until it has something to process.
  37. // It returns the object that was process and the result of processing.
  38. // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
  39. // should be requeued before releasing the lock on the queue.
  40. Pop(PopProcessFunc) (interface{}, error)
  41. // AddIfNotPresent adds a value previously
  42. // returned by Pop back into the queue as long
  43. // as nothing else (presumably more recent)
  44. // has since been added.
  45. AddIfNotPresent(interface{}) error
  46. // Return true if the first batch of items has been popped
  47. HasSynced() bool
  48. }
  49. // Helper function for popping from Queue.
  50. // WARNING: Do NOT use this function in non-test code to avoid races
  51. // unless you really really really really know what you are doing.
  52. func Pop(queue Queue) interface{} {
  53. var result interface{}
  54. queue.Pop(func(obj interface{}) error {
  55. result = obj
  56. return nil
  57. })
  58. return result
  59. }
  60. // FIFO receives adds and updates from a Reflector, and puts them in a queue for
  61. // FIFO order processing. If multiple adds/updates of a single item happen while
  62. // an item is in the queue before it has been processed, it will only be
  63. // processed once, and when it is processed, the most recent version will be
  64. // processed. This can't be done with a channel.
  65. //
  66. // FIFO solves this use case:
  67. // * You want to process every object (exactly) once.
  68. // * You want to process the most recent version of the object when you process it.
  69. // * You do not want to process deleted objects, they should be removed from the queue.
  70. // * You do not want to periodically reprocess objects.
  71. // Compare with DeltaFIFO for other use cases.
  72. type FIFO struct {
  73. lock sync.RWMutex
  74. cond sync.Cond
  75. // We depend on the property that items in the set are in the queue and vice versa.
  76. items map[string]interface{}
  77. queue []string
  78. // populated is true if the first batch of items inserted by Replace() has been populated
  79. // or Delete/Add/Update was called first.
  80. populated bool
  81. // initialPopulationCount is the number of items inserted by the first call of Replace()
  82. initialPopulationCount int
  83. // keyFunc is used to make the key used for queued item insertion and retrieval, and
  84. // should be deterministic.
  85. keyFunc KeyFunc
  86. }
  87. var (
  88. _ = Queue(&FIFO{}) // FIFO is a Queue
  89. )
  90. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  91. // or an Update called first but the first batch of items inserted by Replace() has been popped
  92. func (f *FIFO) HasSynced() bool {
  93. f.lock.Lock()
  94. defer f.lock.Unlock()
  95. return f.populated && f.initialPopulationCount == 0
  96. }
  97. // Add inserts an item, and puts it in the queue. The item is only enqueued
  98. // if it doesn't already exist in the set.
  99. func (f *FIFO) Add(obj interface{}) error {
  100. id, err := f.keyFunc(obj)
  101. if err != nil {
  102. return KeyError{obj, err}
  103. }
  104. f.lock.Lock()
  105. defer f.lock.Unlock()
  106. f.populated = true
  107. if _, exists := f.items[id]; !exists {
  108. f.queue = append(f.queue, id)
  109. }
  110. f.items[id] = obj
  111. f.cond.Broadcast()
  112. return nil
  113. }
  114. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  115. // present in the set, it is neither enqueued nor added to the set.
  116. //
  117. // This is useful in a single producer/consumer scenario so that the consumer can
  118. // safely retry items without contending with the producer and potentially enqueueing
  119. // stale items.
  120. func (f *FIFO) AddIfNotPresent(obj interface{}) error {
  121. id, err := f.keyFunc(obj)
  122. if err != nil {
  123. return KeyError{obj, err}
  124. }
  125. f.lock.Lock()
  126. defer f.lock.Unlock()
  127. f.addIfNotPresent(id, obj)
  128. return nil
  129. }
  130. // addIfNotPresent assumes the fifo lock is already held and adds the the provided
  131. // item to the queue under id if it does not already exist.
  132. func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
  133. f.populated = true
  134. if _, exists := f.items[id]; exists {
  135. return
  136. }
  137. f.queue = append(f.queue, id)
  138. f.items[id] = obj
  139. f.cond.Broadcast()
  140. }
  141. // Update is the same as Add in this implementation.
  142. func (f *FIFO) Update(obj interface{}) error {
  143. return f.Add(obj)
  144. }
  145. // Delete removes an item. It doesn't add it to the queue, because
  146. // this implementation assumes the consumer only cares about the objects,
  147. // not the order in which they were created/added.
  148. func (f *FIFO) Delete(obj interface{}) error {
  149. id, err := f.keyFunc(obj)
  150. if err != nil {
  151. return KeyError{obj, err}
  152. }
  153. f.lock.Lock()
  154. defer f.lock.Unlock()
  155. f.populated = true
  156. delete(f.items, id)
  157. return err
  158. }
  159. // List returns a list of all the items.
  160. func (f *FIFO) List() []interface{} {
  161. f.lock.RLock()
  162. defer f.lock.RUnlock()
  163. list := make([]interface{}, 0, len(f.items))
  164. for _, item := range f.items {
  165. list = append(list, item)
  166. }
  167. return list
  168. }
  169. // ListKeys returns a list of all the keys of the objects currently
  170. // in the FIFO.
  171. func (f *FIFO) ListKeys() []string {
  172. f.lock.RLock()
  173. defer f.lock.RUnlock()
  174. list := make([]string, 0, len(f.items))
  175. for key := range f.items {
  176. list = append(list, key)
  177. }
  178. return list
  179. }
  180. // Get returns the requested item, or sets exists=false.
  181. func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  182. key, err := f.keyFunc(obj)
  183. if err != nil {
  184. return nil, false, KeyError{obj, err}
  185. }
  186. return f.GetByKey(key)
  187. }
  188. // GetByKey returns the requested item, or sets exists=false.
  189. func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  190. f.lock.RLock()
  191. defer f.lock.RUnlock()
  192. item, exists = f.items[key]
  193. return item, exists, nil
  194. }
  195. // Pop waits until an item is ready and processes it. If multiple items are
  196. // ready, they are returned in the order in which they were added/updated.
  197. // The item is removed from the queue (and the store) before it is processed,
  198. // so if you don't successfully process it, it should be added back with
  199. // AddIfNotPresent(). process function is called under lock, so it is safe
  200. // update data structures in it that need to be in sync with the queue.
  201. func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
  202. f.lock.Lock()
  203. defer f.lock.Unlock()
  204. for {
  205. for len(f.queue) == 0 {
  206. f.cond.Wait()
  207. }
  208. id := f.queue[0]
  209. f.queue = f.queue[1:]
  210. if f.initialPopulationCount > 0 {
  211. f.initialPopulationCount--
  212. }
  213. item, ok := f.items[id]
  214. if !ok {
  215. // Item may have been deleted subsequently.
  216. continue
  217. }
  218. delete(f.items, id)
  219. err := process(item)
  220. if e, ok := err.(ErrRequeue); ok {
  221. f.addIfNotPresent(id, item)
  222. err = e.Err
  223. }
  224. return item, err
  225. }
  226. }
  227. // Replace will delete the contents of 'f', using instead the given map.
  228. // 'f' takes ownership of the map, you should not reference the map again
  229. // after calling this function. f's queue is reset, too; upon return, it
  230. // will contain the items in the map, in no particular order.
  231. func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
  232. items := map[string]interface{}{}
  233. for _, item := range list {
  234. key, err := f.keyFunc(item)
  235. if err != nil {
  236. return KeyError{item, err}
  237. }
  238. items[key] = item
  239. }
  240. f.lock.Lock()
  241. defer f.lock.Unlock()
  242. if !f.populated {
  243. f.populated = true
  244. f.initialPopulationCount = len(items)
  245. }
  246. f.items = items
  247. f.queue = f.queue[:0]
  248. for id := range items {
  249. f.queue = append(f.queue, id)
  250. }
  251. if len(f.queue) > 0 {
  252. f.cond.Broadcast()
  253. }
  254. return nil
  255. }
  256. // Resync will touch all objects to put them into the processing queue
  257. func (f *FIFO) Resync() error {
  258. f.lock.Lock()
  259. defer f.lock.Unlock()
  260. inQueue := sets.NewString()
  261. for _, id := range f.queue {
  262. inQueue.Insert(id)
  263. }
  264. for id := range f.items {
  265. if !inQueue.Has(id) {
  266. f.queue = append(f.queue, id)
  267. }
  268. }
  269. if len(f.queue) > 0 {
  270. f.cond.Broadcast()
  271. }
  272. return nil
  273. }
  274. // NewFIFO returns a Store which can be used to queue up items to
  275. // process.
  276. func NewFIFO(keyFunc KeyFunc) *FIFO {
  277. f := &FIFO{
  278. items: map[string]interface{}{},
  279. queue: []string{},
  280. keyFunc: keyFunc,
  281. }
  282. f.cond.L = &f.lock
  283. return f
  284. }