reflector.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "io"
  18. "math/rand"
  19. "net"
  20. "net/url"
  21. "reflect"
  22. "regexp"
  23. goruntime "runtime"
  24. "runtime/debug"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "syscall"
  29. "time"
  30. "github.com/golang/glog"
  31. "k8s.io/kubernetes/pkg/api"
  32. apierrs "k8s.io/kubernetes/pkg/api/errors"
  33. "k8s.io/kubernetes/pkg/api/meta"
  34. "k8s.io/kubernetes/pkg/runtime"
  35. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  36. "k8s.io/kubernetes/pkg/util/wait"
  37. "k8s.io/kubernetes/pkg/watch"
  38. )
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
// A Reflector calls List once at the start of ListAndWatch and then calls Watch
// repeatedly, each time passing the most recent resource version it has observed.
type ListerWatcher interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options api.ListOptions) (runtime.Object, error)
	// Watch should begin a watch at the specified version.
	Watch(options api.ListOptions) (watch.Interface, error)
}
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
	// name identifies this reflector. By default it will be a file:line if possible.
	// It is used as a prefix in log and error messages.
	name string
	// The type of object we expect to place in the store. When it is non-nil,
	// watch events carrying an object of a different type are reported and skipped.
	expectedType reflect.Type
	// The destination to sync up with the watch source
	store Store
	// listerWatcher is used to perform lists and watches.
	listerWatcher ListerWatcher
	// period controls timing between one watch ending and
	// the beginning of the next one.
	period time.Duration
	// resyncPeriod is the interval after which the store's Resync method is
	// invoked while watching. A value of zero disables periodic resync.
	resyncPeriod time.Duration
	// now() returns current time - exposed for testing purposes
	now func() time.Time
	// lastSyncResourceVersion is the resource version token last
	// observed when doing a sync with the underlying store.
	// Access is guarded by lastSyncResourceVersionMutex, but it is not
	// synchronized with the underlying store.
	lastSyncResourceVersion string
	// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
	lastSyncResourceVersionMutex sync.RWMutex
}
var (
	// We try to spread the load on apiserver by setting timeouts for
	// watch requests - each timeout is chosen at random between
	// minWatchTimeout and 2*minWatchTimeout.
	// However, it can be modified to avoid periodic resync to break the
	// TCP connection.
	minWatchTimeout = 5 * time.Minute
	// If we are within 'forceResyncThreshold' from the next planned resync
	// and are just before issuing Watch(), resync will be forced now.
	// NOTE(review): not referenced in the visible part of this file - confirm it is still used.
	forceResyncThreshold = 3 * time.Second
	// We try to set timeouts for Watch() so that we will finish about
	// 'timeoutThreshold' from the next planned periodic resync.
	// NOTE(review): not referenced in the visible part of this file - confirm it is still used.
	timeoutThreshold = 1 * time.Second
)
  83. // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
  84. // The indexer is configured to key on namespace
  85. func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
  86. indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
  87. reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
  88. return indexer, reflector
  89. }
  90. // NewReflector creates a new Reflector object which will keep the given store up to
  91. // date with the server's contents for the given resource. Reflector promises to
  92. // only put things in the store that have the type of expectedType, unless expectedType
  93. // is nil. If resyncPeriod is non-zero, then lists will be executed after every
  94. // resyncPeriod, so that you can use reflectors to periodically process everything as
  95. // well as incrementally processing the things that change.
  96. func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  97. return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
  98. }
  99. // NewNamedReflector same as NewReflector, but with a specified name for logging
  100. func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  101. r := &Reflector{
  102. name: name,
  103. listerWatcher: lw,
  104. store: store,
  105. expectedType: reflect.TypeOf(expectedType),
  106. period: time.Second,
  107. resyncPeriod: resyncPeriod,
  108. now: time.Now,
  109. }
  110. return r
  111. }
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors.
// Entries are matched as substrings of the caller's file path.
var internalPackages = []string{"kubernetes/pkg/client/cache/", "kubernetes/pkg/controller/framework/", "/runtime/asm_"}
  115. // getDefaultReflectorName walks back through the call stack until we find a caller from outside of the ignoredPackages
  116. // it returns back a shortpath/filename:line to aid in identification of this reflector when it starts logging
  117. func getDefaultReflectorName(ignoredPackages ...string) string {
  118. name := "????"
  119. const maxStack = 10
  120. for i := 1; i < maxStack; i++ {
  121. _, file, line, ok := goruntime.Caller(i)
  122. if !ok {
  123. file, line, ok = extractStackCreator()
  124. if !ok {
  125. break
  126. }
  127. i += maxStack
  128. }
  129. if hasPackage(file, ignoredPackages) {
  130. continue
  131. }
  132. file = trimPackagePrefix(file)
  133. name = fmt.Sprintf("%s:%d", file, line)
  134. break
  135. }
  136. return name
  137. }
  138. // hasPackage returns true if the file is in one of the ignored packages.
  139. func hasPackage(file string, ignoredPackages []string) bool {
  140. for _, ignoredPackage := range ignoredPackages {
  141. if strings.Contains(file, ignoredPackage) {
  142. return true
  143. }
  144. }
  145. return false
  146. }
  147. // trimPackagePrefix reduces duplicate values off the front of a package name.
  148. func trimPackagePrefix(file string) string {
  149. if l := strings.LastIndex(file, "k8s.io/kubernetes/pkg/"); l >= 0 {
  150. return file[l+len("k8s.io/kubernetes/"):]
  151. }
  152. if l := strings.LastIndex(file, "/src/"); l >= 0 {
  153. return file[l+5:]
  154. }
  155. if l := strings.LastIndex(file, "/pkg/"); l >= 0 {
  156. return file[l+1:]
  157. }
  158. return file
  159. }
  160. var stackCreator = regexp.MustCompile(`(?m)^created by (.*)\n\s+(.*):(\d+) \+0x[[:xdigit:]]+$`)
  161. // extractStackCreator retrieves the goroutine file and line that launched this stack. Returns false
  162. // if the creator cannot be located.
  163. // TODO: Go does not expose this via runtime https://github.com/golang/go/issues/11440
  164. func extractStackCreator() (string, int, bool) {
  165. stack := debug.Stack()
  166. matches := stackCreator.FindStringSubmatch(string(stack))
  167. if matches == nil || len(matches) != 4 {
  168. return "", 0, false
  169. }
  170. line, err := strconv.Atoi(matches[3])
  171. if err != nil {
  172. return "", 0, false
  173. }
  174. return matches[2], line, true
  175. }
  176. // Run starts a watch and handles watch events. Will restart the watch if it is closed.
  177. // Run starts a goroutine and returns immediately.
  178. func (r *Reflector) Run() {
  179. glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
  180. go wait.Until(func() {
  181. if err := r.ListAndWatch(wait.NeverStop); err != nil {
  182. utilruntime.HandleError(err)
  183. }
  184. }, r.period, wait.NeverStop)
  185. }
  186. // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
  187. // RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
  188. func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
  189. glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
  190. go wait.Until(func() {
  191. if err := r.ListAndWatch(stopCh); err != nil {
  192. utilruntime.HandleError(err)
  193. }
  194. }, r.period, stopCh)
  195. }
var (
	// nothing will ever be sent down this channel; resyncChan hands it out
	// when periodic resync is disabled (resyncPeriod == 0).
	neverExitWatch <-chan time.Time = make(chan time.Time)
	// Used to indicate that watching stopped so that a resync could happen.
	// NOTE(review): not referenced in the visible part of this file - confirm it is still used.
	errorResyncRequested = errors.New("resync channel fired")
	// Used to indicate that watching stopped because of a signal from the stop
	// channel passed in from a client of the reflector.
	errorStopRequested = errors.New("Stop requested")
)
  205. // resyncChan returns a channel which will receive something when a resync is
  206. // required, and a cleanup function.
  207. func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
  208. if r.resyncPeriod == 0 {
  209. return neverExitWatch, func() bool { return false }
  210. }
  211. // The cleanup function is required: imagine the scenario where watches
  212. // always fail so we end up listing frequently. Then, if we don't
  213. // manually stop the timer, we could end up with many timers active
  214. // concurrently.
  215. t := time.NewTimer(r.resyncPeriod)
  216. return t.C, t.Stop
  217. }
  218. // ListAndWatch first lists all items and get the resource version at the moment of call,
  219. // and then use the resource version to watch.
  220. // It returns error if ListAndWatch didn't even try to initialize watch.
  221. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  222. glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
  223. var resourceVersion string
  224. resyncCh, cleanup := r.resyncChan()
  225. defer cleanup()
  226. // Explicitly set "0" as resource version - it's fine for the List()
  227. // to be served from cache and potentially be delayed relative to
  228. // etcd contents. Reflector framework will catch up via Watch() eventually.
  229. options := api.ListOptions{ResourceVersion: "0"}
  230. list, err := r.listerWatcher.List(options)
  231. if err != nil {
  232. return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
  233. }
  234. listMetaInterface, err := meta.ListAccessor(list)
  235. if err != nil {
  236. return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
  237. }
  238. resourceVersion = listMetaInterface.GetResourceVersion()
  239. items, err := meta.ExtractList(list)
  240. if err != nil {
  241. return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
  242. }
  243. if err := r.syncWith(items, resourceVersion); err != nil {
  244. return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
  245. }
  246. r.setLastSyncResourceVersion(resourceVersion)
  247. resyncerrc := make(chan error, 1)
  248. go func() {
  249. for {
  250. select {
  251. case <-resyncCh:
  252. case <-stopCh:
  253. return
  254. }
  255. glog.V(4).Infof("%s: forcing resync", r.name)
  256. if err := r.store.Resync(); err != nil {
  257. resyncerrc <- err
  258. return
  259. }
  260. cleanup()
  261. resyncCh, cleanup = r.resyncChan()
  262. }
  263. }()
  264. for {
  265. timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
  266. options = api.ListOptions{
  267. ResourceVersion: resourceVersion,
  268. // We want to avoid situations of hanging watchers. Stop any wachers that do not
  269. // receive any events within the timeout window.
  270. TimeoutSeconds: &timemoutseconds,
  271. }
  272. w, err := r.listerWatcher.Watch(options)
  273. if err != nil {
  274. switch err {
  275. case io.EOF:
  276. // watch closed normally
  277. case io.ErrUnexpectedEOF:
  278. glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
  279. default:
  280. utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
  281. }
  282. // If this is "connection refused" error, it means that most likely apiserver is not responsive.
  283. // It doesn't make sense to re-list all objects because most likely we will be able to restart
  284. // watch where we ended.
  285. // If that's the case wait and resend watch request.
  286. if urlError, ok := err.(*url.Error); ok {
  287. if opError, ok := urlError.Err.(*net.OpError); ok {
  288. if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
  289. time.Sleep(time.Second)
  290. continue
  291. }
  292. }
  293. }
  294. return nil
  295. }
  296. if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
  297. if err != errorStopRequested {
  298. glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
  299. }
  300. return nil
  301. }
  302. }
  303. }
  304. // syncWith replaces the store's items with the given list.
  305. func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  306. found := make([]interface{}, 0, len(items))
  307. for _, item := range items {
  308. found = append(found, item)
  309. }
  310. return r.store.Replace(found, resourceVersion)
  311. }
  312. // watchHandler watches w and keeps *resourceVersion up to date.
  313. func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  314. start := time.Now()
  315. eventCount := 0
  316. // Stopping the watcher should be idempotent and if we return from this function there's no way
  317. // we're coming back in with the same watch interface.
  318. defer w.Stop()
  319. loop:
  320. for {
  321. select {
  322. case <-stopCh:
  323. return errorStopRequested
  324. case err := <-errc:
  325. return err
  326. case event, ok := <-w.ResultChan():
  327. if !ok {
  328. break loop
  329. }
  330. if event.Type == watch.Error {
  331. return apierrs.FromObject(event.Object)
  332. }
  333. if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
  334. utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
  335. continue
  336. }
  337. meta, err := meta.Accessor(event.Object)
  338. if err != nil {
  339. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  340. continue
  341. }
  342. newResourceVersion := meta.GetResourceVersion()
  343. switch event.Type {
  344. case watch.Added:
  345. r.store.Add(event.Object)
  346. case watch.Modified:
  347. r.store.Update(event.Object)
  348. case watch.Deleted:
  349. // TODO: Will any consumers need access to the "last known
  350. // state", which is passed in event.Object? If so, may need
  351. // to change this.
  352. r.store.Delete(event.Object)
  353. default:
  354. utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
  355. }
  356. *resourceVersion = newResourceVersion
  357. r.setLastSyncResourceVersion(newResourceVersion)
  358. eventCount++
  359. }
  360. }
  361. watchDuration := time.Now().Sub(start)
  362. if watchDuration < 1*time.Second && eventCount == 0 {
  363. glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
  364. return errors.New("very short watch")
  365. }
  366. glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
  367. return nil
  368. }
  369. // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
  370. // The value returned is not synchronized with access to the underlying store and is not thread-safe
  371. func (r *Reflector) LastSyncResourceVersion() string {
  372. r.lastSyncResourceVersionMutex.RLock()
  373. defer r.lastSyncResourceVersionMutex.RUnlock()
  374. return r.lastSyncResourceVersion
  375. }
  376. func (r *Reflector) setLastSyncResourceVersion(v string) {
  377. r.lastSyncResourceVersionMutex.Lock()
  378. defer r.lastSyncResourceVersionMutex.Unlock()
  379. r.lastSyncResourceVersion = v
  380. }