mutation_cache.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. /*
  2. Copyright 2017 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "strconv"
  17. "sync"
  18. "time"
  19. "k8s.io/klog/v2"
  20. "k8s.io/apimachinery/pkg/api/meta"
  21. "k8s.io/apimachinery/pkg/runtime"
  22. utilcache "k8s.io/apimachinery/pkg/util/cache"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/sets"
  25. )
  26. // MutationCache is able to take the result of update operations and stores them in an LRU
  27. // that can be used to provide a more current view of a requested object. It requires interpreting
  28. // resourceVersions for comparisons.
  29. // Implementations must be thread-safe.
  30. // TODO find a way to layer this into an informer/lister
  31. type MutationCache interface {
  32. GetByKey(key string) (interface{}, bool, error)
  33. ByIndex(indexName, indexKey string) ([]interface{}, error)
  34. Mutation(interface{})
  35. }
  36. // ResourceVersionComparator is able to compare object versions.
  37. type ResourceVersionComparator interface {
  38. CompareResourceVersion(lhs, rhs runtime.Object) int
  39. }
  40. // NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
  41. // deal with objects that have a resource version that:
  42. //
  43. // - is an integer
  44. // - increases when updated
  45. // - is comparable across the same resource in a namespace
  46. //
  47. // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
  48. // remains in the mutation cache before it is removed.
  49. //
  50. // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
  51. // in the underlying store. This is only safe if your use of the cache can handle mutation entries
  52. // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
  53. func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
  54. return &mutationCache{
  55. backingCache: backingCache,
  56. indexer: indexer,
  57. mutationCache: utilcache.NewLRUExpireCache(100),
  58. comparator: etcdObjectVersioner{},
  59. ttl: ttl,
  60. includeAdds: includeAdds,
  61. }
  62. }
  63. // mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
  64. // since you can't distinguish between, "didn't observe create" and "was deleted after create",
  65. // if the key is missing from the backing cache, we always return it as missing
  66. type mutationCache struct {
  67. lock sync.Mutex
  68. backingCache Store
  69. indexer Indexer
  70. mutationCache *utilcache.LRUExpireCache
  71. includeAdds bool
  72. ttl time.Duration
  73. comparator ResourceVersionComparator
  74. }
  75. // GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
  76. // be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
  77. // You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
  78. func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
  79. c.lock.Lock()
  80. defer c.lock.Unlock()
  81. obj, exists, err := c.backingCache.GetByKey(key)
  82. if err != nil {
  83. return nil, false, err
  84. }
  85. if !exists {
  86. if !c.includeAdds {
  87. // we can't distinguish between, "didn't observe create" and "was deleted after create", so
  88. // if the key is missing, we always return it as missing
  89. return nil, false, nil
  90. }
  91. obj, exists = c.mutationCache.Get(key)
  92. if !exists {
  93. return nil, false, nil
  94. }
  95. }
  96. objRuntime, ok := obj.(runtime.Object)
  97. if !ok {
  98. return obj, true, nil
  99. }
  100. return c.newerObject(key, objRuntime), true, nil
  101. }
  102. // ByIndex returns the newer objects that match the provided index and indexer key.
  103. // Will return an error if no indexer was provided.
  104. func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
  105. c.lock.Lock()
  106. defer c.lock.Unlock()
  107. if c.indexer == nil {
  108. return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
  109. }
  110. keys, err := c.indexer.IndexKeys(name, indexKey)
  111. if err != nil {
  112. return nil, err
  113. }
  114. var items []interface{}
  115. keySet := sets.NewString()
  116. for _, key := range keys {
  117. keySet.Insert(key)
  118. obj, exists, err := c.indexer.GetByKey(key)
  119. if err != nil {
  120. return nil, err
  121. }
  122. if !exists {
  123. continue
  124. }
  125. if objRuntime, ok := obj.(runtime.Object); ok {
  126. items = append(items, c.newerObject(key, objRuntime))
  127. } else {
  128. items = append(items, obj)
  129. }
  130. }
  131. if c.includeAdds {
  132. fn := c.indexer.GetIndexers()[name]
  133. // Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
  134. for _, key := range c.mutationCache.Keys() {
  135. updated, ok := c.mutationCache.Get(key)
  136. if !ok {
  137. continue
  138. }
  139. if keySet.Has(key.(string)) {
  140. continue
  141. }
  142. elements, err := fn(updated)
  143. if err != nil {
  144. klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
  145. continue
  146. }
  147. for _, inIndex := range elements {
  148. if inIndex != indexKey {
  149. continue
  150. }
  151. items = append(items, updated)
  152. break
  153. }
  154. }
  155. }
  156. return items, nil
  157. }
  158. // newerObject checks the mutation cache for a newer object and returns one if found. If the
  159. // mutated object is older than the backing object, it is removed from the Must be
  160. // called while the lock is held.
  161. func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
  162. mutatedObj, exists := c.mutationCache.Get(key)
  163. if !exists {
  164. return backing
  165. }
  166. mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
  167. if !ok {
  168. return backing
  169. }
  170. if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
  171. c.mutationCache.Remove(key)
  172. return backing
  173. }
  174. return mutatedObjRuntime
  175. }
  176. // Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
  177. // copy. If you call Mutation twice with the same object on different threads, one will win, but its not defined
  178. // which one. This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
  179. // preserved, but you may not get the version of the object you want. The object you get is only guaranteed to
  180. // "one that was valid at some point in time", not "the one that I want".
  181. func (c *mutationCache) Mutation(obj interface{}) {
  182. c.lock.Lock()
  183. defer c.lock.Unlock()
  184. key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
  185. if err != nil {
  186. // this is a "nice to have", so failures shouldn't do anything weird
  187. utilruntime.HandleError(err)
  188. return
  189. }
  190. if objRuntime, ok := obj.(runtime.Object); ok {
  191. if mutatedObj, exists := c.mutationCache.Get(key); exists {
  192. if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
  193. if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
  194. return
  195. }
  196. }
  197. }
  198. }
  199. c.mutationCache.Add(key, obj, c.ttl)
  200. }
  201. // etcdObjectVersioner implements versioning and extracting etcd node information
  202. // for objects that have an embedded ObjectMeta or ListMeta field.
  203. type etcdObjectVersioner struct{}
  204. // ObjectResourceVersion implements Versioner
  205. func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
  206. accessor, err := meta.Accessor(obj)
  207. if err != nil {
  208. return 0, err
  209. }
  210. version := accessor.GetResourceVersion()
  211. if len(version) == 0 {
  212. return 0, nil
  213. }
  214. return strconv.ParseUint(version, 10, 64)
  215. }
  216. // CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
  217. // but etcd resource versions are special, they're actually ints, so we can easily compare them.
  218. func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
  219. lhsVersion, err := a.ObjectResourceVersion(lhs)
  220. if err != nil {
  221. // coder error
  222. panic(err)
  223. }
  224. rhsVersion, err := a.ObjectResourceVersion(rhs)
  225. if err != nil {
  226. // coder error
  227. panic(err)
  228. }
  229. if lhsVersion == rhsVersion {
  230. return 0
  231. }
  232. if lhsVersion < rhsVersion {
  233. return -1
  234. }
  235. return 1
  236. }