delta_fifo.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "k8s.io/apimachinery/pkg/util/sets"
  19. "github.com/golang/glog"
  20. )
  21. // NewDeltaFIFO returns a Store which can be used process changes to items.
  22. //
  23. // keyFunc is used to figure out what key an object should have. (It's
  24. // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
  25. //
  26. // 'compressor' may compress as many or as few items as it wants
  27. // (including returning an empty slice), but it should do what it
  28. // does quickly since it is called while the queue is locked.
  29. // 'compressor' may be nil if you don't want any delta compression.
  30. //
  31. // 'keyLister' is expected to return a list of keys that the consumer of
  32. // this queue "knows about". It is used to decide which items are missing
  33. // when Replace() is called; 'Deleted' deltas are produced for these items.
  34. // It may be nil if you don't need to detect all deletions.
  35. // TODO: consider merging keyLister with this object, tracking a list of
  36. // "known" keys when Pop() is called. Have to think about how that
  37. // affects error retrying.
  38. // TODO(lavalamp): I believe there is a possible race only when using an
  39. // external known object source that the above TODO would
  40. // fix.
  41. //
  42. // Also see the comment on DeltaFIFO.
  43. func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
  44. f := &DeltaFIFO{
  45. items: map[string]Deltas{},
  46. queue: []string{},
  47. keyFunc: keyFunc,
  48. deltaCompressor: compressor,
  49. knownObjects: knownObjects,
  50. }
  51. f.cond.L = &f.lock
  52. return f
  53. }
  54. // DeltaFIFO is like FIFO, but allows you to process deletes.
  55. //
  56. // DeltaFIFO is a producer-consumer queue, where a Reflector is
  57. // intended to be the producer, and the consumer is whatever calls
  58. // the Pop() method.
  59. //
  60. // DeltaFIFO solves this use case:
  61. // * You want to process every object change (delta) at most once.
  62. // * When you process an object, you want to see everything
  63. // that's happened to it since you last processed it.
  64. // * You want to process the deletion of objects.
  65. // * You might want to periodically reprocess objects.
  66. //
  67. // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
  68. // interface{} to satisfy the Store/Queue interfaces, but it
  69. // will always return an object of type Deltas.
  70. //
  71. // A note on threading: If you call Pop() in parallel from multiple
  72. // threads, you could end up with multiple threads processing slightly
  73. // different versions of the same object.
  74. //
  75. // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
  76. // to list keys that are "known", for the purpose of figuring out which
  77. // items have been deleted when Replace() or Delete() are called. The deleted
  78. // object will be included in the DeleteFinalStateUnknown markers. These objects
  79. // could be stale.
  80. //
  81. // You may provide a function to compress deltas (e.g., represent a
  82. // series of Updates as a single Update).
  83. type DeltaFIFO struct {
  84. // lock/cond protects access to 'items' and 'queue'.
  85. lock sync.RWMutex
  86. cond sync.Cond
  87. // We depend on the property that items in the set are in
  88. // the queue and vice versa, and that all Deltas in this
  89. // map have at least one Delta.
  90. items map[string]Deltas
  91. queue []string
  92. // populated is true if the first batch of items inserted by Replace() has been populated
  93. // or Delete/Add/Update was called first.
  94. populated bool
  95. // initialPopulationCount is the number of items inserted by the first call of Replace()
  96. initialPopulationCount int
  97. // keyFunc is used to make the key used for queued item
  98. // insertion and retrieval, and should be deterministic.
  99. keyFunc KeyFunc
  100. // deltaCompressor tells us how to combine two or more
  101. // deltas. It may be nil.
  102. deltaCompressor DeltaCompressor
  103. // knownObjects list keys that are "known", for the
  104. // purpose of figuring out which items have been deleted
  105. // when Replace() or Delete() is called.
  106. knownObjects KeyListerGetter
  107. // Indication the queue is closed.
  108. // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
  109. // Currently, not used to gate any of CRED operations.
  110. closed bool
  111. closedLock sync.Mutex
  112. }
  113. var (
  114. _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
  115. )
  116. var (
  117. // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
  118. // object with zero length is encountered (should be impossible,
  119. // even if such an object is accidentally produced by a DeltaCompressor--
  120. // but included for completeness).
  121. ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
  122. )
  123. // Close the queue.
  124. func (f *DeltaFIFO) Close() {
  125. f.closedLock.Lock()
  126. defer f.closedLock.Unlock()
  127. f.closed = true
  128. f.cond.Broadcast()
  129. }
  130. // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
  131. // DeletedFinalStateUnknown objects.
  132. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
  133. if d, ok := obj.(Deltas); ok {
  134. if len(d) == 0 {
  135. return "", KeyError{obj, ErrZeroLengthDeltasObject}
  136. }
  137. obj = d.Newest().Object
  138. }
  139. if d, ok := obj.(DeletedFinalStateUnknown); ok {
  140. return d.Key, nil
  141. }
  142. return f.keyFunc(obj)
  143. }
  144. // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
  145. // or an Update called first but the first batch of items inserted by Replace() has been popped
  146. func (f *DeltaFIFO) HasSynced() bool {
  147. f.lock.Lock()
  148. defer f.lock.Unlock()
  149. return f.populated && f.initialPopulationCount == 0
  150. }
  151. // Add inserts an item, and puts it in the queue. The item is only enqueued
  152. // if it doesn't already exist in the set.
  153. func (f *DeltaFIFO) Add(obj interface{}) error {
  154. f.lock.Lock()
  155. defer f.lock.Unlock()
  156. f.populated = true
  157. return f.queueActionLocked(Added, obj)
  158. }
  159. // Update is just like Add, but makes an Updated Delta.
  160. func (f *DeltaFIFO) Update(obj interface{}) error {
  161. f.lock.Lock()
  162. defer f.lock.Unlock()
  163. f.populated = true
  164. return f.queueActionLocked(Updated, obj)
  165. }
  166. // Delete is just like Add, but makes an Deleted Delta. If the item does not
  167. // already exist, it will be ignored. (It may have already been deleted by a
  168. // Replace (re-list), for example.
  169. func (f *DeltaFIFO) Delete(obj interface{}) error {
  170. id, err := f.KeyOf(obj)
  171. if err != nil {
  172. return KeyError{obj, err}
  173. }
  174. f.lock.Lock()
  175. defer f.lock.Unlock()
  176. f.populated = true
  177. if f.knownObjects == nil {
  178. if _, exists := f.items[id]; !exists {
  179. // Presumably, this was deleted when a relist happened.
  180. // Don't provide a second report of the same deletion.
  181. return nil
  182. }
  183. } else {
  184. // We only want to skip the "deletion" action if the object doesn't
  185. // exist in knownObjects and it doesn't have corresponding item in items.
  186. // Note that even if there is a "deletion" action in items, we can ignore it,
  187. // because it will be deduped automatically in "queueActionLocked"
  188. _, exists, err := f.knownObjects.GetByKey(id)
  189. _, itemsExist := f.items[id]
  190. if err == nil && !exists && !itemsExist {
  191. // Presumably, this was deleted when a relist happened.
  192. // Don't provide a second report of the same deletion.
  193. // TODO(lavalamp): This may be racy-- we aren't properly locked
  194. // with knownObjects.
  195. return nil
  196. }
  197. }
  198. return f.queueActionLocked(Deleted, obj)
  199. }
  200. // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
  201. // present in the set, it is neither enqueued nor added to the set.
  202. //
  203. // This is useful in a single producer/consumer scenario so that the consumer can
  204. // safely retry items without contending with the producer and potentially enqueueing
  205. // stale items.
  206. //
  207. // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
  208. // different from the Add/Update/Delete functions.
  209. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
  210. deltas, ok := obj.(Deltas)
  211. if !ok {
  212. return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
  213. }
  214. id, err := f.KeyOf(deltas.Newest().Object)
  215. if err != nil {
  216. return KeyError{obj, err}
  217. }
  218. f.lock.Lock()
  219. defer f.lock.Unlock()
  220. f.addIfNotPresent(id, deltas)
  221. return nil
  222. }
  223. // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
  224. // already holds the fifo lock.
  225. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
  226. f.populated = true
  227. if _, exists := f.items[id]; exists {
  228. return
  229. }
  230. f.queue = append(f.queue, id)
  231. f.items[id] = deltas
  232. f.cond.Broadcast()
  233. }
  234. // re-listing and watching can deliver the same update multiple times in any
  235. // order. This will combine the most recent two deltas if they are the same.
  236. func dedupDeltas(deltas Deltas) Deltas {
  237. n := len(deltas)
  238. if n < 2 {
  239. return deltas
  240. }
  241. a := &deltas[n-1]
  242. b := &deltas[n-2]
  243. if out := isDup(a, b); out != nil {
  244. d := append(Deltas{}, deltas[:n-2]...)
  245. return append(d, *out)
  246. }
  247. return deltas
  248. }
  249. // If a & b represent the same event, returns the delta that ought to be kept.
  250. // Otherwise, returns nil.
  251. // TODO: is there anything other than deletions that need deduping?
  252. func isDup(a, b *Delta) *Delta {
  253. if out := isDeletionDup(a, b); out != nil {
  254. return out
  255. }
  256. // TODO: Detect other duplicate situations? Are there any?
  257. return nil
  258. }
  259. // keep the one with the most information if both are deletions.
  260. func isDeletionDup(a, b *Delta) *Delta {
  261. if b.Type != Deleted || a.Type != Deleted {
  262. return nil
  263. }
  264. // Do more sophisticated checks, or is this sufficient?
  265. if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
  266. return a
  267. }
  268. return b
  269. }
  270. // willObjectBeDeletedLocked returns true only if the last delta for the
  271. // given object is Delete. Caller must lock first.
  272. func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
  273. deltas := f.items[id]
  274. return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
  275. }
  276. // queueActionLocked appends to the delta list for the object, calling
  277. // f.deltaCompressor if needed. Caller must lock first.
  278. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  279. id, err := f.KeyOf(obj)
  280. if err != nil {
  281. return KeyError{obj, err}
  282. }
  283. // If object is supposed to be deleted (last event is Deleted),
  284. // then we should ignore Sync events, because it would result in
  285. // recreation of this object.
  286. if actionType == Sync && f.willObjectBeDeletedLocked(id) {
  287. return nil
  288. }
  289. newDeltas := append(f.items[id], Delta{actionType, obj})
  290. newDeltas = dedupDeltas(newDeltas)
  291. if f.deltaCompressor != nil {
  292. newDeltas = f.deltaCompressor.Compress(newDeltas)
  293. }
  294. _, exists := f.items[id]
  295. if len(newDeltas) > 0 {
  296. if !exists {
  297. f.queue = append(f.queue, id)
  298. }
  299. f.items[id] = newDeltas
  300. f.cond.Broadcast()
  301. } else if exists {
  302. // The compression step removed all deltas, so
  303. // we need to remove this from our map (extra items
  304. // in the queue are ignored if they are not in the
  305. // map).
  306. delete(f.items, id)
  307. }
  308. return nil
  309. }
  310. // List returns a list of all the items; it returns the object
  311. // from the most recent Delta.
  312. // You should treat the items returned inside the deltas as immutable.
  313. func (f *DeltaFIFO) List() []interface{} {
  314. f.lock.RLock()
  315. defer f.lock.RUnlock()
  316. return f.listLocked()
  317. }
  318. func (f *DeltaFIFO) listLocked() []interface{} {
  319. list := make([]interface{}, 0, len(f.items))
  320. for _, item := range f.items {
  321. // Copy item's slice so operations on this slice (delta
  322. // compression) won't interfere with the object we return.
  323. item = copyDeltas(item)
  324. list = append(list, item.Newest().Object)
  325. }
  326. return list
  327. }
  328. // ListKeys returns a list of all the keys of the objects currently
  329. // in the FIFO.
  330. func (f *DeltaFIFO) ListKeys() []string {
  331. f.lock.RLock()
  332. defer f.lock.RUnlock()
  333. list := make([]string, 0, len(f.items))
  334. for key := range f.items {
  335. list = append(list, key)
  336. }
  337. return list
  338. }
  339. // Get returns the complete list of deltas for the requested item,
  340. // or sets exists=false.
  341. // You should treat the items returned inside the deltas as immutable.
  342. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
  343. key, err := f.KeyOf(obj)
  344. if err != nil {
  345. return nil, false, KeyError{obj, err}
  346. }
  347. return f.GetByKey(key)
  348. }
  349. // GetByKey returns the complete list of deltas for the requested item,
  350. // setting exists=false if that list is empty.
  351. // You should treat the items returned inside the deltas as immutable.
  352. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
  353. f.lock.RLock()
  354. defer f.lock.RUnlock()
  355. d, exists := f.items[key]
  356. if exists {
  357. // Copy item's slice so operations on this slice (delta
  358. // compression) won't interfere with the object we return.
  359. d = copyDeltas(d)
  360. }
  361. return d, exists, nil
  362. }
  363. // Checks if the queue is closed
  364. func (f *DeltaFIFO) IsClosed() bool {
  365. f.closedLock.Lock()
  366. defer f.closedLock.Unlock()
  367. if f.closed {
  368. return true
  369. }
  370. return false
  371. }
  372. // Pop blocks until an item is added to the queue, and then returns it. If
  373. // multiple items are ready, they are returned in the order in which they were
  374. // added/updated. The item is removed from the queue (and the store) before it
  375. // is returned, so if you don't successfully process it, you need to add it back
  376. // with AddIfNotPresent().
  377. // process function is called under lock, so it is safe update data structures
  378. // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
  379. // may return an instance of ErrRequeue with a nested error to indicate the current
  380. // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
  381. //
  382. // Pop returns a 'Deltas', which has a complete list of all the things
  383. // that happened to the object (deltas) while it was sitting in the queue.
  384. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  385. f.lock.Lock()
  386. defer f.lock.Unlock()
  387. for {
  388. for len(f.queue) == 0 {
  389. // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
  390. // When Close() is called, the f.closed is set and the condition is broadcasted.
  391. // Which causes this loop to continue and return from the Pop().
  392. if f.IsClosed() {
  393. return nil, FIFOClosedError
  394. }
  395. f.cond.Wait()
  396. }
  397. id := f.queue[0]
  398. f.queue = f.queue[1:]
  399. item, ok := f.items[id]
  400. if f.initialPopulationCount > 0 {
  401. f.initialPopulationCount--
  402. }
  403. if !ok {
  404. // Item may have been deleted subsequently.
  405. continue
  406. }
  407. delete(f.items, id)
  408. err := process(item)
  409. if e, ok := err.(ErrRequeue); ok {
  410. f.addIfNotPresent(id, item)
  411. err = e.Err
  412. }
  413. // Don't need to copyDeltas here, because we're transferring
  414. // ownership to the caller.
  415. return item, err
  416. }
  417. }
  418. // Replace will delete the contents of 'f', using instead the given map.
  419. // 'f' takes ownership of the map, you should not reference the map again
  420. // after calling this function. f's queue is reset, too; upon return, it
  421. // will contain the items in the map, in no particular order.
  422. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
  423. f.lock.Lock()
  424. defer f.lock.Unlock()
  425. keys := make(sets.String, len(list))
  426. for _, item := range list {
  427. key, err := f.KeyOf(item)
  428. if err != nil {
  429. return KeyError{item, err}
  430. }
  431. keys.Insert(key)
  432. if err := f.queueActionLocked(Sync, item); err != nil {
  433. return fmt.Errorf("couldn't enqueue object: %v", err)
  434. }
  435. }
  436. if f.knownObjects == nil {
  437. // Do deletion detection against our own list.
  438. for k, oldItem := range f.items {
  439. if keys.Has(k) {
  440. continue
  441. }
  442. var deletedObj interface{}
  443. if n := oldItem.Newest(); n != nil {
  444. deletedObj = n.Object
  445. }
  446. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  447. return err
  448. }
  449. }
  450. if !f.populated {
  451. f.populated = true
  452. f.initialPopulationCount = len(list)
  453. }
  454. return nil
  455. }
  456. // Detect deletions not already in the queue.
  457. // TODO(lavalamp): This may be racy-- we aren't properly locked
  458. // with knownObjects. Unproven.
  459. knownKeys := f.knownObjects.ListKeys()
  460. queuedDeletions := 0
  461. for _, k := range knownKeys {
  462. if keys.Has(k) {
  463. continue
  464. }
  465. deletedObj, exists, err := f.knownObjects.GetByKey(k)
  466. if err != nil {
  467. deletedObj = nil
  468. glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
  469. } else if !exists {
  470. deletedObj = nil
  471. glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
  472. }
  473. queuedDeletions++
  474. if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
  475. return err
  476. }
  477. }
  478. if !f.populated {
  479. f.populated = true
  480. f.initialPopulationCount = len(list) + queuedDeletions
  481. }
  482. return nil
  483. }
  484. // Resync will send a sync event for each item
  485. func (f *DeltaFIFO) Resync() error {
  486. f.lock.Lock()
  487. defer f.lock.Unlock()
  488. keys := f.knownObjects.ListKeys()
  489. for _, k := range keys {
  490. if err := f.syncKeyLocked(k); err != nil {
  491. return err
  492. }
  493. }
  494. return nil
  495. }
  496. func (f *DeltaFIFO) syncKey(key string) error {
  497. f.lock.Lock()
  498. defer f.lock.Unlock()
  499. return f.syncKeyLocked(key)
  500. }
  501. func (f *DeltaFIFO) syncKeyLocked(key string) error {
  502. obj, exists, err := f.knownObjects.GetByKey(key)
  503. if err != nil {
  504. glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
  505. return nil
  506. } else if !exists {
  507. glog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
  508. return nil
  509. }
  510. // If we are doing Resync() and there is already an event queued for that object,
  511. // we ignore the Resync for it. This is to avoid the race, in which the resync
  512. // comes with the previous value of object (since queueing an event for the object
  513. // doesn't trigger changing the underlying store <knownObjects>.
  514. id, err := f.KeyOf(obj)
  515. if err != nil {
  516. return KeyError{obj, err}
  517. }
  518. if len(f.items[id]) > 0 {
  519. return nil
  520. }
  521. if err := f.queueActionLocked(Sync, obj); err != nil {
  522. return fmt.Errorf("couldn't queue object: %v", err)
  523. }
  524. return nil
  525. }
  526. // A KeyListerGetter is anything that knows how to list its keys and look up by key.
  527. type KeyListerGetter interface {
  528. KeyLister
  529. KeyGetter
  530. }
  531. // A KeyLister is anything that knows how to list its keys.
  532. type KeyLister interface {
  533. ListKeys() []string
  534. }
  535. // A KeyGetter is anything that knows how to get the value stored under a given key.
  536. type KeyGetter interface {
  537. GetByKey(key string) (interface{}, bool, error)
  538. }
  539. // DeltaCompressor is an algorithm that removes redundant changes.
  540. type DeltaCompressor interface {
  541. Compress(Deltas) Deltas
  542. }
  543. // DeltaCompressorFunc should remove redundant changes; but changes that
  544. // are redundant depend on one's desired semantics, so this is an
  545. // injectable function.
  546. //
  547. // DeltaCompressorFunc adapts a raw function to be a DeltaCompressor.
  548. type DeltaCompressorFunc func(Deltas) Deltas
  549. // Compress just calls dc.
  550. func (dc DeltaCompressorFunc) Compress(d Deltas) Deltas {
  551. return dc(d)
  552. }
  553. // DeltaType is the type of a change (addition, deletion, etc)
  554. type DeltaType string
  555. const (
  556. Added DeltaType = "Added"
  557. Updated DeltaType = "Updated"
  558. Deleted DeltaType = "Deleted"
  559. // The other types are obvious. You'll get Sync deltas when:
  560. // * A watch expires/errors out and a new list/watch cycle is started.
  561. // * You've turned on periodic syncs.
  562. // (Anything that trigger's DeltaFIFO's Replace() method.)
  563. Sync DeltaType = "Sync"
  564. )
  565. // Delta is the type stored by a DeltaFIFO. It tells you what change
  566. // happened, and the object's state after* that change.
  567. //
  568. // [*] Unless the change is a deletion, and then you'll get the final
  569. // state of the object before it was deleted.
  570. type Delta struct {
  571. Type DeltaType
  572. Object interface{}
  573. }
  574. // Deltas is a list of one or more 'Delta's to an individual object.
  575. // The oldest delta is at index 0, the newest delta is the last one.
  576. type Deltas []Delta
  577. // Oldest is a convenience function that returns the oldest delta, or
  578. // nil if there are no deltas.
  579. func (d Deltas) Oldest() *Delta {
  580. if len(d) > 0 {
  581. return &d[0]
  582. }
  583. return nil
  584. }
  585. // Newest is a convenience function that returns the newest delta, or
  586. // nil if there are no deltas.
  587. func (d Deltas) Newest() *Delta {
  588. if n := len(d); n > 0 {
  589. return &d[n-1]
  590. }
  591. return nil
  592. }
  593. // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
  594. // the objects in the slice. This allows Get/List to return an object that we
  595. // know won't be clobbered by a subsequent call to a delta compressor.
  596. func copyDeltas(d Deltas) Deltas {
  597. d2 := make(Deltas, len(d))
  598. copy(d2, d)
  599. return d2
  600. }
  601. // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
  602. // an object was deleted but the watch deletion event was missed. In this
  603. // case we don't know the final "resting" state of the object, so there's
  604. // a chance the included `Obj` is stale.
  605. type DeletedFinalStateUnknown struct {
  606. Key string
  607. Obj interface{}
  608. }