shared_informer.go

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/clock"

	"github.com/golang/glog"
)

// SharedInformer has a shared data cache and is capable of distributing notifications for changes
// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
// one behavior change compared to a standard Informer. When you receive a notification, the cache
// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
// on the contents of the cache exactly matching the notification you've received in handler
// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
// has advantages over the broadcaster since it allows us to share a common cache across many
// controllers. Extending the broadcaster would have required us to keep duplicate caches for each
// watch. (An illustrative usage sketch follows the SharedIndexInformer interface below.)
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period. Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)
	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
	// specified resync period. Events to a single handler are delivered sequentially, but there is
	// no coordination between different handlers.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// GetStore returns the Store.
	GetStore() Store
	// GetController gives back a synthetic interface that "votes" to start the informer.
	GetController() Controller
	// Run starts the shared informer, which will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has synced.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

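// exampleSharedInformerUsage is an illustrative sketch, not part of the package's
// API: it shows how a caller typically wires these interfaces together -- build a
// SharedIndexInformer, register a handler, start it, and wait for the initial sync
// before reading the store. The lw, objType, and stopCh parameters are assumed to
// be supplied by the caller; the 30-second default resync is an arbitrary example value.
func exampleSharedInformerUsage(lw ListerWatcher, objType runtime.Object, stopCh <-chan struct{}) {
	informer := NewSharedIndexInformer(lw, objType, 30*time.Second, Indexers{})

	// Handlers added before Run receive the live event stream from the start;
	// handlers added later get synthetic adds for the current store contents
	// (see AddEventHandlerWithResyncPeriod below).
	informer.AddEventHandler(ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { /* handle add */ },
		UpdateFunc: func(oldObj, newObj interface{}) { /* handle update */ },
		DeleteFunc: func(obj interface{}) { /* handle delete */ },
	})

	go informer.Run(stopCh)

	// Per the SharedInformer contract above, the cache may be fresher than any
	// individual notification, so wait for the initial sync rather than assuming
	// the store matches what a handler has already seen.
	if !WaitForCacheSync(stopCh, informer.HasSynced) {
		return
	}
	_ = informer.GetStore().List()
}
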
// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool

// syncedPollPeriod controls how often you look at the status of your sync funcs
const syncedPollPeriod = 100 * time.Millisecond

// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shut down.
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		glog.V(2).Infof("stop requested")
		return false
	}

	glog.V(4).Infof("caches populated")
	return true
}

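// exampleWaitForAllCaches is an illustrative sketch, not part of the package's API:
// a controller that depends on several shared informers typically collects each
// informer's HasSynced and passes them to WaitForCacheSync in one call, aborting
// its workers if the stop channel closes first.
func exampleWaitForAllCaches(stopCh <-chan struct{}, informers ...SharedInformer) bool {
	synced := make([]InformerSynced, 0, len(informers))
	for _, informer := range informers {
		synced = append(synced, informer.HasSynced)
	}
	return WaitForCacheSync(stopCh, synced...)
}
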
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector CacheMutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started     bool
	startedLock sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
	// stopCh is the channel used to stop the main Run process. We have to track it so that
	// late joiners can have a proper stop
	stopCh <-chan struct{}
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

func (v *dummyController) Run(stopCh <-chan struct{}) {
}

func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	s.stopCh = stopCh
	s.cacheMutationDetector.Run(stopCh)
	s.processor.run(stopCh)
	s.controller.Run(stopCh)
}

func (s *sharedIndexInformer) isStarted() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()
	return s.started
}

func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return ""
	}
	return s.controller.LastSyncResourceVersion()
}

func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}

func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired
	}
	if check == 0 {
		glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	}
	if desired < check {
		glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	}
	return desired
}

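// Illustrative values for determineResyncPeriod, assuming a 30s check period:
//
//	determineResyncPeriod(0, 30*time.Second)              // 0: resync disabled for this handler
//	determineResyncPeriod(10*time.Second, 30*time.Second) // 30s: raised to the check period
//	determineResyncPeriod(5*time.Minute, 30*time.Second)  // 5m: kept as requested
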
const minimumResyncPeriod = 1 * time.Second

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			glog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				glog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)

	go listener.run(s.stopCh)
	go listener.pop(s.stopCh)

	items := s.indexer.List()
	for i := range items {
		listener.add(addNotification{newObj: items[i]})
	}
}

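// exampleLateHandler is an illustrative sketch, not part of the package's API: a
// handler registered after the informer has started first receives synthetic add
// notifications for every object already in the store (per the join steps above),
// and only then follows the live event stream.
func exampleLateHandler(informer SharedIndexInformer) {
	informer.AddEventHandlerWithResyncPeriod(ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// Invoked once per object already cached (synthetic add), then for real adds.
		},
	}, 0) // a zero resyncPeriod disables periodic resync for this handler
}
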
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

type sharedProcessor struct {
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
}

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		go listener.run(stopCh)
		go listener.pop(stopCh)
	}
}

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

type processorListener struct {
	// lock/cond protects access to 'pendingNotifications'.
	lock sync.RWMutex
	cond sync.Cond

	// pendingNotifications is an unbounded slice that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications []interface{}

	nextCh chan interface{}

	handler ResourceEventHandler

	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
	requestedResyncPeriod time.Duration
	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
	// informer's overall resync check period.
	resyncPeriod time.Duration
	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time
	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
	ret := &processorListener{
		pendingNotifications:  []interface{}{},
		nextCh:                make(chan interface{}),
		handler:               handler,
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.cond.L = &ret.lock

	ret.determineNextResync(now)

	return ret
}

func (p *processorListener) add(notification interface{}) {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.pendingNotifications = append(p.pendingNotifications, notification)
	p.cond.Broadcast()
}

func (p *processorListener) pop(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for {
		blockingGet := func() (interface{}, bool) {
			p.lock.Lock()
			defer p.lock.Unlock()

			for len(p.pendingNotifications) == 0 {
				// check if we're shut down
				select {
				case <-stopCh:
					return nil, true
				default:
				}
				p.cond.Wait()
			}

			nt := p.pendingNotifications[0]
			p.pendingNotifications = p.pendingNotifications[1:]
			return nt, false
		}

		notification, stopped := blockingGet()
		if stopped {
			return
		}

		select {
		case <-stopCh:
			return
		case p.nextCh <- notification:
		}
	}
}

func (p *processorListener) run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	for {
		var next interface{}
		select {
		case <-stopCh:
			func() {
				p.lock.Lock()
				defer p.lock.Unlock()
				p.cond.Broadcast()
			}()
			return
		case next = <-p.nextCh:
		}

		switch notification := next.(type) {
		case updateNotification:
			p.handler.OnUpdate(notification.oldObj, notification.newObj)
		case addNotification:
			p.handler.OnAdd(notification.newObj)
		case deleteNotification:
			p.handler.OnDelete(notification.oldObj)
		default:
			utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
		}
	}
}

// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}