etcd_watcher.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "fmt"
  16. "net/http"
  17. "sync"
  18. "sync/atomic"
  19. "time"
  20. "k8s.io/kubernetes/pkg/api/unversioned"
  21. "k8s.io/kubernetes/pkg/runtime"
  22. "k8s.io/kubernetes/pkg/storage"
  23. etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
  24. utilruntime "k8s.io/kubernetes/pkg/util/runtime"
  25. "k8s.io/kubernetes/pkg/watch"
  26. etcd "github.com/coreos/etcd/client"
  27. "github.com/golang/glog"
  28. "golang.org/x/net/context"
  29. )
  30. // Etcd watch event actions
  31. const (
  32. EtcdCreate = "create"
  33. EtcdGet = "get"
  34. EtcdSet = "set"
  35. EtcdCAS = "compareAndSwap"
  36. EtcdDelete = "delete"
  37. EtcdCAD = "compareAndDelete"
  38. EtcdExpire = "expire"
  39. )
  40. // HighWaterMark is a thread-safe object for tracking the maximum value seen
  41. // for some quantity.
  42. type HighWaterMark int64
  43. // Update returns true if and only if 'current' is the highest value ever seen.
  44. func (hwm *HighWaterMark) Update(current int64) bool {
  45. for {
  46. old := atomic.LoadInt64((*int64)(hwm))
  47. if current <= old {
  48. return false
  49. }
  50. if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
  51. return true
  52. }
  53. }
  54. }
// TransformFunc attempts to convert an object to another object for use with a watcher.
// The watcher applies it to each newly decoded object after the object's
// resource version has been set. If it returns a non-nil error, that object is
// reported and no event is emitted for it.
type TransformFunc func(runtime.Object) (runtime.Object, error)
  57. // includeFunc returns true if the given key should be considered part of a watch
  58. type includeFunc func(key string) bool
  59. // exceptKey is an includeFunc that returns false when the provided key matches the watched key
  60. func exceptKey(except string) includeFunc {
  61. return func(key string) bool {
  62. return key != except
  63. }
  64. }
// etcdWatcher converts a native etcd watch to a watch.Interface.
//
// Two goroutines cooperate. etcdWatch (meant to run as its own goroutine)
// talks to etcd and feeds etcdIncoming/etcdError. translate (started by
// newEtcdWatcher) decodes those responses and emits watch events on outgoing.
type etcdWatcher struct {
	// encoding decodes node values read from etcd into API objects.
	encoding runtime.Codec
	// Note that versioner is required for etcdWatcher to work correctly.
	// There is no public constructor of it, so be careful when manipulating
	// with it manually.
	versioner storage.Versioner
	// transform, when non-nil, is applied to every newly decoded object.
	transform TransformFunc

	list   bool // If we're doing a recursive watch, should be true.
	quorum bool // If we enable quorum, should be true (used for the initial-state Get).

	// include, when non-nil, rejects responses by etcd key before decoding.
	include includeFunc
	// filter decides which decoded objects are emitted; see sendModify for how
	// objects entering or leaving the filter are reported.
	filter storage.Filter

	// etcdIncoming carries responses from etcdWatch to translate; etcdWatch
	// closes it when it returns.
	etcdIncoming chan *etcd.Response
	// etcdError carries at most one error from etcdWatch to translate;
	// etcdWatch closes it when it returns.
	etcdError chan error

	// ctx and cancel are set by etcdWatch (under stopLock) once the etcd
	// watcher has been created; Stop calls cancel to abort the watch.
	ctx    context.Context
	cancel context.CancelFunc

	// NOTE(review): etcdCallEnded is not referenced anywhere in this file;
	// confirm whether it is still used before relying on it.
	etcdCallEnded chan struct{}

	// outgoing is the channel returned by ResultChan; translate closes it.
	outgoing chan watch.Event
	// userStop is closed (once) by Stop to make translate and emit give up.
	userStop chan struct{}
	// stopped records whether Stop has been called; guarded by stopLock.
	stopped bool
	// stopLock guards stopped and cancel, and serializes Stop with the
	// watcher initialization performed in etcdWatch.
	stopLock sync.Mutex
	// wg is used to avoid calls to etcd after Stop(), and to make sure
	// that the translate goroutine is not leaked.
	wg sync.WaitGroup
	// Injectable for testing. Send the event down the outgoing channel.
	emit func(watch.Event)
	// cache holds decoded objects keyed by etcd ModifiedIndex (see decodeObject).
	cache etcdCache
}
  93. // watchWaitDuration is the amount of time to wait for an error from watch.
  94. const watchWaitDuration = 100 * time.Millisecond
  95. // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes.
  96. // The versioner must be able to handle the objects that transform creates.
  97. func newEtcdWatcher(
  98. list bool, quorum bool, include includeFunc, filter storage.Filter,
  99. encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc,
  100. cache etcdCache) *etcdWatcher {
  101. w := &etcdWatcher{
  102. encoding: encoding,
  103. versioner: versioner,
  104. transform: transform,
  105. list: list,
  106. quorum: quorum,
  107. include: include,
  108. filter: filter,
  109. // Buffer this channel, so that the etcd client is not forced
  110. // to context switch with every object it gets, and so that a
  111. // long time spent decoding an object won't block the *next*
  112. // object. Basically, we see a lot of "401 window exceeded"
  113. // errors from etcd, and that's due to the client not streaming
  114. // results but rather getting them one at a time. So we really
  115. // want to never block the etcd client, if possible. The 100 is
  116. // mostly arbitrary--we know it goes as high as 50, though.
  117. // There's a V(2) log message that prints the length so we can
  118. // monitor how much of this buffer is actually used.
  119. etcdIncoming: make(chan *etcd.Response, 100),
  120. etcdError: make(chan error, 1),
  121. // Similarly to etcdIncomming, we don't want to force context
  122. // switch on every new incoming object.
  123. outgoing: make(chan watch.Event, 100),
  124. userStop: make(chan struct{}),
  125. stopped: false,
  126. wg: sync.WaitGroup{},
  127. cache: cache,
  128. ctx: nil,
  129. cancel: nil,
  130. }
  131. w.emit = func(e watch.Event) {
  132. // Give up on user stop, without this we leak a lot of goroutines in tests.
  133. select {
  134. case w.outgoing <- e:
  135. case <-w.userStop:
  136. }
  137. }
  138. // translate will call done. We need to Add() here because otherwise,
  139. // if Stop() gets called before translate gets started, there'd be a
  140. // problem.
  141. w.wg.Add(1)
  142. go w.translate()
  143. return w
  144. }
  145. // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
  146. // as a goroutine.
  147. func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key string, resourceVersion uint64) {
  148. defer utilruntime.HandleCrash()
  149. defer close(w.etcdError)
  150. defer close(w.etcdIncoming)
  151. // All calls to etcd are coming from this function - once it is finished
  152. // no other call to etcd should be generated by this watcher.
  153. done := func() {}
  154. // We need to be prepared, that Stop() can be called at any time.
  155. // It can potentially also be called, even before this function is called.
  156. // If that is the case, we simply skip all the code here.
  157. // See #18928 for more details.
  158. var watcher etcd.Watcher
  159. returned := func() bool {
  160. w.stopLock.Lock()
  161. defer w.stopLock.Unlock()
  162. if w.stopped {
  163. // Watcher has already been stopped - don't event initiate it here.
  164. return true
  165. }
  166. w.wg.Add(1)
  167. done = w.wg.Done
  168. // Perform initialization of watcher under lock - we want to avoid situation when
  169. // Stop() is called in the meantime (which in tests can cause etcd termination and
  170. // strange behavior here).
  171. if resourceVersion == 0 {
  172. latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
  173. if err != nil {
  174. w.etcdError <- err
  175. return true
  176. }
  177. resourceVersion = latest
  178. }
  179. opts := etcd.WatcherOptions{
  180. Recursive: w.list,
  181. AfterIndex: resourceVersion,
  182. }
  183. watcher = client.Watcher(key, &opts)
  184. w.ctx, w.cancel = context.WithCancel(ctx)
  185. return false
  186. }()
  187. defer done()
  188. if returned {
  189. return
  190. }
  191. for {
  192. resp, err := watcher.Next(w.ctx)
  193. if err != nil {
  194. w.etcdError <- err
  195. return
  196. }
  197. w.etcdIncoming <- resp
  198. }
  199. }
  200. // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
  201. func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
  202. opts := etcd.GetOptions{
  203. Recursive: recursive,
  204. Sort: false,
  205. Quorum: quorum,
  206. }
  207. resp, err := client.Get(ctx, key, &opts)
  208. if err != nil {
  209. if !etcdutil.IsEtcdNotFound(err) {
  210. utilruntime.HandleError(fmt.Errorf("watch was unable to retrieve the current index for the provided key (%q): %v", key, err))
  211. return resourceVersion, toStorageErr(err, key, 0)
  212. }
  213. if etcdError, ok := err.(etcd.Error); ok {
  214. resourceVersion = etcdError.Index
  215. }
  216. return resourceVersion, nil
  217. }
  218. resourceVersion = resp.Index
  219. convertRecursiveResponse(resp.Node, resp, incoming)
  220. return
  221. }
  222. // convertRecursiveResponse turns a recursive get response from etcd into individual response objects
  223. // by copying the original response. This emulates the behavior of a recursive watch.
  224. func convertRecursiveResponse(node *etcd.Node, response *etcd.Response, incoming chan<- *etcd.Response) {
  225. if node.Dir {
  226. for i := range node.Nodes {
  227. convertRecursiveResponse(node.Nodes[i], response, incoming)
  228. }
  229. return
  230. }
  231. copied := *response
  232. copied.Action = "get"
  233. copied.Node = node
  234. incoming <- &copied
  235. }
  236. var (
  237. watchChannelHWM HighWaterMark
  238. )
  239. // translate pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
  240. // called as a goroutine.
  241. func (w *etcdWatcher) translate() {
  242. defer w.wg.Done()
  243. defer close(w.outgoing)
  244. defer utilruntime.HandleCrash()
  245. for {
  246. select {
  247. case err := <-w.etcdError:
  248. if err != nil {
  249. var status *unversioned.Status
  250. switch {
  251. case etcdutil.IsEtcdWatchExpired(err):
  252. status = &unversioned.Status{
  253. Status: unversioned.StatusFailure,
  254. Message: err.Error(),
  255. Code: http.StatusGone, // Gone
  256. Reason: unversioned.StatusReasonExpired,
  257. }
  258. // TODO: need to generate errors using api/errors which has a circular dependency on this package
  259. // no other way to inject errors
  260. // case etcdutil.IsEtcdUnreachable(err):
  261. // status = errors.NewServerTimeout(...)
  262. default:
  263. status = &unversioned.Status{
  264. Status: unversioned.StatusFailure,
  265. Message: err.Error(),
  266. Code: http.StatusInternalServerError,
  267. Reason: unversioned.StatusReasonInternalError,
  268. }
  269. }
  270. w.emit(watch.Event{
  271. Type: watch.Error,
  272. Object: status,
  273. })
  274. }
  275. return
  276. case <-w.userStop:
  277. return
  278. case res, ok := <-w.etcdIncoming:
  279. if ok {
  280. if curLen := int64(len(w.etcdIncoming)); watchChannelHWM.Update(curLen) {
  281. // Monitor if this gets backed up, and how much.
  282. glog.V(2).Infof("watch: %v objects queued in channel.", curLen)
  283. }
  284. w.sendResult(res)
  285. }
  286. // If !ok, don't return here-- must wait for etcdError channel
  287. // to give an error or be closed.
  288. }
  289. }
  290. }
  291. func (w *etcdWatcher) decodeObject(node *etcd.Node) (runtime.Object, error) {
  292. if obj, found := w.cache.getFromCache(node.ModifiedIndex, storage.Everything); found {
  293. return obj, nil
  294. }
  295. obj, err := runtime.Decode(w.encoding, []byte(node.Value))
  296. if err != nil {
  297. return nil, err
  298. }
  299. // ensure resource version is set on the object we load from etcd
  300. if err := w.versioner.UpdateObject(obj, node.ModifiedIndex); err != nil {
  301. utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", node.ModifiedIndex, obj, err))
  302. }
  303. // perform any necessary transformation
  304. if w.transform != nil {
  305. obj, err = w.transform(obj)
  306. if err != nil {
  307. utilruntime.HandleError(fmt.Errorf("failure to transform api object %#v: %v", obj, err))
  308. return nil, err
  309. }
  310. }
  311. if node.ModifiedIndex != 0 {
  312. w.cache.addToCache(node.ModifiedIndex, obj)
  313. }
  314. return obj, nil
  315. }
  316. func (w *etcdWatcher) sendAdd(res *etcd.Response) {
  317. if res.Node == nil {
  318. utilruntime.HandleError(fmt.Errorf("unexpected nil node: %#v", res))
  319. return
  320. }
  321. if w.include != nil && !w.include(res.Node.Key) {
  322. return
  323. }
  324. obj, err := w.decodeObject(res.Node)
  325. if err != nil {
  326. utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
  327. // TODO: expose an error through watch.Interface?
  328. // Ignore this value. If we stop the watch on a bad value, a client that uses
  329. // the resourceVersion to resume will never be able to get past a bad value.
  330. return
  331. }
  332. if !w.filter.Filter(obj) {
  333. return
  334. }
  335. action := watch.Added
  336. if res.Node.ModifiedIndex != res.Node.CreatedIndex {
  337. action = watch.Modified
  338. }
  339. w.emit(watch.Event{
  340. Type: action,
  341. Object: obj,
  342. })
  343. }
  344. func (w *etcdWatcher) sendModify(res *etcd.Response) {
  345. if res.Node == nil {
  346. glog.Errorf("unexpected nil node: %#v", res)
  347. return
  348. }
  349. if w.include != nil && !w.include(res.Node.Key) {
  350. return
  351. }
  352. curObj, err := w.decodeObject(res.Node)
  353. if err != nil {
  354. utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\n'%v' from %#v %#v", err, string(res.Node.Value), res, res.Node))
  355. // TODO: expose an error through watch.Interface?
  356. // Ignore this value. If we stop the watch on a bad value, a client that uses
  357. // the resourceVersion to resume will never be able to get past a bad value.
  358. return
  359. }
  360. curObjPasses := w.filter.Filter(curObj)
  361. oldObjPasses := false
  362. var oldObj runtime.Object
  363. if res.PrevNode != nil && res.PrevNode.Value != "" {
  364. // Ignore problems reading the old object.
  365. if oldObj, err = w.decodeObject(res.PrevNode); err == nil {
  366. if err := w.versioner.UpdateObject(oldObj, res.Node.ModifiedIndex); err != nil {
  367. utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", res.Node.ModifiedIndex, oldObj, err))
  368. }
  369. oldObjPasses = w.filter.Filter(oldObj)
  370. }
  371. }
  372. // Some changes to an object may cause it to start or stop matching a filter.
  373. // We need to report those as adds/deletes. So we have to check both the previous
  374. // and current value of the object.
  375. switch {
  376. case curObjPasses && oldObjPasses:
  377. w.emit(watch.Event{
  378. Type: watch.Modified,
  379. Object: curObj,
  380. })
  381. case curObjPasses && !oldObjPasses:
  382. w.emit(watch.Event{
  383. Type: watch.Added,
  384. Object: curObj,
  385. })
  386. case !curObjPasses && oldObjPasses:
  387. w.emit(watch.Event{
  388. Type: watch.Deleted,
  389. Object: oldObj,
  390. })
  391. }
  392. // Do nothing if neither new nor old object passed the filter.
  393. }
  394. func (w *etcdWatcher) sendDelete(res *etcd.Response) {
  395. if res.PrevNode == nil {
  396. utilruntime.HandleError(fmt.Errorf("unexpected nil prev node: %#v", res))
  397. return
  398. }
  399. if w.include != nil && !w.include(res.PrevNode.Key) {
  400. return
  401. }
  402. node := *res.PrevNode
  403. if res.Node != nil {
  404. // Note that this sends the *old* object with the etcd index for the time at
  405. // which it gets deleted. This will allow users to restart the watch at the right
  406. // index.
  407. node.ModifiedIndex = res.Node.ModifiedIndex
  408. }
  409. obj, err := w.decodeObject(&node)
  410. if err != nil {
  411. utilruntime.HandleError(fmt.Errorf("failure to decode api object: %v\nfrom %#v %#v", err, res, res.Node))
  412. // TODO: expose an error through watch.Interface?
  413. // Ignore this value. If we stop the watch on a bad value, a client that uses
  414. // the resourceVersion to resume will never be able to get past a bad value.
  415. return
  416. }
  417. if !w.filter.Filter(obj) {
  418. return
  419. }
  420. w.emit(watch.Event{
  421. Type: watch.Deleted,
  422. Object: obj,
  423. })
  424. }
  425. func (w *etcdWatcher) sendResult(res *etcd.Response) {
  426. switch res.Action {
  427. case EtcdCreate, EtcdGet:
  428. w.sendAdd(res)
  429. case EtcdSet, EtcdCAS:
  430. w.sendModify(res)
  431. case EtcdDelete, EtcdExpire, EtcdCAD:
  432. w.sendDelete(res)
  433. default:
  434. utilruntime.HandleError(fmt.Errorf("unknown action: %v", res.Action))
  435. }
  436. }
  437. // ResultChan implements watch.Interface.
  438. func (w *etcdWatcher) ResultChan() <-chan watch.Event {
  439. return w.outgoing
  440. }
  441. // Stop implements watch.Interface.
  442. func (w *etcdWatcher) Stop() {
  443. w.stopLock.Lock()
  444. if w.cancel != nil {
  445. w.cancel()
  446. w.cancel = nil
  447. }
  448. if !w.stopped {
  449. w.stopped = true
  450. close(w.userStop)
  451. }
  452. w.stopLock.Unlock()
  453. // Wait until all calls to etcd are finished and no other
  454. // will be issued.
  455. w.wg.Wait()
  456. }