etcd_helper.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "errors"
  16. "fmt"
  17. "path"
  18. "reflect"
  19. "strings"
  20. "time"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/api/meta"
  23. "k8s.io/kubernetes/pkg/conversion"
  24. "k8s.io/kubernetes/pkg/runtime"
  25. "k8s.io/kubernetes/pkg/storage"
  26. "k8s.io/kubernetes/pkg/storage/etcd/metrics"
  27. etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
  28. "k8s.io/kubernetes/pkg/util"
  29. utilcache "k8s.io/kubernetes/pkg/util/cache"
  30. "k8s.io/kubernetes/pkg/watch"
  31. etcd "github.com/coreos/etcd/client"
  32. "github.com/golang/glog"
  33. "golang.org/x/net/context"
  34. )
  35. // Creates a new storage interface from the client
  36. // TODO: deprecate in favor of storage.Config abstraction over time
  37. func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
  38. return &etcdHelper{
  39. etcdMembersAPI: etcd.NewMembersAPI(client),
  40. etcdKeysAPI: etcd.NewKeysAPI(client),
  41. codec: codec,
  42. versioner: APIObjectVersioner{},
  43. copier: api.Scheme,
  44. pathPrefix: path.Join("/", prefix),
  45. quorum: quorum,
  46. cache: utilcache.NewCache(cacheSize),
  47. }
  48. }
  49. // etcdHelper is the reference implementation of storage.Interface.
  50. type etcdHelper struct {
  51. etcdMembersAPI etcd.MembersAPI
  52. etcdKeysAPI etcd.KeysAPI
  53. codec runtime.Codec
  54. copier runtime.ObjectCopier
  55. // Note that versioner is required for etcdHelper to work correctly.
  56. // The public constructors (NewStorage & NewEtcdStorage) are setting it
  57. // correctly, so be careful when manipulating with it manually.
  58. // optional, has to be set to perform any atomic operations
  59. versioner storage.Versioner
  60. // prefix for all etcd keys
  61. pathPrefix string
  62. // if true, perform quorum read
  63. quorum bool
  64. // We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
  65. // to resourceVersion.
  66. // This depends on etcd's indexes being globally unique across all objects/types. This will
  67. // have to revisited if we decide to do things like multiple etcd clusters, or etcd will
  68. // support multi-object transaction that will result in many objects with the same index.
  69. // Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
  70. // TODO: Measure how much this cache helps after the conversion code is optimized.
  71. cache utilcache.Cache
  72. }
  73. func init() {
  74. metrics.Register()
  75. }
  76. // Implements storage.Interface.
  77. func (h *etcdHelper) Versioner() storage.Versioner {
  78. return h.versioner
  79. }
  80. // Implements storage.Interface.
  81. func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
  82. trace := util.NewTrace("etcdHelper::Create " + getTypeName(obj))
  83. defer trace.LogIfLong(250 * time.Millisecond)
  84. if ctx == nil {
  85. glog.Errorf("Context is nil")
  86. }
  87. key = h.prefixEtcdKey(key)
  88. data, err := runtime.Encode(h.codec, obj)
  89. trace.Step("Object encoded")
  90. if err != nil {
  91. return err
  92. }
  93. if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
  94. return errors.New("resourceVersion may not be set on objects to be created")
  95. }
  96. trace.Step("Version checked")
  97. startTime := time.Now()
  98. opts := etcd.SetOptions{
  99. TTL: time.Duration(ttl) * time.Second,
  100. PrevExist: etcd.PrevNoExist,
  101. }
  102. response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
  103. trace.Step("Object created")
  104. metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
  105. if err != nil {
  106. return toStorageErr(err, key, 0)
  107. }
  108. if out != nil {
  109. if _, err := conversion.EnforcePtr(out); err != nil {
  110. panic("unable to convert output object to pointer")
  111. }
  112. _, _, err = h.extractObj(response, err, out, false, false)
  113. }
  114. return err
  115. }
  116. func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
  117. if preconditions == nil {
  118. return nil
  119. }
  120. objMeta, err := api.ObjectMetaFor(out)
  121. if err != nil {
  122. return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
  123. }
  124. if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
  125. errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
  126. return storage.NewInvalidObjError(key, errMsg)
  127. }
  128. return nil
  129. }
  130. // Implements storage.Interface.
  131. func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
  132. if ctx == nil {
  133. glog.Errorf("Context is nil")
  134. }
  135. key = h.prefixEtcdKey(key)
  136. v, err := conversion.EnforcePtr(out)
  137. if err != nil {
  138. panic("unable to convert output object to pointer")
  139. }
  140. if preconditions == nil {
  141. startTime := time.Now()
  142. response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
  143. metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
  144. if !etcdutil.IsEtcdNotFound(err) {
  145. // if the object that existed prior to the delete is returned by etcd, update the out object.
  146. if err != nil || response.PrevNode != nil {
  147. _, _, err = h.extractObj(response, err, out, false, true)
  148. }
  149. }
  150. return toStorageErr(err, key, 0)
  151. }
  152. // Check the preconditions match.
  153. obj := reflect.New(v.Type()).Interface().(runtime.Object)
  154. for {
  155. _, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false)
  156. if err != nil {
  157. return toStorageErr(err, key, 0)
  158. }
  159. if err := checkPreconditions(key, preconditions, obj); err != nil {
  160. return toStorageErr(err, key, 0)
  161. }
  162. index := uint64(0)
  163. if node != nil {
  164. index = node.ModifiedIndex
  165. } else if res != nil {
  166. index = res.Index
  167. }
  168. opt := etcd.DeleteOptions{PrevIndex: index}
  169. startTime := time.Now()
  170. response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
  171. metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
  172. if etcdutil.IsEtcdTestFailed(err) {
  173. glog.Infof("deletion of %s failed because of a conflict, going to retry", key)
  174. } else {
  175. if !etcdutil.IsEtcdNotFound(err) {
  176. // if the object that existed prior to the delete is returned by etcd, update the out object.
  177. if err != nil || response.PrevNode != nil {
  178. _, _, err = h.extractObj(response, err, out, false, true)
  179. }
  180. }
  181. return toStorageErr(err, key, 0)
  182. }
  183. }
  184. }
  185. // Implements storage.Interface.
  186. func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
  187. if ctx == nil {
  188. glog.Errorf("Context is nil")
  189. }
  190. watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
  191. if err != nil {
  192. return nil, err
  193. }
  194. key = h.prefixEtcdKey(key)
  195. w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
  196. go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
  197. return w, nil
  198. }
  199. // Implements storage.Interface.
  200. func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
  201. if ctx == nil {
  202. glog.Errorf("Context is nil")
  203. }
  204. watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
  205. if err != nil {
  206. return nil, err
  207. }
  208. key = h.prefixEtcdKey(key)
  209. w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
  210. go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
  211. return w, nil
  212. }
  213. // Implements storage.Interface.
  214. func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
  215. if ctx == nil {
  216. glog.Errorf("Context is nil")
  217. }
  218. key = h.prefixEtcdKey(key)
  219. _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
  220. return err
  221. }
  222. // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
  223. // about the response, like the current etcd index and the ttl.
  224. func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
  225. if ctx == nil {
  226. glog.Errorf("Context is nil")
  227. }
  228. startTime := time.Now()
  229. opts := &etcd.GetOptions{
  230. Quorum: h.quorum,
  231. }
  232. response, err := h.etcdKeysAPI.Get(ctx, key, opts)
  233. metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
  234. if err != nil && !etcdutil.IsEtcdNotFound(err) {
  235. return "", nil, nil, toStorageErr(err, key, 0)
  236. }
  237. body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
  238. return body, node, response, toStorageErr(err, key, 0)
  239. }
  240. func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
  241. if response != nil {
  242. if prevNode {
  243. node = response.PrevNode
  244. } else {
  245. node = response.Node
  246. }
  247. }
  248. if inErr != nil || node == nil || len(node.Value) == 0 {
  249. if ignoreNotFound {
  250. v, err := conversion.EnforcePtr(objPtr)
  251. if err != nil {
  252. return "", nil, err
  253. }
  254. v.Set(reflect.Zero(v.Type()))
  255. return "", nil, nil
  256. } else if inErr != nil {
  257. return "", nil, inErr
  258. }
  259. return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
  260. }
  261. body = node.Value
  262. out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
  263. if err != nil {
  264. return body, nil, err
  265. }
  266. if out != objPtr {
  267. return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
  268. }
  269. // being unable to set the version does not prevent the object from being extracted
  270. _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
  271. return body, node, err
  272. }
  273. // Implements storage.Interface.
  274. func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
  275. if ctx == nil {
  276. glog.Errorf("Context is nil")
  277. }
  278. trace := util.NewTrace("GetToList " + getTypeName(listObj))
  279. listPtr, err := meta.GetItemsPtr(listObj)
  280. if err != nil {
  281. return err
  282. }
  283. key = h.prefixEtcdKey(key)
  284. startTime := time.Now()
  285. trace.Step("About to read etcd node")
  286. opts := &etcd.GetOptions{
  287. Quorum: h.quorum,
  288. }
  289. response, err := h.etcdKeysAPI.Get(ctx, key, opts)
  290. trace.Step("Etcd node read")
  291. metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
  292. if err != nil {
  293. if etcdutil.IsEtcdNotFound(err) {
  294. return nil
  295. }
  296. return toStorageErr(err, key, 0)
  297. }
  298. nodes := make([]*etcd.Node, 0)
  299. nodes = append(nodes, response.Node)
  300. if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
  301. return err
  302. }
  303. trace.Step("Object decoded")
  304. if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
  305. return err
  306. }
  307. return nil
  308. }
  309. // decodeNodeList walks the tree of each node in the list and decodes into the specified object
  310. func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, slicePtr interface{}) error {
  311. trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
  312. defer trace.LogIfLong(400 * time.Millisecond)
  313. v, err := conversion.EnforcePtr(slicePtr)
  314. if err != nil || v.Kind() != reflect.Slice {
  315. // This should not happen at runtime.
  316. panic("need ptr to slice")
  317. }
  318. for _, node := range nodes {
  319. if node.Dir {
  320. trace.Step("Decoding dir " + node.Key + " START")
  321. if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
  322. return err
  323. }
  324. trace.Step("Decoding dir " + node.Key + " END")
  325. continue
  326. }
  327. if obj, found := h.getFromCache(node.ModifiedIndex, filter); found {
  328. // obj != nil iff it matches the filter function.
  329. if obj != nil {
  330. v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
  331. }
  332. } else {
  333. obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
  334. if err != nil {
  335. return err
  336. }
  337. // being unable to set the version does not prevent the object from being extracted
  338. _ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
  339. if filter.Filter(obj) {
  340. v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
  341. }
  342. if node.ModifiedIndex != 0 {
  343. h.addToCache(node.ModifiedIndex, obj)
  344. }
  345. }
  346. }
  347. trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
  348. return nil
  349. }
  350. // Implements storage.Interface.
  351. func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
  352. if ctx == nil {
  353. glog.Errorf("Context is nil")
  354. }
  355. trace := util.NewTrace("List " + getTypeName(listObj))
  356. defer trace.LogIfLong(400 * time.Millisecond)
  357. listPtr, err := meta.GetItemsPtr(listObj)
  358. if err != nil {
  359. return err
  360. }
  361. key = h.prefixEtcdKey(key)
  362. startTime := time.Now()
  363. trace.Step("About to list etcd node")
  364. nodes, index, err := h.listEtcdNode(ctx, key)
  365. trace.Step("Etcd node listed")
  366. metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
  367. if err != nil {
  368. return err
  369. }
  370. if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
  371. return err
  372. }
  373. trace.Step("Node list decoded")
  374. if err := h.versioner.UpdateList(listObj, index); err != nil {
  375. return err
  376. }
  377. return nil
  378. }
  379. func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
  380. if ctx == nil {
  381. glog.Errorf("Context is nil")
  382. }
  383. opts := etcd.GetOptions{
  384. Recursive: true,
  385. Sort: true,
  386. Quorum: h.quorum,
  387. }
  388. result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
  389. if err != nil {
  390. var index uint64
  391. if etcdError, ok := err.(etcd.Error); ok {
  392. index = etcdError.Index
  393. }
  394. nodes := make([]*etcd.Node, 0)
  395. if etcdutil.IsEtcdNotFound(err) {
  396. return nodes, index, nil
  397. } else {
  398. return nodes, index, toStorageErr(err, key, 0)
  399. }
  400. }
  401. return result.Node.Nodes, result.Index, nil
  402. }
  403. // Implements storage.Interface.
  404. func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
  405. if ctx == nil {
  406. glog.Errorf("Context is nil")
  407. }
  408. v, err := conversion.EnforcePtr(ptrToType)
  409. if err != nil {
  410. // Panic is appropriate, because this is a programming error.
  411. panic("need ptr to type")
  412. }
  413. key = h.prefixEtcdKey(key)
  414. for {
  415. obj := reflect.New(v.Type()).Interface().(runtime.Object)
  416. origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
  417. if err != nil {
  418. return toStorageErr(err, key, 0)
  419. }
  420. if err := checkPreconditions(key, preconditions, obj); err != nil {
  421. return toStorageErr(err, key, 0)
  422. }
  423. meta := storage.ResponseMeta{}
  424. if node != nil {
  425. meta.TTL = node.TTL
  426. meta.ResourceVersion = node.ModifiedIndex
  427. }
  428. // Get the object to be written by calling tryUpdate.
  429. ret, newTTL, err := tryUpdate(obj, meta)
  430. if err != nil {
  431. return toStorageErr(err, key, 0)
  432. }
  433. index := uint64(0)
  434. ttl := uint64(0)
  435. if node != nil {
  436. index = node.ModifiedIndex
  437. if node.TTL != 0 {
  438. ttl = uint64(node.TTL)
  439. }
  440. if node.Expiration != nil && ttl == 0 {
  441. ttl = 1
  442. }
  443. } else if res != nil {
  444. index = res.Index
  445. }
  446. if newTTL != nil {
  447. if ttl != 0 && *newTTL == 0 {
  448. // TODO: remove this after we have verified this is no longer an issue
  449. glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
  450. }
  451. ttl = *newTTL
  452. }
  453. // Since update object may have a resourceVersion set, we need to clear it here.
  454. if err := h.versioner.UpdateObject(ret, 0); err != nil {
  455. return errors.New("resourceVersion cannot be set on objects store in etcd")
  456. }
  457. data, err := runtime.Encode(h.codec, ret)
  458. if err != nil {
  459. return err
  460. }
  461. // First time this key has been used, try creating new value.
  462. if index == 0 {
  463. startTime := time.Now()
  464. opts := etcd.SetOptions{
  465. TTL: time.Duration(ttl) * time.Second,
  466. PrevExist: etcd.PrevNoExist,
  467. }
  468. response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
  469. metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
  470. if etcdutil.IsEtcdNodeExist(err) {
  471. continue
  472. }
  473. _, _, err = h.extractObj(response, err, ptrToType, false, false)
  474. return toStorageErr(err, key, 0)
  475. }
  476. if string(data) == origBody {
  477. // If we don't send an update, we simply return the currently existing
  478. // version of the object.
  479. _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
  480. return err
  481. }
  482. startTime := time.Now()
  483. // Swap origBody with data, if origBody is the latest etcd data.
  484. opts := etcd.SetOptions{
  485. PrevValue: origBody,
  486. PrevIndex: index,
  487. TTL: time.Duration(ttl) * time.Second,
  488. }
  489. response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
  490. metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
  491. if etcdutil.IsEtcdTestFailed(err) {
  492. // Try again.
  493. continue
  494. }
  495. _, _, err = h.extractObj(response, err, ptrToType, false, false)
  496. return toStorageErr(err, key, int64(index))
  497. }
  498. }
  499. func (h *etcdHelper) prefixEtcdKey(key string) string {
  500. if strings.HasPrefix(key, h.pathPrefix) {
  501. return key
  502. }
  503. return path.Join(h.pathPrefix, key)
  504. }
  505. // etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
  506. // their Node.ModifiedIndex, which is unique across all types.
  507. // All implementations must be thread-safe.
  508. type etcdCache interface {
  509. getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool)
  510. addToCache(index uint64, obj runtime.Object)
  511. }
  // getTypeName returns the type of obj as rendered by reflect.Type.String,
  // e.g. "int" or "*api.Pod". It is used for trace names and metric labels.
  //
  // reflect.TypeOf yields a nil Type for a nil interface value; that case is
  // reported as "<nil>" (the same text fmt's %T verb prints) instead of
  // panicking on the nil Type.
  func getTypeName(obj interface{}) string {
  	t := reflect.TypeOf(obj)
  	if t == nil {
  		return "<nil>"
  	}
  	return t.String()
  }
  515. func (h *etcdHelper) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
  516. startTime := time.Now()
  517. defer func() {
  518. metrics.ObserveGetCache(startTime)
  519. }()
  520. obj, found := h.cache.Get(index)
  521. if found {
  522. if !filter.Filter(obj.(runtime.Object)) {
  523. return nil, true
  524. }
  525. // We should not return the object itself to avoid polluting the cache if someone
  526. // modifies returned values.
  527. objCopy, err := h.copier.Copy(obj.(runtime.Object))
  528. if err != nil {
  529. glog.Errorf("Error during DeepCopy of cached object: %q", err)
  530. // We can't return a copy, thus we report the object as not found.
  531. return nil, false
  532. }
  533. metrics.ObserveCacheHit()
  534. return objCopy.(runtime.Object), true
  535. }
  536. metrics.ObserveCacheMiss()
  537. return nil, false
  538. }
  539. func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
  540. startTime := time.Now()
  541. defer func() {
  542. metrics.ObserveAddCache(startTime)
  543. }()
  544. objCopy, err := h.copier.Copy(obj)
  545. if err != nil {
  546. glog.Errorf("Error during DeepCopy of cached object: %q", err)
  547. return
  548. }
  549. isOverwrite := h.cache.Add(index, objCopy)
  550. if !isOverwrite {
  551. metrics.ObserveNewEntry()
  552. }
  553. }
  554. func toStorageErr(err error, key string, rv int64) error {
  555. if err == nil {
  556. return nil
  557. }
  558. switch {
  559. case etcdutil.IsEtcdNotFound(err):
  560. return storage.NewKeyNotFoundError(key, rv)
  561. case etcdutil.IsEtcdNodeExist(err):
  562. return storage.NewKeyExistsError(key, rv)
  563. case etcdutil.IsEtcdTestFailed(err):
  564. return storage.NewResourceVersionConflictsError(key, rv)
  565. case etcdutil.IsEtcdUnreachable(err):
  566. return storage.NewUnreachableError(key, rv)
  567. default:
  568. return err
  569. }
  570. }