reflector.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "math/rand"
  20. "reflect"
  21. "sync"
  22. "time"
  23. apierrors "k8s.io/apimachinery/pkg/api/errors"
  24. "k8s.io/apimachinery/pkg/api/meta"
  25. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  26. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  27. "k8s.io/apimachinery/pkg/runtime"
  28. "k8s.io/apimachinery/pkg/runtime/schema"
  29. "k8s.io/apimachinery/pkg/util/clock"
  30. "k8s.io/apimachinery/pkg/util/naming"
  31. utilnet "k8s.io/apimachinery/pkg/util/net"
  32. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  33. "k8s.io/apimachinery/pkg/util/wait"
  34. "k8s.io/apimachinery/pkg/watch"
  35. "k8s.io/client-go/tools/pager"
  36. "k8s.io/klog/v2"
  37. "k8s.io/utils/trace"
  38. )
  39. const defaultExpectedTypeName = "<unspecified>"
  40. // Reflector watches a specified resource and causes all changes to be reflected in the given store.
  41. type Reflector struct {
  42. // name identifies this reflector. By default it will be a file:line if possible.
  43. name string
  44. // The name of the type we expect to place in the store. The name
  45. // will be the stringification of expectedGVK if provided, and the
  46. // stringification of expectedType otherwise. It is for display
  47. // only, and should not be used for parsing or comparison.
  48. expectedTypeName string
  49. // An example object of the type we expect to place in the store.
  50. // Only the type needs to be right, except that when that is
  51. // `unstructured.Unstructured` the object's `"apiVersion"` and
  52. // `"kind"` must also be right.
  53. expectedType reflect.Type
  54. // The GVK of the object we expect to place in the store if unstructured.
  55. expectedGVK *schema.GroupVersionKind
  56. // The destination to sync up with the watch source
  57. store Store
  58. // listerWatcher is used to perform lists and watches.
  59. listerWatcher ListerWatcher
  60. // backoff manages backoff of ListWatch
  61. backoffManager wait.BackoffManager
  62. resyncPeriod time.Duration
  63. // ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
  64. ShouldResync func() bool
  65. // clock allows tests to manipulate time
  66. clock clock.Clock
  67. // paginatedResult defines whether pagination should be forced for list calls.
  68. // It is set based on the result of the initial list call.
  69. paginatedResult bool
  70. // lastSyncResourceVersion is the resource version token last
  71. // observed when doing a sync with the underlying store
  72. // it is thread safe, but not synchronized with the underlying store
  73. lastSyncResourceVersion string
  74. // isLastSyncResourceVersionUnavailable is true if the previous list or watch request with
  75. // lastSyncResourceVersion failed with an "expired" or "too large resource version" error.
  76. isLastSyncResourceVersionUnavailable bool
  77. // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
  78. lastSyncResourceVersionMutex sync.RWMutex
  79. // WatchListPageSize is the requested chunk size of initial and resync watch lists.
  80. // If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
  81. // (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
  82. // it will turn off pagination to allow serving them from watch cache.
  83. // NOTE: It should be used carefully as paginated lists are always served directly from
  84. // etcd, which is significantly less efficient and may lead to serious performance and
  85. // scalability problems.
  86. WatchListPageSize int64
  87. // Called whenever the ListAndWatch drops the connection with an error.
  88. watchErrorHandler WatchErrorHandler
  89. }
  90. // The WatchErrorHandler is called whenever ListAndWatch drops the
  91. // connection with an error. After calling this handler, the informer
  92. // will backoff and retry.
  93. //
  94. // The default implementation looks at the error type and tries to log
  95. // the error message at an appropriate level.
  96. //
  97. // Implementations of this handler may display the error message in other
  98. // ways. Implementations should return quickly - any expensive processing
  99. // should be offloaded.
  100. type WatchErrorHandler func(r *Reflector, err error)
  101. // DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
  102. func DefaultWatchErrorHandler(r *Reflector, err error) {
  103. switch {
  104. case isExpiredError(err):
  105. // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
  106. // has a semantic that it returns data at least as fresh as provided RV.
  107. // So first try to LIST with setting RV to resource version of last observed object.
  108. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
  109. case err == io.EOF:
  110. // watch closed normally
  111. case err == io.ErrUnexpectedEOF:
  112. klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
  113. default:
  114. utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
  115. }
  116. }
  117. var (
  118. // We try to spread the load on apiserver by setting timeouts for
  119. // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
  120. minWatchTimeout = 5 * time.Minute
  121. )
  122. // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
  123. // The indexer is configured to key on namespace
  124. func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
  125. indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
  126. reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
  127. return indexer, reflector
  128. }
  129. // NewReflector creates a new Reflector object which will keep the
  130. // given store up to date with the server's contents for the given
  131. // resource. Reflector promises to only put things in the store that
  132. // have the type of expectedType, unless expectedType is nil. If
  133. // resyncPeriod is non-zero, then the reflector will periodically
  134. // consult its ShouldResync function to determine whether to invoke
  135. // the Store's Resync operation; `ShouldResync==nil` means always
  136. // "yes". This enables you to use reflectors to periodically process
  137. // everything as well as incrementally processing the things that
  138. // change.
  139. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  140. return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
  141. }
  142. // NewNamedReflector same as NewReflector, but with a specified name for logging
  143. func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  144. realClock := &clock.RealClock{}
  145. r := &Reflector{
  146. name: name,
  147. listerWatcher: lw,
  148. store: store,
  149. // We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
  150. // API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
  151. // 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
  152. backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
  153. resyncPeriod: resyncPeriod,
  154. clock: realClock,
  155. watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler),
  156. }
  157. r.setExpectedType(expectedType)
  158. return r
  159. }
  160. func (r *Reflector) setExpectedType(expectedType interface{}) {
  161. r.expectedType = reflect.TypeOf(expectedType)
  162. if r.expectedType == nil {
  163. r.expectedTypeName = defaultExpectedTypeName
  164. return
  165. }
  166. r.expectedTypeName = r.expectedType.String()
  167. if obj, ok := expectedType.(*unstructured.Unstructured); ok {
  168. // Use gvk to check that watch event objects are of the desired type.
  169. gvk := obj.GroupVersionKind()
  170. if gvk.Empty() {
  171. klog.V(4).Infof("Reflector from %s configured with expectedType of *unstructured.Unstructured with empty GroupVersionKind.", r.name)
  172. return
  173. }
  174. r.expectedGVK = &gvk
  175. r.expectedTypeName = gvk.String()
  176. }
  177. }
  178. // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
  179. // call chains to NewReflector, so they'd be low entropy names for reflectors
  180. var internalPackages = []string{"client-go/tools/cache/"}
  181. // Run repeatedly uses the reflector's ListAndWatch to fetch all the
  182. // objects and subsequent deltas.
  183. // Run will exit when stopCh is closed.
  184. func (r *Reflector) Run(stopCh <-chan struct{}) {
  185. klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
  186. wait.BackoffUntil(func() {
  187. if err := r.ListAndWatch(stopCh); err != nil {
  188. r.watchErrorHandler(r, err)
  189. }
  190. }, r.backoffManager, true, stopCh)
  191. klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
  192. }
  193. var (
  194. // nothing will ever be sent down this channel
  195. neverExitWatch <-chan time.Time = make(chan time.Time)
  196. // Used to indicate that watching stopped because of a signal from the stop
  197. // channel passed in from a client of the reflector.
  198. errorStopRequested = errors.New("Stop requested")
  199. )
  200. // resyncChan returns a channel which will receive something when a resync is
  201. // required, and a cleanup function.
  202. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
  203. if r.resyncPeriod == 0 {
  204. return neverExitWatch, func() bool { return false }
  205. }
  206. // The cleanup function is required: imagine the scenario where watches
  207. // always fail so we end up listing frequently. Then, if we don't
  208. // manually stop the timer, we could end up with many timers active
  209. // concurrently.
  210. t := r.clock.NewTimer(r.resyncPeriod)
  211. return t.C(), t.Stop
  212. }
  213. // ListAndWatch first lists all items and get the resource version at the moment of call,
  214. // and then use the resource version to watch.
  215. // It returns error if ListAndWatch didn't even try to initialize watch.
  216. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  217. klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
  218. var resourceVersion string
  219. options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
  220. if err := func() error {
  221. initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
  222. defer initTrace.LogIfLong(10 * time.Second)
  223. var list runtime.Object
  224. var paginatedResult bool
  225. var err error
  226. listCh := make(chan struct{}, 1)
  227. panicCh := make(chan interface{}, 1)
  228. go func() {
  229. defer func() {
  230. if r := recover(); r != nil {
  231. panicCh <- r
  232. }
  233. }()
  234. // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
  235. // list request will return the full response.
  236. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
  237. return r.listerWatcher.List(opts)
  238. }))
  239. switch {
  240. case r.WatchListPageSize != 0:
  241. pager.PageSize = r.WatchListPageSize
  242. case r.paginatedResult:
  243. // We got a paginated result initially. Assume this resource and server honor
  244. // paging requests (i.e. watch cache is probably disabled) and leave the default
  245. // pager size set.
  246. case options.ResourceVersion != "" && options.ResourceVersion != "0":
  247. // User didn't explicitly request pagination.
  248. //
  249. // With ResourceVersion != "", we have a possibility to list from watch cache,
  250. // but we do that (for ResourceVersion != "0") only if Limit is unset.
  251. // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
  252. // switch off pagination to force listing from watch cache (if enabled).
  253. // With the existing semantic of RV (result is at least as fresh as provided RV),
  254. // this is correct and doesn't lead to going back in time.
  255. //
  256. // We also don't turn off pagination for ResourceVersion="0", since watch cache
  257. // is ignoring Limit in that case anyway, and if watch cache is not enabled
  258. // we don't introduce regression.
  259. pager.PageSize = 0
  260. }
  261. list, paginatedResult, err = pager.List(context.Background(), options)
  262. if isExpiredError(err) || isTooLargeResourceVersionError(err) {
  263. r.setIsLastSyncResourceVersionUnavailable(true)
  264. // Retry immediately if the resource version used to list is unavailable.
  265. // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
  266. // continuation pages, but the pager might not be enabled, the full list might fail because the
  267. // resource version it is listing at is expired or the cache may not yet be synced to the provided
  268. // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
  269. // the reflector makes forward progress.
  270. list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
  271. }
  272. close(listCh)
  273. }()
  274. select {
  275. case <-stopCh:
  276. return nil
  277. case r := <-panicCh:
  278. panic(r)
  279. case <-listCh:
  280. }
  281. if err != nil {
  282. return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
  283. }
  284. // We check if the list was paginated and if so set the paginatedResult based on that.
  285. // However, we want to do that only for the initial list (which is the only case
  286. // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
  287. // situations we may force listing directly from etcd (by setting ResourceVersion="")
  288. // which will return paginated result, even if watch cache is enabled. However, in
  289. // that case, we still want to prefer sending requests to watch cache if possible.
  290. //
  291. // Paginated result returned for request with ResourceVersion="0" mean that watch
  292. // cache is disabled and there are a lot of objects of a given type. In such case,
  293. // there is no need to prefer listing from watch cache.
  294. if options.ResourceVersion == "0" && paginatedResult {
  295. r.paginatedResult = true
  296. }
  297. r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
  298. initTrace.Step("Objects listed")
  299. listMetaInterface, err := meta.ListAccessor(list)
  300. if err != nil {
  301. return fmt.Errorf("unable to understand list result %#v: %v", list, err)
  302. }
  303. resourceVersion = listMetaInterface.GetResourceVersion()
  304. initTrace.Step("Resource version extracted")
  305. items, err := meta.ExtractList(list)
  306. if err != nil {
  307. return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
  308. }
  309. initTrace.Step("Objects extracted")
  310. if err := r.syncWith(items, resourceVersion); err != nil {
  311. return fmt.Errorf("unable to sync list result: %v", err)
  312. }
  313. initTrace.Step("SyncWith done")
  314. r.setLastSyncResourceVersion(resourceVersion)
  315. initTrace.Step("Resource version updated")
  316. return nil
  317. }(); err != nil {
  318. return err
  319. }
  320. resyncerrc := make(chan error, 1)
  321. cancelCh := make(chan struct{})
  322. defer close(cancelCh)
  323. go func() {
  324. resyncCh, cleanup := r.resyncChan()
  325. defer func() {
  326. cleanup() // Call the last one written into cleanup
  327. }()
  328. for {
  329. select {
  330. case <-resyncCh:
  331. case <-stopCh:
  332. return
  333. case <-cancelCh:
  334. return
  335. }
  336. if r.ShouldResync == nil || r.ShouldResync() {
  337. klog.V(4).Infof("%s: forcing resync", r.name)
  338. if err := r.store.Resync(); err != nil {
  339. resyncerrc <- err
  340. return
  341. }
  342. }
  343. cleanup()
  344. resyncCh, cleanup = r.resyncChan()
  345. }
  346. }()
  347. for {
  348. // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
  349. select {
  350. case <-stopCh:
  351. return nil
  352. default:
  353. }
  354. timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  355. options = metav1.ListOptions{
  356. ResourceVersion: resourceVersion,
  357. // We want to avoid situations of hanging watchers. Stop any wachers that do not
  358. // receive any events within the timeout window.
  359. TimeoutSeconds: &timeoutSeconds,
  360. // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
  361. // Reflector doesn't assume bookmarks are returned at all (if the server do not support
  362. // watch bookmarks, it will ignore this field).
  363. AllowWatchBookmarks: true,
  364. }
  365. // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
  366. start := r.clock.Now()
  367. w, err := r.listerWatcher.Watch(options)
  368. if err != nil {
  369. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
  370. // It doesn't make sense to re-list all objects because most likely we will be able to restart
  371. // watch where we ended.
  372. // If that's the case wait and resend watch request.
  373. if utilnet.IsConnectionRefused(err) {
  374. time.Sleep(time.Second)
  375. continue
  376. }
  377. return err
  378. }
  379. if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
  380. if err != errorStopRequested {
  381. switch {
  382. case isExpiredError(err):
  383. // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
  384. // has a semantic that it returns data at least as fresh as provided RV.
  385. // So first try to LIST with setting RV to resource version of last observed object.
  386. klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
  387. default:
  388. klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
  389. }
  390. }
  391. return nil
  392. }
  393. }
  394. }
  395. // syncWith replaces the store's items with the given list.
  396. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  397. found := make([]interface{}, 0, len(items))
  398. for _, item := range items {
  399. found = append(found, item)
  400. }
  401. return r.store.Replace(found, resourceVersion)
  402. }
  403. // watchHandler watches w and keeps *resourceVersion up to date.
  404. func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  405. eventCount := 0
  406. // Stopping the watcher should be idempotent and if we return from this function there's no way
  407. // we're coming back in with the same watch interface.
  408. defer w.Stop()
  409. loop:
  410. for {
  411. select {
  412. case <-stopCh:
  413. return errorStopRequested
  414. case err := <-errc:
  415. return err
  416. case event, ok := <-w.ResultChan():
  417. if !ok {
  418. break loop
  419. }
  420. if event.Type == watch.Error {
  421. return apierrors.FromObject(event.Object)
  422. }
  423. if r.expectedType != nil {
  424. if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
  425. utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
  426. continue
  427. }
  428. }
  429. if r.expectedGVK != nil {
  430. if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
  431. utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
  432. continue
  433. }
  434. }
  435. meta, err := meta.Accessor(event.Object)
  436. if err != nil {
  437. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  438. continue
  439. }
  440. newResourceVersion := meta.GetResourceVersion()
  441. switch event.Type {
  442. case watch.Added:
  443. err := r.store.Add(event.Object)
  444. if err != nil {
  445. utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
  446. }
  447. case watch.Modified:
  448. err := r.store.Update(event.Object)
  449. if err != nil {
  450. utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
  451. }
  452. case watch.Deleted:
  453. // TODO: Will any consumers need access to the "last known
  454. // state", which is passed in event.Object? If so, may need
  455. // to change this.
  456. err := r.store.Delete(event.Object)
  457. if err != nil {
  458. utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
  459. }
  460. case watch.Bookmark:
  461. // A `Bookmark` means watch has synced here, just update the resourceVersion
  462. default:
  463. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  464. }
  465. *resourceVersion = newResourceVersion
  466. r.setLastSyncResourceVersion(newResourceVersion)
  467. eventCount++
  468. }
  469. }
  470. watchDuration := r.clock.Since(start)
  471. if watchDuration < 1*time.Second && eventCount == 0 {
  472. return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
  473. }
  474. klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
  475. return nil
  476. }
  477. // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
  478. // The value returned is not synchronized with access to the underlying store and is not thread-safe
  479. func (r *Reflector) LastSyncResourceVersion() string {
  480. r.lastSyncResourceVersionMutex.RLock()
  481. defer r.lastSyncResourceVersionMutex.RUnlock()
  482. return r.lastSyncResourceVersion
  483. }
  484. func (r *Reflector) setLastSyncResourceVersion(v string) {
  485. r.lastSyncResourceVersionMutex.Lock()
  486. defer r.lastSyncResourceVersionMutex.Unlock()
  487. r.lastSyncResourceVersion = v
  488. }
  489. // relistResourceVersion determines the resource version the reflector should list or relist from.
  490. // Returns either the lastSyncResourceVersion so that this reflector will relist with a resource
  491. // versions no older than has already been observed in relist results or watch events, or, if the last relist resulted
  492. // in an HTTP 410 (Gone) status code, returns "" so that the relist will use the latest resource version available in
  493. // etcd via a quorum read.
  494. func (r *Reflector) relistResourceVersion() string {
  495. r.lastSyncResourceVersionMutex.RLock()
  496. defer r.lastSyncResourceVersionMutex.RUnlock()
  497. if r.isLastSyncResourceVersionUnavailable {
  498. // Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
  499. // if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to re-establish reflector
  500. // to the latest available ResourceVersion, using a consistent read from etcd.
  501. return ""
  502. }
  503. if r.lastSyncResourceVersion == "" {
  504. // For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
  505. // be served from the watch cache if it is enabled.
  506. return "0"
  507. }
  508. return r.lastSyncResourceVersion
  509. }
  510. // setIsLastSyncResourceVersionUnavailable sets if the last list or watch request with lastSyncResourceVersion returned
  511. // "expired" or "too large resource version" error.
  512. func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
  513. r.lastSyncResourceVersionMutex.Lock()
  514. defer r.lastSyncResourceVersionMutex.Unlock()
  515. r.isLastSyncResourceVersionUnavailable = isUnavailable
  516. }
  517. func isExpiredError(err error) bool {
  518. // In Kubernetes 1.17 and earlier, the api server returns both apierrors.StatusReasonExpired and
  519. // apierrors.StatusReasonGone for HTTP 410 (Gone) status code responses. In 1.18 the kube server is more consistent
  520. // and always returns apierrors.StatusReasonExpired. For backward compatibility we can only remove the apierrors.IsGone
  521. // check when we fully drop support for Kubernetes 1.17 servers from reflectors.
  522. return apierrors.IsResourceExpired(err) || apierrors.IsGone(err)
  523. }
  524. func isTooLargeResourceVersionError(err error) bool {
  525. if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) {
  526. return true
  527. }
  528. // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to
  529. // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource
  530. // version is larger than the largest currently available resource version. To ensure backward
  531. // compatibility with these server versions we also need to detect the error based on the content
  532. // of the error message field.
  533. if !apierrors.IsTimeout(err) {
  534. return false
  535. }
  536. apierr, ok := err.(apierrors.APIStatus)
  537. if !ok || apierr == nil || apierr.Status().Details == nil {
  538. return false
  539. }
  540. for _, cause := range apierr.Status().Details.Causes {
  541. // Matches the message returned by api server 1.17.0-1.18.5 for this error condition
  542. if cause.Message == "Too large resource version" {
  543. return true
  544. }
  545. }
  546. return false
  547. }