123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- /*
- Copyright 2017 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "fmt"
- "strconv"
- "sync"
- "time"
- "k8s.io/klog/v2"
- "k8s.io/apimachinery/pkg/api/meta"
- "k8s.io/apimachinery/pkg/runtime"
- utilcache "k8s.io/apimachinery/pkg/util/cache"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/sets"
- )
// MutationCache records the results of update operations in an LRU so that callers can be served
// a more current view of an object than the backing cache alone would give them. Deciding which
// copy is more current requires interpreting resourceVersions.
//
// Implementations must be thread-safe.
//
// TODO find a way to layer this into an informer/lister
type MutationCache interface {
	// GetByKey returns the object stored under key, whether it was found, and any lookup error.
	GetByKey(key string) (interface{}, bool, error)
	// ByIndex returns the objects whose entry in the index named indexName matches indexKey.
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	// Mutation records obj as the result of an update operation.
	Mutation(obj interface{})
}
- // ResourceVersionComparator is able to compare object versions.
- type ResourceVersionComparator interface {
- CompareResourceVersion(lhs, rhs runtime.Object) int
- }
- // NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
- // deal with objects that have a resource version that:
- //
- // - is an integer
- // - increases when updated
- // - is comparable across the same resource in a namespace
- //
- // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
- // remains in the mutation cache before it is removed.
- //
- // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
- // in the underlying store. This is only safe if your use of the cache can handle mutation entries
- // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
- func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
- return &mutationCache{
- backingCache: backingCache,
- indexer: indexer,
- mutationCache: utilcache.NewLRUExpireCache(100),
- comparator: etcdObjectVersioner{},
- ttl: ttl,
- includeAdds: includeAdds,
- }
- }
- // mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
- // since you can't distinguish between, "didn't observe create" and "was deleted after create",
- // if the key is missing from the backing cache, we always return it as missing
- type mutationCache struct {
- lock sync.Mutex
- backingCache Store
- indexer Indexer
- mutationCache *utilcache.LRUExpireCache
- includeAdds bool
- ttl time.Duration
- comparator ResourceVersionComparator
- }
- // GetByKey is never guaranteed to return back the value set in Mutation. It could be paged out, it could
- // be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
- // You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
- func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
- c.lock.Lock()
- defer c.lock.Unlock()
- obj, exists, err := c.backingCache.GetByKey(key)
- if err != nil {
- return nil, false, err
- }
- if !exists {
- if !c.includeAdds {
- // we can't distinguish between, "didn't observe create" and "was deleted after create", so
- // if the key is missing, we always return it as missing
- return nil, false, nil
- }
- obj, exists = c.mutationCache.Get(key)
- if !exists {
- return nil, false, nil
- }
- }
- objRuntime, ok := obj.(runtime.Object)
- if !ok {
- return obj, true, nil
- }
- return c.newerObject(key, objRuntime), true, nil
- }
- // ByIndex returns the newer objects that match the provided index and indexer key.
- // Will return an error if no indexer was provided.
- func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
- c.lock.Lock()
- defer c.lock.Unlock()
- if c.indexer == nil {
- return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
- }
- keys, err := c.indexer.IndexKeys(name, indexKey)
- if err != nil {
- return nil, err
- }
- var items []interface{}
- keySet := sets.NewString()
- for _, key := range keys {
- keySet.Insert(key)
- obj, exists, err := c.indexer.GetByKey(key)
- if err != nil {
- return nil, err
- }
- if !exists {
- continue
- }
- if objRuntime, ok := obj.(runtime.Object); ok {
- items = append(items, c.newerObject(key, objRuntime))
- } else {
- items = append(items, obj)
- }
- }
- if c.includeAdds {
- fn := c.indexer.GetIndexers()[name]
- // Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
- for _, key := range c.mutationCache.Keys() {
- updated, ok := c.mutationCache.Get(key)
- if !ok {
- continue
- }
- if keySet.Has(key.(string)) {
- continue
- }
- elements, err := fn(updated)
- if err != nil {
- klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
- continue
- }
- for _, inIndex := range elements {
- if inIndex != indexKey {
- continue
- }
- items = append(items, updated)
- break
- }
- }
- }
- return items, nil
- }
- // newerObject checks the mutation cache for a newer object and returns one if found. If the
- // mutated object is older than the backing object, it is removed from the Must be
- // called while the lock is held.
- func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
- mutatedObj, exists := c.mutationCache.Get(key)
- if !exists {
- return backing
- }
- mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
- if !ok {
- return backing
- }
- if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
- c.mutationCache.Remove(key)
- return backing
- }
- return mutatedObjRuntime
- }
- // Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
- // copy. If you call Mutation twice with the same object on different threads, one will win, but its not defined
- // which one. This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
- // preserved, but you may not get the version of the object you want. The object you get is only guaranteed to
- // "one that was valid at some point in time", not "the one that I want".
- func (c *mutationCache) Mutation(obj interface{}) {
- c.lock.Lock()
- defer c.lock.Unlock()
- key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
- if err != nil {
- // this is a "nice to have", so failures shouldn't do anything weird
- utilruntime.HandleError(err)
- return
- }
- if objRuntime, ok := obj.(runtime.Object); ok {
- if mutatedObj, exists := c.mutationCache.Get(key); exists {
- if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
- if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
- return
- }
- }
- }
- }
- c.mutationCache.Add(key, obj, c.ttl)
- }
// etcdObjectVersioner reads and compares the resource versions of objects whose metadata can be
// reached through meta.Accessor. It is the ResourceVersionComparator installed by
// NewIntegerResourceVersionMutationCache.
type etcdObjectVersioner struct{}
- // ObjectResourceVersion implements Versioner
- func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
- accessor, err := meta.Accessor(obj)
- if err != nil {
- return 0, err
- }
- version := accessor.GetResourceVersion()
- if len(version) == 0 {
- return 0, nil
- }
- return strconv.ParseUint(version, 10, 64)
- }
- // CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
- // but etcd resource versions are special, they're actually ints, so we can easily compare them.
- func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
- lhsVersion, err := a.ObjectResourceVersion(lhs)
- if err != nil {
- // coder error
- panic(err)
- }
- rhsVersion, err := a.ObjectResourceVersion(rhs)
- if err != nil {
- // coder error
- panic(err)
- }
- if lhsVersion == rhsVersion {
- return 0
- }
- if lhsVersion < rhsVersion {
- return -1
- }
- return 1
- }
|