123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package cache
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "math/rand"
- "reflect"
- "sync"
- "time"
- apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/meta"
- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- "k8s.io/apimachinery/pkg/runtime"
- "k8s.io/apimachinery/pkg/runtime/schema"
- "k8s.io/apimachinery/pkg/util/clock"
- "k8s.io/apimachinery/pkg/util/naming"
- utilnet "k8s.io/apimachinery/pkg/util/net"
- utilruntime "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/tools/pager"
- "k8s.io/klog/v2"
- "k8s.io/utils/trace"
- )
// defaultExpectedTypeName is the display name assigned to a Reflector's
// expectedTypeName when it is created without an expected type
// (i.e. reflect.TypeOf(expectedType) is nil).
const defaultExpectedTypeName = "<unspecified>"
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	name string

	// The name of the type we expect to place in the store. The name
	// will be the stringification of expectedGVK if provided, and the
	// stringification of expectedType otherwise. It is for display
	// only, and should not be used for parsing or comparison.
	expectedTypeName string
	// An example object of the type we expect to place in the store.
	// Only the type needs to be right, except that when that is
	// `unstructured.Unstructured` the object's `"apiVersion"` and
	// `"kind"` must also be right.
	expectedType reflect.Type
	// The GVK of the object we expect to place in the store if unstructured.
	// It stays nil unless the expected type is *unstructured.Unstructured with
	// a non-empty GroupVersionKind.
	expectedGVK *schema.GroupVersionKind
	// The destination to sync up with the watch source.
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher

	// backoffManager paces the repeated ListAndWatch attempts made by Run.
	backoffManager wait.BackoffManager

	// resyncPeriod is the duration of the timer that triggers a resync check.
	// A zero value disables periodic resync (see resyncChan).
	resyncPeriod time.Duration
	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked.
	// A nil ShouldResync is treated as always returning true.
	ShouldResync func() bool
	// clock allows tests to manipulate time
	clock clock.Clock
	// paginatedResult defines whether pagination should be forced for list calls.
	// It is set based on the result of the initial list call.
	paginatedResult bool
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store
	// it is thread safe, but not synchronized with the underlying store
	lastSyncResourceVersion string
	// isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
	// lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
	isLastSyncResourceVersionUnavailable bool
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	// and isLastSyncResourceVersionUnavailable.
	lastSyncResourceVersionMutex sync.RWMutex
	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
	// Called whenever the ListAndWatch drops the connection with an error.
	watchErrorHandler WatchErrorHandler
}
// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will back off and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandler func(r *Reflector, err error)
- // DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
- func DefaultWatchErrorHandler(r *Reflector, err error) {
- switch {
- case isExpiredError(err):
- // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
- // has a semantic that it returns data at least as fresh as provided RV.
- // So first try to LIST with setting RV to resource version of last observed object.
- klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
- case err == io.EOF:
- // watch closed normally
- case err == io.ErrUnexpectedEOF:
- klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
- default:
- utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
- }
- }
var (
	// We try to spread the load on apiserver by setting timeouts for
	// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
	// ListAndWatch derives each watch's TimeoutSeconds from this value.
	minWatchTimeout = 5 * time.Minute
)
- // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
- // The indexer is configured to key on namespace
- func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
- indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
- reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
- return indexer, reflector
- }
- // NewReflector creates a new Reflector object which will keep the
- // given store up to date with the server's contents for the given
- // resource. Reflector promises to only put things in the store that
- // have the type of expectedType, unless expectedType is nil. If
- // resyncPeriod is non-zero, then the reflector will periodically
- // consult its ShouldResync function to determine whether to invoke
- // the Store's Resync operation; `ShouldResync==nil` means always
- // "yes". This enables you to use reflectors to periodically process
- // everything as well as incrementally processing the things that
- // change.
- func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
- return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
- }
- // NewNamedReflector same as NewReflector, but with a specified name for logging
- func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
- realClock := &clock.RealClock{}
- r := &Reflector{
- name: name,
- listerWatcher: lw,
- store: store,
- // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
- // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
- // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
- backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
- resyncPeriod: resyncPeriod,
- clock: realClock,
- watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
- }
- r.setExpectedType(expectedType)
- return r
- }
- func (r *Reflector) setExpectedType(expectedType interface{}) {
- r.expectedType = reflect.TypeOf(expectedType)
- if r.expectedType == nil {
- r.expectedTypeName = defaultExpectedTypeName
- return
- }
- r.expectedTypeName = r.expectedType.String()
- if obj, ok := expectedType.(*unstructured.Unstructured); ok {
- // Use gvk to check that watch event objects are of the desired type.
- gvk := obj.GroupVersionKind()
- if gvk.Empty() {
- klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
- return
- }
- r.expectedGVK = &gvk
- r.expectedTypeName = gvk.String()
- }
- }
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the
// common call chains to NewReflector, so they'd be low entropy names for reflectors.
var internalPackages = []string{"client-go/tools/cache/"}
- // Run repeatedly uses the reflector's ListAndWatch to fetch all the
- // objects and subsequent deltas.
- // Run will exit when stopCh is closed.
- func (r *Reflector) Run(stopCh <-chan struct{}) {
- klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
- wait.BackoffUntil(func() {
- if err := r.ListAndWatch(stopCh); err != nil {
- r.watchErrorHandler(r, err)
- }
- }, r.backoffManager, true, stopCh)
- klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
- }
var (
	// nothing will ever be sent down this channel; resyncChan returns it when
	// resyncPeriod is zero so that the resync case never fires.
	neverExitWatch <-chan time.Time = make(chan time.Time)

	// Used to indicate that watching stopped because of a signal from the stop
	// channel passed in from a client of the reflector.
	// watchHandler returns it and ListAndWatch compares against it by identity.
	errorStopRequested = errors.New("Stop requested")
)
- // resyncChan returns a channel which will receive something when a resync is
- // required, and a cleanup function.
- func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
- if r.resyncPeriod == 0 {
- return neverExitWatch, func() bool { return false }
- }
- // The cleanup function is required: imagine the scenario where watches
- // always fail so we end up listing frequently. Then, if we don't
- // manually stop the timer, we could end up with many timers active
- // concurrently.
- t := r.clock.NewTimer(r.resyncPeriod)
- return t.C(), t.Stop
- }
- // ListAndWatch first lists all items and get the resource version at the moment of call,
- // and then use the resource version to watch.
- // It returns error if ListAndWatch didn't even try to initialize watch.
- func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
- klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
- var resourceVersion string
- options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
- if err := func() error {
- initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
- defer initTrace.LogIfLong(10 * time.Second)
- var list runtime.Object
- var paginatedResult bool
- var err error
- listCh := make(chan struct{}, 1)
- panicCh := make(chan interface{}, 1)
- go func() {
- defer func() {
- if r := recover(); r != nil {
- panicCh <- r
- }
- }()
- // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
- // list request will return the full response.
- pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
- return r.listerWatcher.List(opts)
- }))
- switch {
- case r.WatchListPageSize != 0:
- pager.PageSize = r.WatchListPageSize
- case r.paginatedResult:
- // We got a paginated result initially. Assume this resource and server honor
- // paging requests (i.e. watch cache is probably disabled) and leave the default
- // pager size set.
- case options.ResourceVersion != "" && options.ResourceVersion != "0":
- // User didn't explicitly request pagination.
- //
- // With ResourceVersion != "", we have a possibility to list from watch cache,
- // but we do that (for ResourceVersion != "0") only if Limit is unset.
- // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
- // switch off pagination to force listing from watch cache (if enabled).
- // With the existing semantic of RV (result is at least as fresh as provided RV),
- // this is correct and doesn't lead to going back in time.
- //
- // We also don't turn off pagination for ResourceVersion="0", since watch cache
- // is ignoring Limit in that case anyway, and if watch cache is not enabled
- // we don't introduce regression.
- pager.PageSize = 0
- }
- list, paginatedResult, err = pager.List(context.Background(), options)
- if isExpiredError(err) || isTooLargeResourceVersionError(err) {
- r.setIsLastSyncResourceVersionUnavailable(true)
- // Retry immediately if the resource version used to list is unavailable.
- // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
- // continuation pages, but the pager might not be enabled, the full list might fail because the
- // resource version it is listing at is expired or the cache may not yet be synced to the provided
- // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
- // the reflector makes forward progress.
- list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
- }
- close(listCh)
- }()
- select {
- case <-stopCh:
- return nil
- case r := <-panicCh:
- panic(r)
- case <-listCh:
- }
- if err != nil {
- return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
- }
- // We check if the list was paginated and if so set the paginatedResult based on that.
- // However, we want to do that only for the initial list (which is the only case
- // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
- // situations we may force listing directly from etcd (by setting ResourceVersion="")
- // which will return paginated result, even if watch cache is enabled. However, in
- // that case, we still want to prefer sending requests to watch cache if possible.
- //
- // Paginated result returned for request with ResourceVersion="0" mean that watch
- // cache is disabled and there are a lot of objects of a given type. In such case,
- // there is no need to prefer listing from watch cache.
- if options.ResourceVersion == "0" && paginatedResult {
- r.paginatedResult = true
- }
- r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
- initTrace.Step("Objects listed")
- listMetaInterface, err := meta.ListAccessor(list)
- if err != nil {
- return fmt.Errorf("unable to understand list result %#v: %v", list, err)
- }
- resourceVersion = listMetaInterface.GetResourceVersion()
- initTrace.Step("Resource version extracted")
- items, err := meta.ExtractList(list)
- if err != nil {
- return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
- }
- initTrace.Step("Objects extracted")
- if err := r.syncWith(items, resourceVersion); err != nil {
- return fmt.Errorf("unable to sync list result: %v", err)
- }
- initTrace.Step("SyncWith done")
- r.setLastSyncResourceVersion(resourceVersion)
- initTrace.Step("Resource version updated")
- return nil
- }(); err != nil {
- return err
- }
- resyncerrc := make(chan error, 1)
- cancelCh := make(chan struct{})
- defer close(cancelCh)
- go func() {
- resyncCh, cleanup := r.resyncChan()
- defer func() {
- cleanup() // Call the last one written into cleanup
- }()
- for {
- select {
- case <-resyncCh:
- case <-stopCh:
- return
- case <-cancelCh:
- return
- }
- if r.ShouldResync == nil || r.ShouldResync() {
- klog.V(4).Infof("%s: forcing resync", r.name)
- if err := r.store.Resync(); err != nil {
- resyncerrc <- err
- return
- }
- }
- cleanup()
- resyncCh, cleanup = r.resyncChan()
- }
- }()
- for {
- // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
- select {
- case <-stopCh:
- return nil
- default:
- }
- timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
- options = metav1.ListOptions{
- ResourceVersion: resourceVersion,
- // We want to avoid situations of hanging watchers. Stop any wachers that do not
- // receive any events within the timeout window.
- TimeoutSeconds: &timeoutSeconds,
- // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
- // Reflector doesn't assume bookmarks are returned at all (if the server do not support
- // watch bookmarks, it will ignore this field).
- AllowWatchBookmarks: true,
- }
- // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
- start := r.clock.Now()
- w, err := r.listerWatcher.Watch(options)
- if err != nil {
- // If this is "connection refused" error, it means that most likely apiserver is not responsive.
- // It doesn't make sense to re-list all objects because most likely we will be able to restart
- // watch where we ended.
- // If that's the case wait and resend watch request.
- if utilnet.IsConnectionRefused(err) {
- time.Sleep(time.Second)
- continue
- }
- return err
- }
- if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
- if err != errorStopRequested {
- switch {
- case isExpiredError(err):
- // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
- // has a semantic that it returns data at least as fresh as provided RV.
- // So first try to LIST with setting RV to resource version of last observed object.
- klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
- default:
- klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
- }
- }
- return nil
- }
- }
- }
- // syncWith replaces the store's items with the given list.
- func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
- found := make([]interface{}, 0, len(items))
- for _, item := range items {
- found = append(found, item)
- }
- return r.store.Replace(found, resourceVersion)
- }
- // watchHandler watches w and keeps *resourceVersion up to date.
- func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
- eventCount := 0
- // Stopping the watcher should be idempotent and if we return from this function there's no way
- // we're coming back in with the same watch interface.
- defer w.Stop()
- loop:
- for {
- select {
- case <-stopCh:
- return errorStopRequested
- case err := <-errc:
- return err
- case event, ok := <-w.ResultChan():
- if !ok {
- break loop
- }
- if event.Type == watch.Error {
- return apierrors.FromObject(event.Object)
- }
- if r.expectedType != nil {
- if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
- utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
- continue
- }
- }
- if r.expectedGVK != nil {
- if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
- utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
- continue
- }
- }
- meta, err := meta.Accessor(event.Object)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
- continue
- }
- newResourceVersion := meta.GetResourceVersion()
- switch event.Type {
- case watch.Added:
- err := r.store.Add(event.Object)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
- }
- case watch.Modified:
- err := r.store.Update(event.Object)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
- }
- case watch.Deleted:
- // TODO: Will any consumers need access to the "last known
- // state", which is passed in event.Object? If so, may need
- // to change this.
- err := r.store.Delete(event.Object)
- if err != nil {
- utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
- }
- case watch.Bookmark:
- // A `Bookmark` means watch has synced here, just update the resourceVersion
- default:
- utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
- }
- *resourceVersion = newResourceVersion
- r.setLastSyncResourceVersion(newResourceVersion)
- eventCount++
- }
- }
- watchDuration := r.clock.Since(start)
- if watchDuration < 1*time.Second && eventCount == 0 {
- return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
- }
- klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
- return nil
- }
- // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
- // The value returned is not synchronized with access to the underlying store and is not thread-safe
- func (r *Reflector) LastSyncResourceVersion() string {
- r.lastSyncResourceVersionMutex.RLock()
- defer r.lastSyncResourceVersionMutex.RUnlock()
- return r.lastSyncResourceVersion
- }
- func (r *Reflector) setLastSyncResourceVersion(v string) {
- r.lastSyncResourceVersionMutex.Lock()
- defer r.lastSyncResourceVersionMutex.Unlock()
- r.lastSyncResourceVersion = v
- }
- // relistResourceVersion determines the resource version the reflector should list or relist from.
- // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
- // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
- // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
- // etcd via a quorum read.
- func (r *Reflector) relistResourceVersion() string {
- r.lastSyncResourceVersionMutex.RLock()
- defer r.lastSyncResourceVersionMutex.RUnlock()
- if r.isLastSyncResourceVersionUnavailable {
- // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
- // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
- // to the latest available ResourceVersion, using a consistent read from etcd.
- return ""
- }
- if r.lastSyncResourceVersion == "" {
- // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
- // be served from the watch cache if it is enabled.
- return "0"
- }
- return r.lastSyncResourceVersion
- }
- // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
- // "expired" or "too large resource version" error.
- func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
- r.lastSyncResourceVersionMutex.Lock()
- defer r.lastSyncResourceVersionMutex.Unlock()
- r.isLastSyncResourceVersionUnavailable = isUnavailable
- }
- func isExpiredError(err error) bool {
- // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
- // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
- // and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
- // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
- return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
- }
- func isTooLargeResourceVersionError(err error) bool {
- if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
- return true
- }
- // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
- // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
- // version is larger than the largest currently available resource version. To ensure backward
- // compatibility with these server versions we also need to detect the error based on the content
- // of the error message field.
- if !apierrors.IsTimeout(err) {
- return false
- }
- apierr, ok := err.(apierrors.APIStatus)
- if !ok || apierr == nil || apierr.Status().Details == nil {
- return false
- }
- for _, cause := range apierr.Status().Details.Causes {
- // Matches the message returned by api server 1.17.0-1.18.5 for this error condition
- if cause.Message == "Too large resource version" {
- return true
- }
- }
- return false
- }
|