cacher.go 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package storage
  14. import (
  15. "fmt"
  16. "net/http"
  17. "reflect"
  18. "strconv"
  19. "sync"
  20. "time"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/api/errors"
  23. "k8s.io/kubernetes/pkg/api/meta"
  24. "k8s.io/kubernetes/pkg/api/unversioned"
  25. "k8s.io/kubernetes/pkg/client/cache"
  26. "k8s.io/kubernetes/pkg/conversion"
  27. "k8s.io/kubernetes/pkg/runtime"
  28. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  29. "k8s.io/kubernetes/pkg/util/wait"
  30. "k8s.io/kubernetes/pkg/watch"
  31. "github.com/golang/glog"
  32. "golang.org/x/net/context"
  33. )
  34. // CacherConfig contains the configuration for a given Cache.
  35. type CacherConfig struct {
  36. // Maximum size of the history cached in memory.
  37. CacheCapacity int
  38. // An underlying storage.Interface.
  39. Storage Interface
  40. // An underlying storage.Versioner.
  41. Versioner Versioner
  42. // The Cache will be caching objects of a given Type and assumes that they
  43. // are all stored under ResourcePrefix directory in the underlying database.
  44. Type interface{}
  45. ResourcePrefix string
  46. // KeyFunc is used to get a key in the underyling storage for a given object.
  47. KeyFunc func(runtime.Object) (string, error)
  48. // TriggerPublisherFunc is used for optimizing amount of watchers that
  49. // needs to process an incoming event.
  50. TriggerPublisherFunc TriggerPublisherFunc
  51. // NewList is a function that creates new empty object storing a list of
  52. // objects of type Type.
  53. NewListFunc func() runtime.Object
  54. Codec runtime.Codec
  55. }
  56. type watchersMap map[int]*cacheWatcher
  57. func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
  58. wm[number] = w
  59. }
  60. func (wm watchersMap) deleteWatcher(number int) {
  61. delete(wm, number)
  62. }
  63. func (wm watchersMap) terminateAll() {
  64. for key, watcher := range wm {
  65. delete(wm, key)
  66. watcher.stop()
  67. }
  68. }
  69. type indexedWatchers struct {
  70. allWatchers watchersMap
  71. valueWatchers map[string]watchersMap
  72. }
  73. func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
  74. if supported {
  75. if _, ok := i.valueWatchers[value]; !ok {
  76. i.valueWatchers[value] = watchersMap{}
  77. }
  78. i.valueWatchers[value].addWatcher(w, number)
  79. } else {
  80. i.allWatchers.addWatcher(w, number)
  81. }
  82. }
  83. func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
  84. if supported {
  85. i.valueWatchers[value].deleteWatcher(number)
  86. if len(i.valueWatchers[value]) == 0 {
  87. delete(i.valueWatchers, value)
  88. }
  89. } else {
  90. i.allWatchers.deleteWatcher(number)
  91. }
  92. }
  93. func (i *indexedWatchers) terminateAll() {
  94. i.allWatchers.terminateAll()
  95. for index, watchers := range i.valueWatchers {
  96. watchers.terminateAll()
  97. delete(i.valueWatchers, index)
  98. }
  99. }
  100. // Cacher is responsible for serving WATCH and LIST requests for a given
  101. // resource from its internal cache and updating its cache in the background
  102. // based on the underlying storage contents.
  103. // Cacher implements storage.Interface (although most of the calls are just
  104. // delegated to the underlying storage).
  105. type Cacher struct {
  106. sync.RWMutex
  107. // Before accessing the cacher's cache, wait for the ready to be ok.
  108. // This is necessary to prevent users from accessing structures that are
  109. // uninitialized or are being repopulated right now.
  110. // ready needs to be set to false when the cacher is paused or stopped.
  111. // ready needs to be set to true when the cacher is ready to use after
  112. // initialization.
  113. ready *ready
  114. // Underlying storage.Interface.
  115. storage Interface
  116. // Expected type of objects in the underlying cache.
  117. objectType reflect.Type
  118. // "sliding window" of recent changes of objects and the current state.
  119. watchCache *watchCache
  120. reflector *cache.Reflector
  121. // Versioner is used to handle resource versions.
  122. versioner Versioner
  123. // keyFunc is used to get a key in the underyling storage for a given object.
  124. keyFunc func(runtime.Object) (string, error)
  125. // triggerFunc is used for optimizing amount of watchers that needs to process
  126. // an incoming event.
  127. triggerFunc TriggerPublisherFunc
  128. // watchers is mapping from the value of trigger function that a
  129. // watcher is interested into the watchers
  130. watcherIdx int
  131. watchers indexedWatchers
  132. // Handling graceful termination.
  133. stopLock sync.RWMutex
  134. stopped bool
  135. stopCh chan struct{}
  136. stopWg sync.WaitGroup
  137. }
  138. // Create a new Cacher responsible from service WATCH and LIST requests from its
  139. // internal cache and updating its cache in the background based on the given
  140. // configuration.
  141. func NewCacherFromConfig(config CacherConfig) *Cacher {
  142. watchCache := newWatchCache(config.CacheCapacity)
  143. listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
  144. // Give this error when it is constructed rather than when you get the
  145. // first watch item, because it's much easier to track down that way.
  146. if obj, ok := config.Type.(runtime.Object); ok {
  147. if err := runtime.CheckCodec(config.Codec, obj); err != nil {
  148. panic("storage codec doesn't seem to match given type: " + err.Error())
  149. }
  150. }
  151. cacher := &Cacher{
  152. ready: newReady(),
  153. storage: config.Storage,
  154. objectType: reflect.TypeOf(config.Type),
  155. watchCache: watchCache,
  156. reflector: cache.NewReflector(listerWatcher, config.Type, watchCache, 0),
  157. versioner: config.Versioner,
  158. keyFunc: config.KeyFunc,
  159. triggerFunc: config.TriggerPublisherFunc,
  160. watcherIdx: 0,
  161. watchers: indexedWatchers{
  162. allWatchers: make(map[int]*cacheWatcher),
  163. valueWatchers: make(map[string]watchersMap),
  164. },
  165. // We need to (potentially) stop both:
  166. // - wait.Until go-routine
  167. // - reflector.ListAndWatch
  168. // and there are no guarantees on the order that they will stop.
  169. // So we will be simply closing the channel, and synchronizing on the WaitGroup.
  170. stopCh: make(chan struct{}),
  171. }
  172. watchCache.SetOnEvent(cacher.processEvent)
  173. stopCh := cacher.stopCh
  174. cacher.stopWg.Add(1)
  175. go func() {
  176. defer cacher.stopWg.Done()
  177. wait.Until(
  178. func() {
  179. if !cacher.isStopped() {
  180. cacher.startCaching(stopCh)
  181. }
  182. }, time.Second, stopCh,
  183. )
  184. }()
  185. return cacher
  186. }
  187. func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
  188. // The 'usable' lock is always 'RLock'able when it is safe to use the cache.
  189. // It is safe to use the cache after a successful list until a disconnection.
  190. // We start with usable (write) locked. The below OnReplace function will
  191. // unlock it after a successful list. The below defer will then re-lock
  192. // it when this function exits (always due to disconnection), only if
  193. // we actually got a successful list. This cycle will repeat as needed.
  194. successfulList := false
  195. c.watchCache.SetOnReplace(func() {
  196. successfulList = true
  197. c.ready.set(true)
  198. })
  199. defer func() {
  200. if successfulList {
  201. c.ready.set(false)
  202. }
  203. }()
  204. c.terminateAllWatchers()
  205. // Note that since onReplace may be not called due to errors, we explicitly
  206. // need to retry it on errors under lock.
  207. // Also note that startCaching is called in a loop, so there's no need
  208. // to have another loop here.
  209. if err := c.reflector.ListAndWatch(stopChannel); err != nil {
  210. glog.Errorf("unexpected ListAndWatch error: %v", err)
  211. }
  212. }
  213. // Implements storage.Interface.
  214. func (c *Cacher) Versioner() Versioner {
  215. return c.storage.Versioner()
  216. }
  217. // Implements storage.Interface.
  218. func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
  219. return c.storage.Create(ctx, key, obj, out, ttl)
  220. }
  221. // Implements storage.Interface.
  222. func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions) error {
  223. return c.storage.Delete(ctx, key, out, preconditions)
  224. }
  225. // Implements storage.Interface.
  226. func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
  227. watchRV, err := ParseWatchResourceVersion(resourceVersion)
  228. if err != nil {
  229. return nil, err
  230. }
  231. c.ready.wait()
  232. // We explicitly use thread unsafe version and do locking ourself to ensure that
  233. // no new events will be processed in the meantime. The watchCache will be unlocked
  234. // on return from this function.
  235. // Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
  236. // underlying watchCache is calling processEvent under its lock.
  237. c.watchCache.RLock()
  238. defer c.watchCache.RUnlock()
  239. initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
  240. if err != nil {
  241. // To match the uncached watch implementation, once we have passed authn/authz/admission,
  242. // and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
  243. // rather than a directly returned error.
  244. return newErrWatcher(err), nil
  245. }
  246. triggerValue, triggerSupported := "", false
  247. // TODO: Currently we assume that in a given Cacher object, any <filter> that is
  248. // passed here is aware of exactly the same trigger (at most one).
  249. // Thus, either 0 or 1 values will be returned.
  250. if matchValues := filter.Trigger(); len(matchValues) > 0 {
  251. triggerValue, triggerSupported = matchValues[0].Value, true
  252. }
  253. c.Lock()
  254. defer c.Unlock()
  255. forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
  256. watcher := newCacheWatcher(watchRV, initEvents, filterFunction(key, c.keyFunc, filter), forget)
  257. c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
  258. c.watcherIdx++
  259. return watcher, nil
  260. }
  261. // Implements storage.Interface.
  262. func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter Filter) (watch.Interface, error) {
  263. return c.Watch(ctx, key, resourceVersion, filter)
  264. }
  265. // Implements storage.Interface.
  266. func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
  267. return c.storage.Get(ctx, key, objPtr, ignoreNotFound)
  268. }
  269. // Implements storage.Interface.
  270. func (c *Cacher) GetToList(ctx context.Context, key string, filter Filter, listObj runtime.Object) error {
  271. return c.storage.GetToList(ctx, key, filter, listObj)
  272. }
  273. // Implements storage.Interface.
  274. func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter Filter, listObj runtime.Object) error {
  275. if resourceVersion == "" {
  276. // If resourceVersion is not specified, serve it from underlying
  277. // storage (for backward compatibility).
  278. return c.storage.List(ctx, key, resourceVersion, filter, listObj)
  279. }
  280. // If resourceVersion is specified, serve it from cache.
  281. // It's guaranteed that the returned value is at least that
  282. // fresh as the given resourceVersion.
  283. listRV, err := ParseListResourceVersion(resourceVersion)
  284. if err != nil {
  285. return err
  286. }
  287. c.ready.wait()
  288. // List elements from cache, with at least 'listRV'.
  289. listPtr, err := meta.GetItemsPtr(listObj)
  290. if err != nil {
  291. return err
  292. }
  293. listVal, err := conversion.EnforcePtr(listPtr)
  294. if err != nil || listVal.Kind() != reflect.Slice {
  295. return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
  296. }
  297. filterFunc := filterFunction(key, c.keyFunc, filter)
  298. objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV)
  299. if err != nil {
  300. return fmt.Errorf("failed to wait for fresh list: %v", err)
  301. }
  302. for _, obj := range objs {
  303. object, ok := obj.(runtime.Object)
  304. if !ok {
  305. return fmt.Errorf("non runtime.Object returned from storage: %v", obj)
  306. }
  307. if filterFunc.Filter(object) {
  308. listVal.Set(reflect.Append(listVal, reflect.ValueOf(object).Elem()))
  309. }
  310. }
  311. if c.versioner != nil {
  312. if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
  313. return err
  314. }
  315. }
  316. return nil
  317. }
  318. // Implements storage.Interface.
  319. func (c *Cacher) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *Preconditions, tryUpdate UpdateFunc) error {
  320. return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
  321. }
  322. func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
  323. // TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
  324. // is aware of exactly the same trigger (at most one). Thus calling:
  325. // c.triggerFunc(<some object>)
  326. // can return only 0 or 1 values.
  327. // That means, that triggerValues itself may return up to 2 different values.
  328. if c.triggerFunc == nil {
  329. return nil, false
  330. }
  331. result := make([]string, 0, 2)
  332. matchValues := c.triggerFunc(event.Object)
  333. if len(matchValues) > 0 {
  334. result = append(result, matchValues[0].Value)
  335. }
  336. if event.PrevObject == nil {
  337. return result, len(result) > 0
  338. }
  339. prevMatchValues := c.triggerFunc(event.PrevObject)
  340. if len(prevMatchValues) > 0 {
  341. if len(result) == 0 || result[0] != prevMatchValues[0].Value {
  342. result = append(result, prevMatchValues[0].Value)
  343. }
  344. }
  345. return result, len(result) > 0
  346. }
  347. func (c *Cacher) processEvent(event watchCacheEvent) {
  348. triggerValues, supported := c.triggerValues(&event)
  349. c.Lock()
  350. defer c.Unlock()
  351. // Iterate over "allWatchers" no matter what the trigger function is.
  352. for _, watcher := range c.watchers.allWatchers {
  353. watcher.add(event)
  354. }
  355. if supported {
  356. // Iterate over watchers interested in the given values of the trigger.
  357. for _, triggerValue := range triggerValues {
  358. for _, watcher := range c.watchers.valueWatchers[triggerValue] {
  359. watcher.add(event)
  360. }
  361. }
  362. } else {
  363. // supported equal to false generally means that trigger function
  364. // is not defined (or not aware of any indexes). In this case,
  365. // watchers filters should generally also don't generate any
  366. // trigger values, but can cause problems in case of some
  367. // misconfiguration. Thus we paranoidly leave this branch.
  368. // Iterate over watchers interested in exact values for all values.
  369. for _, watchers := range c.watchers.valueWatchers {
  370. for _, watcher := range watchers {
  371. watcher.add(event)
  372. }
  373. }
  374. }
  375. }
  376. func (c *Cacher) terminateAllWatchers() {
  377. glog.Warningf("Terminating all watchers from cacher %v", c.objectType)
  378. c.Lock()
  379. defer c.Unlock()
  380. c.watchers.terminateAll()
  381. }
  382. func (c *Cacher) isStopped() bool {
  383. c.stopLock.RLock()
  384. defer c.stopLock.RUnlock()
  385. return c.stopped
  386. }
  387. func (c *Cacher) Stop() {
  388. c.stopLock.Lock()
  389. c.stopped = true
  390. c.stopLock.Unlock()
  391. close(c.stopCh)
  392. c.stopWg.Wait()
  393. }
  394. func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
  395. return func(lock bool) {
  396. if lock {
  397. c.Lock()
  398. defer c.Unlock()
  399. }
  400. // It's possible that the watcher is already not in the structure (e.g. in case of
  401. // simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
  402. c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
  403. }
  404. }
  405. func filterFunction(key string, keyFunc func(runtime.Object) (string, error), filter Filter) Filter {
  406. filterFunc := func(obj runtime.Object) bool {
  407. objKey, err := keyFunc(obj)
  408. if err != nil {
  409. glog.Errorf("invalid object for filter: %v", obj)
  410. return false
  411. }
  412. if !hasPathPrefix(objKey, key) {
  413. return false
  414. }
  415. return filter.Filter(obj)
  416. }
  417. return NewSimpleFilter(filterFunc, filter.Trigger)
  418. }
  419. // Returns resource version to which the underlying cache is synced.
  420. func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
  421. c.ready.wait()
  422. resourceVersion := c.reflector.LastSyncResourceVersion()
  423. if resourceVersion == "" {
  424. return 0, nil
  425. }
  426. return strconv.ParseUint(resourceVersion, 10, 64)
  427. }
  428. // cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
  429. type cacherListerWatcher struct {
  430. storage Interface
  431. resourcePrefix string
  432. newListFunc func() runtime.Object
  433. }
  434. func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
  435. return &cacherListerWatcher{
  436. storage: storage,
  437. resourcePrefix: resourcePrefix,
  438. newListFunc: newListFunc,
  439. }
  440. }
  441. // Implements cache.ListerWatcher interface.
  442. func (lw *cacherListerWatcher) List(options api.ListOptions) (runtime.Object, error) {
  443. list := lw.newListFunc()
  444. if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
  445. return nil, err
  446. }
  447. return list, nil
  448. }
  449. // Implements cache.ListerWatcher interface.
  450. func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface, error) {
  451. return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
  452. }
  453. // cacherWatch implements watch.Interface to return a single error
  454. type errWatcher struct {
  455. result chan watch.Event
  456. }
  457. func newErrWatcher(err error) *errWatcher {
  458. // Create an error event
  459. errEvent := watch.Event{Type: watch.Error}
  460. switch err := err.(type) {
  461. case runtime.Object:
  462. errEvent.Object = err
  463. case *errors.StatusError:
  464. errEvent.Object = &err.ErrStatus
  465. default:
  466. errEvent.Object = &unversioned.Status{
  467. Status: unversioned.StatusFailure,
  468. Message: err.Error(),
  469. Reason: unversioned.StatusReasonInternalError,
  470. Code: http.StatusInternalServerError,
  471. }
  472. }
  473. // Create a watcher with room for a single event, populate it, and close the channel
  474. watcher := &errWatcher{result: make(chan watch.Event, 1)}
  475. watcher.result <- errEvent
  476. close(watcher.result)
  477. return watcher
  478. }
  479. // Implements watch.Interface.
  480. func (c *errWatcher) ResultChan() <-chan watch.Event {
  481. return c.result
  482. }
  483. // Implements watch.Interface.
  484. func (c *errWatcher) Stop() {
  485. // no-op
  486. }
  487. // cacherWatch implements watch.Interface
  488. type cacheWatcher struct {
  489. sync.Mutex
  490. input chan watchCacheEvent
  491. result chan watch.Event
  492. filter Filter
  493. stopped bool
  494. forget func(bool)
  495. }
  496. func newCacheWatcher(resourceVersion uint64, initEvents []watchCacheEvent, filter Filter, forget func(bool)) *cacheWatcher {
  497. watcher := &cacheWatcher{
  498. input: make(chan watchCacheEvent, 10),
  499. result: make(chan watch.Event, 10),
  500. filter: filter,
  501. stopped: false,
  502. forget: forget,
  503. }
  504. go watcher.process(initEvents, resourceVersion)
  505. return watcher
  506. }
  507. // Implements watch.Interface.
  508. func (c *cacheWatcher) ResultChan() <-chan watch.Event {
  509. return c.result
  510. }
  511. // Implements watch.Interface.
  512. func (c *cacheWatcher) Stop() {
  513. c.forget(true)
  514. c.stop()
  515. }
  516. func (c *cacheWatcher) stop() {
  517. c.Lock()
  518. defer c.Unlock()
  519. if !c.stopped {
  520. c.stopped = true
  521. close(c.input)
  522. }
  523. }
  524. var timerPool sync.Pool
  525. func (c *cacheWatcher) add(event watchCacheEvent) {
  526. // Try to send the event immediately, without blocking.
  527. select {
  528. case c.input <- event:
  529. return
  530. default:
  531. }
  532. // OK, block sending, but only for up to 5 seconds.
  533. // cacheWatcher.add is called very often, so arrange
  534. // to reuse timers instead of constantly allocating.
  535. const timeout = 5 * time.Second
  536. t, ok := timerPool.Get().(*time.Timer)
  537. if ok {
  538. t.Reset(timeout)
  539. } else {
  540. t = time.NewTimer(timeout)
  541. }
  542. defer timerPool.Put(t)
  543. select {
  544. case c.input <- event:
  545. stopped := t.Stop()
  546. if !stopped {
  547. // Consume triggered (but not yet received) timer event
  548. // so that future reuse does not get a spurious timeout.
  549. <-t.C
  550. }
  551. case <-t.C:
  552. // This means that we couldn't send event to that watcher.
  553. // Since we don't want to block on it infinitely,
  554. // we simply terminate it.
  555. c.forget(false)
  556. c.stop()
  557. }
  558. }
  559. func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {
  560. curObjPasses := event.Type != watch.Deleted && c.filter.Filter(event.Object)
  561. oldObjPasses := false
  562. if event.PrevObject != nil {
  563. oldObjPasses = c.filter.Filter(event.PrevObject)
  564. }
  565. if !curObjPasses && !oldObjPasses {
  566. // Watcher is not interested in that object.
  567. return
  568. }
  569. object, err := api.Scheme.Copy(event.Object)
  570. if err != nil {
  571. glog.Errorf("unexpected copy error: %v", err)
  572. return
  573. }
  574. switch {
  575. case curObjPasses && !oldObjPasses:
  576. c.result <- watch.Event{Type: watch.Added, Object: object}
  577. case curObjPasses && oldObjPasses:
  578. c.result <- watch.Event{Type: watch.Modified, Object: object}
  579. case !curObjPasses && oldObjPasses:
  580. c.result <- watch.Event{Type: watch.Deleted, Object: object}
  581. }
  582. }
  583. func (c *cacheWatcher) process(initEvents []watchCacheEvent, resourceVersion uint64) {
  584. defer utilruntime.HandleCrash()
  585. for _, event := range initEvents {
  586. c.sendWatchCacheEvent(event)
  587. }
  588. defer close(c.result)
  589. defer c.Stop()
  590. for {
  591. event, ok := <-c.input
  592. if !ok {
  593. return
  594. }
  595. // only send events newer than resourceVersion
  596. if event.ResourceVersion > resourceVersion {
  597. c.sendWatchCacheEvent(event)
  598. }
  599. }
  600. }
  // ready is a boolean gate that goroutines can block on until it is opened.
  type ready struct {
  	// ok is the current state of the gate; guarded by c.L.
  	ok bool
  	// c wakes up waiters whenever ok changes.
  	c *sync.Cond
  }

  // newReady returns a gate in the closed (not ready) state.
  func newReady() *ready {
  	return &ready{c: sync.NewCond(new(sync.Mutex))}
  }

  // wait blocks until the gate is open, returning at once if it already is.
  func (r *ready) wait() {
  	r.c.L.Lock()
  	defer r.c.L.Unlock()
  	for !r.ok {
  		r.c.Wait()
  	}
  }

  // set opens (true) or closes (false) the gate and wakes all waiters so they
  // can re-check its state.
  func (r *ready) set(ok bool) {
  	r.c.L.Lock()
  	r.ok = ok
  	r.c.Broadcast()
  	r.c.L.Unlock()
  }