delta_fifo.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "k8s.io/klog/v2"
  20. )
  21. // NewDeltaFIFO returns a Queue which can be used to process changes to items.
  22. //
  23. // keyFunc is used to figure out what key an object should have. (It is
  24. // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
  25. // around deleted objects and queue state).
  26. //
  27. // 'knownObjects' may be supplied to modify the behavior of Delete,
  28. // Replace, and Resync. It may be nil if you do not need those
  29. // modifications.
  30. //
  31. // TODO: consider merging keyLister with this object, tracking a list of
  32. // "known" keys when Pop() is called. Have to think about how that
  33. // affects error retrying.
  34. // NOTE: It is possible to misuse this and cause a race when using an
  35. // external known object source.
  36. // Whether there is a potential race depends on how the consumer
  37. // modifies knownObjects. In Pop(), process function is called under
  38. // lock, so it is safe to update data structures in it that need to be
  39. // in sync with the queue (e.g. knownObjects).
  40. //
  41. // Example:
  42. // In case of sharedIndexInformer being a consumer
  43. // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
  44. // src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
  45. // there is no race as knownObjects (s.indexer) is modified safely
  46. // under DeltaFIFO's lock. The only exceptions are GetStore() and
  47. // GetIndexer() methods, which expose ways to modify the underlying
  48. // storage. Currently these two methods are used for creating Lister
  49. // and internal tests.
  50. //
  51. // Also see the comment on DeltaFIFO.
  52. //
  53. // Warning: This constructs a DeltaFIFO that does not differentiate between
  54. // events caused by a call to Replace (e.g., from a relist, which may
  55. // contain object updates), and synthetic events caused by a periodic resync
  56. // (which just emit the existing object). See https://issue.k8s.io/86015 for details.
  57. //
  58. // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
  59. // instead to receive a `Replaced` event depending on the type.
  60. //
  61. // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
  62. func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
  63. return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  64. KeyFunction: keyFunc,
  65. KnownObjects: knownObjects,
  66. })
  67. }
  68. // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
  69. // optional.
  70. type DeltaFIFOOptions struct {
  71. // KeyFunction is used to figure out what key an object should have. (It's
  72. // exposed in the returned DeltaFIFO's KeyOf() method, with additional
  73. // handling around deleted objects and queue state).
  74. // Optional, the default is MetaNamespaceKeyFunc.
  75. KeyFunction KeyFunc
  76. // KnownObjects is expected to return a list of keys that the consumer of
  77. // this queue "knows about". It is used to decide which items are missing
  78. // when Replace() is called; 'Deleted' deltas are produced for the missing items.
  79. // KnownObjects may be nil if you can tolerate missing deletions on Replace().
  80. KnownObjects KeyListerGetter
  81. // EmitDeltaTypeReplaced indicates that the queue consumer
  82. // understands the Replaced DeltaType. Before the `Replaced` event type was
  83. // added, calls to Replace() were handled the same as Sync(). For
  84. // backwards-compatibility purposes, this is false by default.
  85. // When true, `Replaced` events will be sent for items passed to a Replace() call.
  86. // When false, `Sync` events will be sent instead.
  87. EmitDeltaTypeReplaced bool
  88. }
  89. // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
  90. // items. See also the comment on DeltaFIFO.
  91. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
  92. if opts.KeyFunction == nil {
  93. opts.KeyFunction = MetaNamespaceKeyFunc
  94. }
  95. f := &DeltaFIFO{
  96. items: map[string]Deltas{},
  97. queue: []string{},
  98. keyFunc: opts.KeyFunction,
  99. knownObjects: opts.KnownObjects,
  100. emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
  101. }
  102. f.cond.L = &f.lock
  103. return f
  104. }
  105. // DeltaFIFO is like FIFO, but differs in two ways. One is that the
  106. // accumulator associated with a given object's key is not that object
  107. // but rather a Deltas, which is a slice of Delta values for that
  108. // object. Applying an object to a Deltas means to append a Delta
  109. // except when the potentially appended Delta is a Deleted and the
  110. // Deltas already ends with a Deleted. In that case the Deltas does
  111. // not grow, although the terminal Deleted will be replaced by the new
  112. // Deleted if the older Deleted's object is a
  113. // DeletedFinalStateUnknown.
  114. //
  115. // The other difference is that DeltaFIFO has an additional way that
  116. // an object can be applied to an accumulator, called Sync.
  117. //
  118. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  119. // intended to be the producer, and the consumer is whatever calls
  120. // the Pop() method.
  121. //
  122. // DeltaFIFO solves this use case:
  123. // * You want to process every object change (delta) at most once.
  124. // * When you process an object, you want to see everything
  125. // that's happened to it since you last processed it.
  126. // * You want to process the deletion of some of the objects.
  127. // * You might want to periodically reprocess objects.
  128. //
  129. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  130. // interface{} to satisfy the Store/Queue interfaces, but they
  131. // will always return an object of type Deltas. List() returns
  132. // the newest objects currently in the FIFO.
  133. //
  134. // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
  135. // to list Store keys and to get objects by Store key. The objects in
  136. // question are called "known objects" and this set of objects
  137. // modifies the behavior of the Delete, Replace, and Resync methods
  138. // (each in a different way).
  139. //
  140. // A note on threading: If you call Pop() in parallel from multiple
  141. // threads, you could end up with multiple threads processing slightly
  142. // different versions of the same object.
  143. type DeltaFIFO struct {
  144. // lock/cond protects access to 'items' and 'queue'.
  145. lock sync.RWMutex
  146. cond sync.Cond
  147. // `items` maps keys to Deltas.
  148. // `queue` maintains FIFO order of keys for consumption in Pop().
  149. // We maintain the property that keys in the `items` and `queue` are
  150. // strictly 1:1 mapping, and that all Deltas in `items` should have
  151. // at least one Delta.
  152. items map[string]Deltas
  153. queue []string
  154. // populated is true if the first batch of items inserted by Replace() has been populated
  155. // or Delete/Add/Update/AddIfNotPresent was called first.
  156. populated bool
  157. // initialPopulationCount is the number of items inserted by the first call of Replace()
  158. initialPopulationCount int
  159. // keyFunc is used to make the key used for queued item
  160. // insertion and retrieval, and should be deterministic.
  161. keyFunc KeyFunc
  162. // knownObjects list keys that are "known" --- affecting Delete(),
  163. // Replace(), and Resync()
  164. knownObjects KeyListerGetter
  165. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  166. // Currently, not used to gate any of CRED operations.
  167. closed bool
  168. // emitDeltaTypeReplaced is whether to emit the Replaced or Sync
  169. // DeltaType when Replace() is called (to preserve backwards compat).
  170. emitDeltaTypeReplaced bool
  171. }
  172. var (
  173. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  174. )
  175. var (
  176. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  177. // object with zero length is encountered (should be impossible,
  178. // but included for completeness).
  179. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  180. )
  181. // Close the queue.
  182. func (f *DeltaFIFO) Close() {
  183. f.lock.Lock()
  184. defer f.lock.Unlock()
  185. f.closed = true
  186. f.cond.Broadcast()
  187. }
  188. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  189. // DeletedFinalStateUnknown objects.
  190. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  191. if d, ok := obj.(Deltas); ok {
  192. if len(d) == 0 {
  193. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  194. }
  195. obj = d.Newest().Object
  196. }
  197. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  198. return d.Key, nil
  199. }
  200. return f.keyFunc(obj)
  201. }
  202. // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
  203. // or the first batch of items inserted by Replace() has been popped.
  204. func (f *DeltaFIFO) HasSynced() bool {
  205. f.lock.Lock()
  206. defer f.lock.Unlock()
  207. return f.populated && f.initialPopulationCount == 0
  208. }
  209. // Add inserts an item, and puts it in the queue. The item is only enqueued
  210. // if it doesn't already exist in the set.
  211. func (f *DeltaFIFO) Add(obj interface{}) error {
  212. f.lock.Lock()
  213. defer f.lock.Unlock()
  214. f.populated = true
  215. return f.queueActionLocked(Added, obj)
  216. }
  217. // Update is just like Add, but makes an Updated Delta.
  218. func (f *DeltaFIFO) Update(obj interface{}) error {
  219. f.lock.Lock()
  220. defer f.lock.Unlock()
  221. f.populated = true
  222. return f.queueActionLocked(Updated, obj)
  223. }
  224. // Delete is just like Add, but makes a Deleted Delta. If the given
  225. // object does not already exist, it will be ignored. (It may have
  226. // already been deleted by a Replace (re-list), for example.) In this
  227. // method `f.knownObjects`, if not nil, provides (via GetByKey)
  228. // _additional_ objects that are considered to already exist.
  229. func (f *DeltaFIFO) Delete(obj interface{}) error {
  230. id, err := f.KeyOf(obj)
  231. if err != nil {
  232. return KeyError{obj, err}
  233. }
  234. f.lock.Lock()
  235. defer f.lock.Unlock()
  236. f.populated = true
  237. if f.knownObjects == nil {
  238. if _, exists := f.items[id]; !exists {
  239. // Presumably, this was deleted when a relist happened.
  240. // Don't provide a second report of the same deletion.
  241. return nil
  242. }
  243. } else {
  244. // We only want to skip the "deletion" action if the object doesn't
  245. // exist in knownObjects and it doesn't have corresponding item in items.
  246. // Note that even if there is a "deletion" action in items, we can ignore it,
  247. // because it will be deduped automatically in "queueActionLocked"
  248. _, exists, err := f.knownObjects.GetByKey(id)
  249. _, itemsExist := f.items[id]
  250. if err == nil && !exists && !itemsExist {
  251. // Presumably, this was deleted when a relist happened.
  252. // Don't provide a second report of the same deletion.
  253. return nil
  254. }
  255. }
  256. // exist in items and/or KnownObjects
  257. return f.queueActionLocked(Deleted, obj)
  258. }
  259. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  260. // present in the set, it is neither enqueued nor added to the set.
  261. //
  262. // This is useful in a single producer/consumer scenario so that the consumer can
  263. // safely retry items without contending with the producer and potentially enqueueing
  264. // stale items.
  265. //
  266. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  267. // different from the Add/Update/Delete functions.
  268. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  269. deltas, ok := obj.(Deltas)
  270. if !ok {
  271. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  272. }
  273. id, err := f.KeyOf(deltas.Newest().Object)
  274. if err != nil {
  275. return KeyError{obj, err}
  276. }
  277. f.lock.Lock()
  278. defer f.lock.Unlock()
  279. f.addIfNotPresent(id, deltas)
  280. return nil
  281. }
  282. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  283. // already holds the fifo lock.
  284. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  285. f.populated = true
  286. if _, exists := f.items[id]; exists {
  287. return
  288. }
  289. f.queue = append(f.queue, id)
  290. f.items[id] = deltas
  291. f.cond.Broadcast()
  292. }
  293. // re-listing and watching can deliver the same update multiple times in any
  294. // order. This will combine the most recent two deltas if they are the same.
  295. func dedupDeltas(deltas Deltas) Deltas {
  296. n := len(deltas)
  297. if n < 2 {
  298. return deltas
  299. }
  300. a := &deltas[n-1]
  301. b := &deltas[n-2]
  302. if out := isDup(a, b); out != nil {
  303. // `a` and `b` are duplicates. Only keep the one returned from isDup().
  304. // TODO: This extra array allocation and copy seems unnecessary if
  305. // all we do to dedup is compare the new delta with the last element
  306. // in `items`, which could be done by mutating `items` directly.
  307. // Might be worth profiling and investigating if it is safe to optimize.
  308. d := append(Deltas{}, deltas[:n-2]...)
  309. return append(d, *out)
  310. }
  311. return deltas
  312. }
  313. // If a & b represent the same event, returns the delta that ought to be kept.
  314. // Otherwise, returns nil.
  315. // TODO: is there anything other than deletions that need deduping?
  316. func isDup(a, b *Delta) *Delta {
  317. if out := isDeletionDup(a, b); out != nil {
  318. return out
  319. }
  320. // TODO: Detect other duplicate situations? Are there any?
  321. return nil
  322. }
  323. // keep the one with the most information if both are deletions.
  324. func isDeletionDup(a, b *Delta) *Delta {
  325. if b.Type != Deleted || a.Type != Deleted {
  326. return nil
  327. }
  328. // Do more sophisticated checks, or is this sufficient?
  329. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  330. return a
  331. }
  332. return b
  333. }
  334. // queueActionLocked appends to the delta list for the object.
  335. // Caller must lock first.
  336. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  337. id, err := f.KeyOf(obj)
  338. if err != nil {
  339. return KeyError{obj, err}
  340. }
  341. newDeltas := append(f.items[id], Delta{actionType, obj})
  342. newDeltas = dedupDeltas(newDeltas)
  343. if len(newDeltas) > 0 {
  344. if _, exists := f.items[id]; !exists {
  345. f.queue = append(f.queue, id)
  346. }
  347. f.items[id] = newDeltas
  348. f.cond.Broadcast()
  349. } else {
  350. // This never happens, because dedupDeltas never returns an empty list
  351. // when given a non-empty list (as it is here).
  352. // But if somehow it ever does return an empty list, then
  353. // We need to remove this from our map (extra items in the queue are
  354. // ignored if they are not in the map).
  355. delete(f.items, id)
  356. }
  357. return nil
  358. }
  359. // List returns a list of all the items; it returns the object
  360. // from the most recent Delta.
  361. // You should treat the items returned inside the deltas as immutable.
  362. func (f *DeltaFIFO) List() []interface{} {
  363. f.lock.RLock()
  364. defer f.lock.RUnlock()
  365. return f.listLocked()
  366. }
  367. func (f *DeltaFIFO) listLocked() []interface{} {
  368. list := make([]interface{}, 0, len(f.items))
  369. for _, item := range f.items {
  370. list = append(list, item.Newest().Object)
  371. }
  372. return list
  373. }
  374. // ListKeys returns a list of all the keys of the objects currently
  375. // in the FIFO.
  376. func (f *DeltaFIFO) ListKeys() []string {
  377. f.lock.RLock()
  378. defer f.lock.RUnlock()
  379. list := make([]string, 0, len(f.items))
  380. for key := range f.items {
  381. list = append(list, key)
  382. }
  383. return list
  384. }
  385. // Get returns the complete list of deltas for the requested item,
  386. // or sets exists=false.
  387. // You should treat the items returned inside the deltas as immutable.
  388. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  389. key, err := f.KeyOf(obj)
  390. if err != nil {
  391. return nil, false, KeyError{obj, err}
  392. }
  393. return f.GetByKey(key)
  394. }
  395. // GetByKey returns the complete list of deltas for the requested item,
  396. // setting exists=false if that list is empty.
  397. // You should treat the items returned inside the deltas as immutable.
  398. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  399. f.lock.RLock()
  400. defer f.lock.RUnlock()
  401. d, exists := f.items[key]
  402. if exists {
  403. // Copy item's slice so operations on this slice
  404. // won't interfere with the object we return.
  405. d = copyDeltas(d)
  406. }
  407. return d, exists, nil
  408. }
  409. // IsClosed checks if the queue is closed
  410. func (f *DeltaFIFO) IsClosed() bool {
  411. f.lock.Lock()
  412. defer f.lock.Unlock()
  413. return f.closed
  414. }
  415. // Pop blocks until an item is added to the queue, and then returns it. If
  416. // multiple items are ready, they are returned in the order in which they were
  417. // added/updated. The item is removed from the queue (and the store) before it
  418. // is returned, so if you don't successfully process it, you need to add it back
  419. // with AddIfNotPresent().
  420. // process function is called under lock, so it is safe to update data structures
  421. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  422. // may return an instance of ErrRequeue with a nested error to indicate the current
  423. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  424. // process should avoid expensive I/O operation so that other queue operations, i.e.
  425. // Add() and Get(), won't be blocked for too long.
  426. //
  427. // Pop returns a 'Deltas', which has a complete list of all the things
  428. // that happened to the object (deltas) while it was sitting in the queue.
  429. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  430. f.lock.Lock()
  431. defer f.lock.Unlock()
  432. for {
  433. for len(f.queue) == 0 {
  434. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  435. // When Close() is called, the f.closed is set and the condition is broadcasted.
  436. // Which causes this loop to continue and return from the Pop().
  437. if f.closed {
  438. return nil, ErrFIFOClosed
  439. }
  440. f.cond.Wait()
  441. }
  442. id := f.queue[0]
  443. f.queue = f.queue[1:]
  444. if f.initialPopulationCount > 0 {
  445. f.initialPopulationCount--
  446. }
  447. item, ok := f.items[id]
  448. if !ok {
  449. // Item may have been deleted subsequently.
  450. continue
  451. }
  452. delete(f.items, id)
  453. err := process(item)
  454. if e, ok := err.(ErrRequeue); ok {
  455. f.addIfNotPresent(id, item)
  456. err = e.Err
  457. }
  458. // Don't need to copyDeltas here, because we're transferring
  459. // ownership to the caller.
  460. return item, err
  461. }
  462. }
  463. // Replace atomically does two things: (1) it adds the given objects
  464. // using the Sync or Replace DeltaType and then (2) it does some deletions.
  465. // In particular: for every pre-existing key K that is not the key of
  466. // an object in `list` there is the effect of
  467. // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
  468. // of K. If `f.knownObjects == nil` then the pre-existing keys are
  469. // those in `f.items` and the current object of K is the `.Newest()`
  470. // of the Deltas associated with K. Otherwise the pre-existing keys
  471. // are those listed by `f.knownObjects` and the current object of K is
  472. // what `f.knownObjects.GetByKey(K)` returns.
  473. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
  474. f.lock.Lock()
  475. defer f.lock.Unlock()
  476. keys := make(sets.String, len(list))
  477. // keep backwards compat for old clients
  478. action := Sync
  479. if f.emitDeltaTypeReplaced {
  480. action = Replaced
  481. }
  482. // Add Sync/Replaced action for each new item.
  483. for _, item := range list {
  484. key, err := f.KeyOf(item)
  485. if err != nil {
  486. return KeyError{item, err}
  487. }
  488. keys.Insert(key)
  489. if err := f.queueActionLocked(action, item); err != nil {
  490. return fmt.Errorf("couldn't enqueue object: %v", err)
  491. }
  492. }
  493. if f.knownObjects == nil {
  494. // Do deletion detection against our own list.
  495. queuedDeletions := 0
  496. for k, oldItem := range f.items {
  497. if keys.Has(k) {
  498. continue
  499. }
  500. // Delete pre-existing items not in the new list.
  501. // This could happen if watch deletion event was missed while
  502. // disconnected from apiserver.
  503. var deletedObj interface{}
  504. if n := oldItem.Newest(); n != nil {
  505. deletedObj = n.Object
  506. }
  507. queuedDeletions++
  508. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  509. return err
  510. }
  511. }
  512. if !f.populated {
  513. f.populated = true
  514. // While there shouldn't be any queued deletions in the initial
  515. // population of the queue, it's better to be on the safe side.
  516. f.initialPopulationCount = len(list) + queuedDeletions
  517. }
  518. return nil
  519. }
  520. // Detect deletions not already in the queue.
  521. knownKeys := f.knownObjects.ListKeys()
  522. queuedDeletions := 0
  523. for _, k := range knownKeys {
  524. if keys.Has(k) {
  525. continue
  526. }
  527. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  528. if err != nil {
  529. deletedObj = nil
  530. klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  531. } else if !exists {
  532. deletedObj = nil
  533. klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  534. }
  535. queuedDeletions++
  536. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  537. return err
  538. }
  539. }
  540. if !f.populated {
  541. f.populated = true
  542. f.initialPopulationCount = len(list) + queuedDeletions
  543. }
  544. return nil
  545. }
  546. // Resync adds, with a Sync type of Delta, every object listed by
  547. // `f.knownObjects` whose key is not already queued for processing.
  548. // If `f.knownObjects` is `nil` then Resync does nothing.
  549. func (f *DeltaFIFO) Resync() error {
  550. f.lock.Lock()
  551. defer f.lock.Unlock()
  552. if f.knownObjects == nil {
  553. return nil
  554. }
  555. keys := f.knownObjects.ListKeys()
  556. for _, k := range keys {
  557. if err := f.syncKeyLocked(k); err != nil {
  558. return err
  559. }
  560. }
  561. return nil
  562. }
  563. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  564. obj, exists, err := f.knownObjects.GetByKey(key)
  565. if err != nil {
  566. klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  567. return nil
  568. } else if !exists {
  569. klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  570. return nil
  571. }
  572. // If we are doing Resync() and there is already an event queued for that object,
  573. // we ignore the Resync for it. This is to avoid the race, in which the resync
  574. // comes with the previous value of object (since queueing an event for the object
  575. // doesn't trigger changing the underlying store <knownObjects>.
  576. id, err := f.KeyOf(obj)
  577. if err != nil {
  578. return KeyError{obj, err}
  579. }
  580. if len(f.items[id]) > 0 {
  581. return nil
  582. }
  583. if err := f.queueActionLocked(Sync, obj); err != nil {
  584. return fmt.Errorf("couldn't queue object: %v", err)
  585. }
  586. return nil
  587. }
  588. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  589. type KeyListerGetter interface {
  590. KeyLister
  591. KeyGetter
  592. }
  593. // A KeyLister is anything that knows how to list its keys.
  594. type KeyLister interface {
  595. ListKeys() []string
  596. }
  597. // A KeyGetter is anything that knows how to get the value stored under a given key.
  598. type KeyGetter interface {
  599. // GetByKey returns the value associated with the key, or sets exists=false.
  600. GetByKey(key string) (value interface{}, exists bool, err error)
  601. }
  602. // DeltaType is the type of a change (addition, deletion, etc)
  603. type DeltaType string
  604. // Change type definition
  605. const (
  606. Added DeltaType = "Added"
  607. Updated DeltaType = "Updated"
  608. Deleted DeltaType = "Deleted"
  609. // Replaced is emitted when we encountered watch errors and had to do a
  610. // relist. We don't know if the replaced object has changed.
  611. //
  612. // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
  613. // as well. Hence, Replaced is only emitted when the option
  614. // EmitDeltaTypeReplaced is true.
  615. Replaced DeltaType = "Replaced"
  616. // Sync is for synthetic events during a periodic resync.
  617. Sync DeltaType = "Sync"
  618. )
  619. // Delta is the type stored by a DeltaFIFO. It tells you what change
  620. // happened, and the object's state after* that change.
  621. //
  622. // [*] Unless the change is a deletion, and then you'll get the final
  623. // state of the object before it was deleted.
  624. type Delta struct {
  625. Type DeltaType
  626. Object interface{}
  627. }
  628. // Deltas is a list of one or more 'Delta's to an individual object.
  629. // The oldest delta is at index 0, the newest delta is the last one.
  630. type Deltas []Delta
  631. // Oldest is a convenience function that returns the oldest delta, or
  632. // nil if there are no deltas.
  633. func (d Deltas) Oldest() *Delta {
  634. if len(d) > 0 {
  635. return &d[0]
  636. }
  637. return nil
  638. }
  639. // Newest is a convenience function that returns the newest delta, or
  640. // nil if there are no deltas.
  641. func (d Deltas) Newest() *Delta {
  642. if n := len(d); n > 0 {
  643. return &d[n-1]
  644. }
  645. return nil
  646. }
  647. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  648. // the objects in the slice. This allows Get/List to return an object that we
  649. // know won't be clobbered by a subsequent modifications.
  650. func copyDeltas(d Deltas) Deltas {
  651. d2 := make(Deltas, len(d))
  652. copy(d2, d)
  653. return d2
  654. }
  655. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
  656. // was deleted but the watch deletion event was missed while disconnected from
  657. // apiserver. In this case we don't know the final "resting" state of the object, so
  658. // there's a chance the included `Obj` is stale.
  659. type DeletedFinalStateUnknown struct {
  660. Key string
  661. Obj interface{}
  662. }