delta_fifo.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "k8s.io/kubernetes/pkg/util/sets"
  19. "github.com/golang/glog"
  20. )
  21. // NewDeltaFIFO returns a Store which can be used process changes to items.
  22. //
  23. // keyFunc is used to figure out what key an object should have. (It's
  24. // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
  25. //
  26. // 'compressor' may compress as many or as few items as it wants
  27. // (including returning an empty slice), but it should do what it
  28. // does quickly since it is called while the queue is locked.
  29. // 'compressor' may be nil if you don't want any delta compression.
  30. //
  31. // 'keyLister' is expected to return a list of keys that the consumer of
  32. // this queue "knows about". It is used to decide which items are missing
  33. // when Replace() is called; 'Deleted' deltas are produced for these items.
  34. // It may be nil if you don't need to detect all deletions.
  35. // TODO: consider merging keyLister with this object, tracking a list of
  36. // "known" keys when Pop() is called. Have to think about how that
  37. // affects error retrying.
  38. // TODO(lavalamp): I believe there is a possible race only when using an
  39. // external known object source that the above TODO would
  40. // fix.
  41. //
  42. // Also see the comment on DeltaFIFO.
  43. func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
  44. f := &DeltaFIFO{
  45. items: map[string]Deltas{},
  46. queue: []string{},
  47. keyFunc: keyFunc,
  48. deltaCompressor: compressor,
  49. knownObjects: knownObjects,
  50. }
  51. f.cond.L = &f.lock
  52. return f
  53. }
  54. // DeltaFIFO is like FIFO, but allows you to process deletes.
  55. //
  56. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  57. // intended to be the producer, and the consumer is whatever calls
  58. // the Pop() method.
  59. //
  60. // DeltaFIFO solves this use case:
  61. // * You want to process every object change (delta) at most once.
  62. // * When you process an object, you want to see everything
  63. // that's happened to it since you last processed it.
  64. // * You want to process the deletion of objects.
  65. // * You might want to periodically reprocess objects.
  66. //
  67. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  68. // interface{} to satisfy the Store/Queue interfaces, but it
  69. // will always return an object of type Deltas.
  70. //
  71. // A note on threading: If you call Pop() in parallel from multiple
  72. // threads, you could end up with multiple threads processing slightly
  73. // different versions of the same object.
  74. //
  75. // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
  76. // to list keys that are "known", for the purpose of figuring out which
  77. // items have been deleted when Replace() or Delete() are called. The deleted
  78. // object will be included in the DeleteFinalStateUnknown markers. These objects
  79. // could be stale.
  80. //
  81. // You may provide a function to compress deltas (e.g., represent a
  82. // series of Updates as a single Update).
  83. type DeltaFIFO struct {
  84. // lock/cond protects access to 'items' and 'queue'.
  85. lock sync.RWMutex
  86. cond sync.Cond
  87. // We depend on the property that items in the set are in
  88. // the queue and vice versa, and that all Deltas in this
  89. // map have at least one Delta.
  90. items map[string]Deltas
  91. queue []string
  92. // populated is true if the first batch of items inserted by Replace() has been populated
  93. // or Delete/Add/Update was called first.
  94. populated bool
  95. // initialPopulationCount is the number of items inserted by the first call of Replace()
  96. initialPopulationCount int
  97. // keyFunc is used to make the key used for queued item
  98. // insertion and retrieval, and should be deterministic.
  99. keyFunc KeyFunc
  100. // deltaCompressor tells us how to combine two or more
  101. // deltas. It may be nil.
  102. deltaCompressor DeltaCompressor
  103. // knownObjects list keys that are "known", for the
  104. // purpose of figuring out which items have been deleted
  105. // when Replace() or Delete() is called.
  106. knownObjects KeyListerGetter
  107. }
  108. var (
  109. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  110. )
  111. var (
  112. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  113. // object with zero length is encountered (should be impossible,
  114. // even if such an object is accidentally produced by a DeltaCompressor--
  115. // but included for completeness).
  116. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  117. )
  118. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  119. // DeletedFinalStateUnknown objects.
  120. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  121. if d, ok := obj.(Deltas); ok {
  122. if len(d) == 0 {
  123. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  124. }
  125. obj = d.Newest().Object
  126. }
  127. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  128. return d.Key, nil
  129. }
  130. return f.keyFunc(obj)
  131. }
  132. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  133. // or an Update called first but the first batch of items inserted by Replace() has been popped
  134. func (f *DeltaFIFO) HasSynced() bool {
  135. f.lock.Lock()
  136. defer f.lock.Unlock()
  137. return f.populated && f.initialPopulationCount == 0
  138. }
  139. // Add inserts an item, and puts it in the queue. The item is only enqueued
  140. // if it doesn't already exist in the set.
  141. func (f *DeltaFIFO) Add(obj interface{}) error {
  142. f.lock.Lock()
  143. defer f.lock.Unlock()
  144. f.populated = true
  145. return f.queueActionLocked(Added, obj)
  146. }
  147. // Update is just like Add, but makes an Updated Delta.
  148. func (f *DeltaFIFO) Update(obj interface{}) error {
  149. f.lock.Lock()
  150. defer f.lock.Unlock()
  151. f.populated = true
  152. return f.queueActionLocked(Updated, obj)
  153. }
  154. // Delete is just like Add, but makes an Deleted Delta. If the item does not
  155. // already exist, it will be ignored. (It may have already been deleted by a
  156. // Replace (re-list), for example.
  157. func (f *DeltaFIFO) Delete(obj interface{}) error {
  158. id, err := f.KeyOf(obj)
  159. if err != nil {
  160. return KeyError{obj, err}
  161. }
  162. f.lock.Lock()
  163. defer f.lock.Unlock()
  164. f.populated = true
  165. if f.knownObjects == nil {
  166. if _, exists := f.items[id]; !exists {
  167. // Presumably, this was deleted when a relist happened.
  168. // Don't provide a second report of the same deletion.
  169. return nil
  170. }
  171. } else {
  172. // We only want to skip the "deletion" action if the object doesn't
  173. // exist in knownObjects and it doesn't have corresponding item in items.
  174. // Note that even if there is a "deletion" action in items, we can ignore it,
  175. // because it will be deduped automatically in "queueActionLocked"
  176. _, exists, err := f.knownObjects.GetByKey(id)
  177. _, itemsExist := f.items[id]
  178. if err == nil && !exists && !itemsExist {
  179. // Presumably, this was deleted when a relist happened.
  180. // Don't provide a second report of the same deletion.
  181. // TODO(lavalamp): This may be racy-- we aren't properly locked
  182. // with knownObjects.
  183. return nil
  184. }
  185. }
  186. return f.queueActionLocked(Deleted, obj)
  187. }
  188. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  189. // present in the set, it is neither enqueued nor added to the set.
  190. //
  191. // This is useful in a single producer/consumer scenario so that the consumer can
  192. // safely retry items without contending with the producer and potentially enqueueing
  193. // stale items.
  194. //
  195. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  196. // different from the Add/Update/Delete functions.
  197. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  198. deltas, ok := obj.(Deltas)
  199. if !ok {
  200. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  201. }
  202. id, err := f.KeyOf(deltas.Newest().Object)
  203. if err != nil {
  204. return KeyError{obj, err}
  205. }
  206. f.lock.Lock()
  207. defer f.lock.Unlock()
  208. f.addIfNotPresent(id, deltas)
  209. return nil
  210. }
  211. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  212. // already holds the fifo lock.
  213. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  214. f.populated = true
  215. if _, exists := f.items[id]; exists {
  216. return
  217. }
  218. f.queue = append(f.queue, id)
  219. f.items[id] = deltas
  220. f.cond.Broadcast()
  221. }
  222. // re-listing and watching can deliver the same update multiple times in any
  223. // order. This will combine the most recent two deltas if they are the same.
  224. func dedupDeltas(deltas Deltas) Deltas {
  225. n := len(deltas)
  226. if n < 2 {
  227. return deltas
  228. }
  229. a := &deltas[n-1]
  230. b := &deltas[n-2]
  231. if out := isDup(a, b); out != nil {
  232. d := append(Deltas{}, deltas[:n-2]...)
  233. return append(d, *out)
  234. }
  235. return deltas
  236. }
  237. // If a & b represent the same event, returns the delta that ought to be kept.
  238. // Otherwise, returns nil.
  239. // TODO: is there anything other than deletions that need deduping?
  240. func isDup(a, b *Delta) *Delta {
  241. if out := isDeletionDup(a, b); out != nil {
  242. return out
  243. }
  244. // TODO: Detect other duplicate situations? Are there any?
  245. return nil
  246. }
  247. // keep the one with the most information if both are deletions.
  248. func isDeletionDup(a, b *Delta) *Delta {
  249. if b.Type != Deleted || a.Type != Deleted {
  250. return nil
  251. }
  252. // Do more sophisticated checks, or is this sufficient?
  253. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  254. return a
  255. }
  256. return b
  257. }
  258. // willObjectBeDeletedLocked returns true only if the last delta for the
  259. // given object is Delete. Caller must lock first.
  260. func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
  261. deltas := f.items[id]
  262. return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
  263. }
  264. // queueActionLocked appends to the delta list for the object, calling
  265. // f.deltaCompressor if needed. Caller must lock first.
  266. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  267. id, err := f.KeyOf(obj)
  268. if err != nil {
  269. return KeyError{obj, err}
  270. }
  271. // If object is supposed to be deleted (last event is Deleted),
  272. // then we should ignore Sync events, because it would result in
  273. // recreation of this object.
  274. if actionType == Sync && f.willObjectBeDeletedLocked(id) {
  275. return nil
  276. }
  277. newDeltas := append(f.items[id], Delta{actionType, obj})
  278. newDeltas = dedupDeltas(newDeltas)
  279. if f.deltaCompressor != nil {
  280. newDeltas = f.deltaCompressor.Compress(newDeltas)
  281. }
  282. _, exists := f.items[id]
  283. if len(newDeltas) > 0 {
  284. if !exists {
  285. f.queue = append(f.queue, id)
  286. }
  287. f.items[id] = newDeltas
  288. f.cond.Broadcast()
  289. } else if exists {
  290. // The compression step removed all deltas, so
  291. // we need to remove this from our map (extra items
  292. // in the queue are ignored if they are not in the
  293. // map).
  294. delete(f.items, id)
  295. }
  296. return nil
  297. }
  298. // List returns a list of all the items; it returns the object
  299. // from the most recent Delta.
  300. // You should treat the items returned inside the deltas as immutable.
  301. func (f *DeltaFIFO) List() []interface{} {
  302. f.lock.RLock()
  303. defer f.lock.RUnlock()
  304. return f.listLocked()
  305. }
  306. func (f *DeltaFIFO) listLocked() []interface{} {
  307. list := make([]interface{}, 0, len(f.items))
  308. for _, item := range f.items {
  309. // Copy item's slice so operations on this slice (delta
  310. // compression) won't interfere with the object we return.
  311. item = copyDeltas(item)
  312. list = append(list, item.Newest().Object)
  313. }
  314. return list
  315. }
  316. // ListKeys returns a list of all the keys of the objects currently
  317. // in the FIFO.
  318. func (f *DeltaFIFO) ListKeys() []string {
  319. f.lock.RLock()
  320. defer f.lock.RUnlock()
  321. list := make([]string, 0, len(f.items))
  322. for key := range f.items {
  323. list = append(list, key)
  324. }
  325. return list
  326. }
  327. // Get returns the complete list of deltas for the requested item,
  328. // or sets exists=false.
  329. // You should treat the items returned inside the deltas as immutable.
  330. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  331. key, err := f.KeyOf(obj)
  332. if err != nil {
  333. return nil, false, KeyError{obj, err}
  334. }
  335. return f.GetByKey(key)
  336. }
  337. // GetByKey returns the complete list of deltas for the requested item,
  338. // setting exists=false if that list is empty.
  339. // You should treat the items returned inside the deltas as immutable.
  340. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  341. f.lock.RLock()
  342. defer f.lock.RUnlock()
  343. d, exists := f.items[key]
  344. if exists {
  345. // Copy item's slice so operations on this slice (delta
  346. // compression) won't interfere with the object we return.
  347. d = copyDeltas(d)
  348. }
  349. return d, exists, nil
  350. }
  351. // Pop blocks until an item is added to the queue, and then returns it. If
  352. // multiple items are ready, they are returned in the order in which they were
  353. // added/updated. The item is removed from the queue (and the store) before it
  354. // is returned, so if you don't successfully process it, you need to add it back
  355. // with AddIfNotPresent().
  356. // process function is called under lock, so it is safe update data structures
  357. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  358. // may return an instance of ErrRequeue with a nested error to indicate the current
  359. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  360. //
  361. // Pop returns a 'Deltas', which has a complete list of all the things
  362. // that happened to the object (deltas) while it was sitting in the queue.
  363. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  364. f.lock.Lock()
  365. defer f.lock.Unlock()
  366. for {
  367. for len(f.queue) == 0 {
  368. f.cond.Wait()
  369. }
  370. id := f.queue[0]
  371. f.queue = f.queue[1:]
  372. item, ok := f.items[id]
  373. if f.initialPopulationCount > 0 {
  374. f.initialPopulationCount--
  375. }
  376. if !ok {
  377. // Item may have been deleted subsequently.
  378. continue
  379. }
  380. delete(f.items, id)
  381. err := process(item)
  382. if e, ok := err.(ErrRequeue); ok {
  383. f.addIfNotPresent(id, item)
  384. err = e.Err
  385. }
  386. // Don't need to copyDeltas here, because we're transferring
  387. // ownership to the caller.
  388. return item, err
  389. }
  390. }
  391. // Replace will delete the contents of 'f', using instead the given map.
  392. // 'f' takes ownership of the map, you should not reference the map again
  393. // after calling this function. f's queue is reset, too; upon return, it
  394. // will contain the items in the map, in no particular order.
  395. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
  396. f.lock.Lock()
  397. defer f.lock.Unlock()
  398. keys := make(sets.String, len(list))
  399. if !f.populated {
  400. f.populated = true
  401. f.initialPopulationCount = len(list)
  402. }
  403. for _, item := range list {
  404. key, err := f.KeyOf(item)
  405. if err != nil {
  406. return KeyError{item, err}
  407. }
  408. keys.Insert(key)
  409. if err := f.queueActionLocked(Sync, item); err != nil {
  410. return fmt.Errorf("couldn't enqueue object: %v", err)
  411. }
  412. }
  413. if f.knownObjects == nil {
  414. // Do deletion detection against our own list.
  415. for k, oldItem := range f.items {
  416. if keys.Has(k) {
  417. continue
  418. }
  419. var deletedObj interface{}
  420. if n := oldItem.Newest(); n != nil {
  421. deletedObj = n.Object
  422. }
  423. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  424. return err
  425. }
  426. }
  427. return nil
  428. }
  429. // Detect deletions not already in the queue.
  430. // TODO(lavalamp): This may be racy-- we aren't properly locked
  431. // with knownObjects. Unproven.
  432. knownKeys := f.knownObjects.ListKeys()
  433. for _, k := range knownKeys {
  434. if keys.Has(k) {
  435. continue
  436. }
  437. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  438. if err != nil {
  439. deletedObj = nil
  440. glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  441. } else if !exists {
  442. deletedObj = nil
  443. glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  444. }
  445. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  446. return err
  447. }
  448. }
  449. return nil
  450. }
  451. // Resync will send a sync event for each item
  452. func (f *DeltaFIFO) Resync() error {
  453. var keys []string
  454. func() {
  455. f.lock.RLock()
  456. defer f.lock.RUnlock()
  457. keys = f.knownObjects.ListKeys()
  458. }()
  459. for _, k := range keys {
  460. if err := f.syncKey(k); err != nil {
  461. return err
  462. }
  463. }
  464. return nil
  465. }
  466. func (f *DeltaFIFO) syncKey(key string) error {
  467. f.lock.Lock()
  468. defer f.lock.Unlock()
  469. obj, exists, err := f.knownObjects.GetByKey(key)
  470. if err != nil {
  471. glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  472. return nil
  473. } else if !exists {
  474. glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  475. return nil
  476. }
  477. if err := f.queueActionLocked(Sync, obj); err != nil {
  478. return fmt.Errorf("couldn't queue object: %v", err)
  479. }
  480. return nil
  481. }
  482. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  483. type KeyListerGetter interface {
  484. KeyLister
  485. KeyGetter
  486. }
  487. // A KeyLister is anything that knows how to list its keys.
  488. type KeyLister interface {
  489. ListKeys() []string
  490. }
  491. // A KeyGetter is anything that knows how to get the value stored under a given key.
  492. type KeyGetter interface {
  493. GetByKey(key string) (interface{}, bool, error)
  494. }
  495. // DeltaCompressor is an algorithm that removes redundant changes.
  496. type DeltaCompressor interface {
  497. Compress(Deltas) Deltas
  498. }
  499. // DeltaCompressorFunc should remove redundant changes; but changes that
  500. // are redundant depend on one's desired semantics, so this is an
  501. // injectable function.
  502. //
  503. // DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
  504. type DeltaCompressorFunc func(Deltas) Deltas
  505. // Compress just calls dc.
  506. func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
  507. return dc(d)
  508. }
  509. // DeltaType is the type of a change (addition, deletion, etc)
  510. type DeltaType string
  511. const (
  512. Added DeltaType = "Added"
  513. Updated DeltaType = "Updated"
  514. Deleted DeltaType = "Deleted"
  515. // The other types are obvious. You'll get Sync deltas when:
  516. // * A watch expires/errors out and a new list/watch cycle is started.
  517. // * You've turned on periodic syncs.
  518. // (Anything that trigger's DeltaFIFO's Replace() method.)
  519. Sync DeltaType = "Sync"
  520. )
  521. // Delta is the type stored by a DeltaFIFO. It tells you what change
  522. // happened, and the object's state after* that change.
  523. //
  524. // [*] Unless the change is a deletion, and then you'll get the final
  525. // state of the object before it was deleted.
  526. type Delta struct {
  527. Type DeltaType
  528. Object interface{}
  529. }
  530. // Deltas is a list of one or more 'Delta's to an individual object.
  531. // The oldest delta is at index 0, the newest delta is the last one.
  532. type Deltas []Delta
  533. // Oldest is a convenience function that returns the oldest delta, or
  534. // nil if there are no deltas.
  535. func (d Deltas) Oldest() *Delta {
  536. if len(d) > 0 {
  537. return &d[0]
  538. }
  539. return nil
  540. }
  541. // Newest is a convenience function that returns the newest delta, or
  542. // nil if there are no deltas.
  543. func (d Deltas) Newest() *Delta {
  544. if n := len(d); n > 0 {
  545. return &d[n-1]
  546. }
  547. return nil
  548. }
  549. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  550. // the objects in the slice. This allows Get/List to return an object that we
  551. // know won't be clobbered by a subsequent call to a delta compressor.
  552. func copyDeltas(d Deltas) Deltas {
  553. d2 := make(Deltas, len(d))
  554. copy(d2, d)
  555. return d2
  556. }
  557. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
  558. // an object was deleted but the watch deletion event was missed. In this
  559. // case we don't know the final "resting" state of the object, so there's
  560. // a chance the included `Obj` is stale.
  561. type DeletedFinalStateUnknown struct {
  562. Key string
  563. Obj interface{}
  564. }