123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package etcd
- import (
- "errors"
- "fmt"
- "path"
- "reflect"
- "strings"
- "time"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/meta"
- "k8s.io/kubernetes/pkg/conversion"
- "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/storage"
- "k8s.io/kubernetes/pkg/storage/etcd/metrics"
- etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
- "k8s.io/kubernetes/pkg/util"
- utilcache "k8s.io/kubernetes/pkg/util/cache"
- "k8s.io/kubernetes/pkg/watch"
- etcd "github.com/coreos/etcd/client"
- "github.com/golang/glog"
- "golang.org/x/net/context"
- )
- // Creates a new storage interface from the client
- // TODO: deprecate in favor of storage.Config abstraction over time
- func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool, cacheSize int) storage.Interface {
- return &etcdHelper{
- etcdMembersAPI: etcd.NewMembersAPI(client),
- etcdKeysAPI: etcd.NewKeysAPI(client),
- codec: codec,
- versioner: APIObjectVersioner{},
- copier: api.Scheme,
- pathPrefix: path.Join("/", prefix),
- quorum: quorum,
- cache: utilcache.NewCache(cacheSize),
- }
- }
// etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct {
	etcdMembersAPI etcd.MembersAPI
	etcdKeysAPI    etcd.KeysAPI
	codec          runtime.Codec
	// copier is used to deep-copy objects going into and coming out of the cache.
	copier runtime.ObjectCopier
	// Note that versioner is required for etcdHelper to work correctly: the
	// methods of this type call it without a nil check.
	// The public constructors (e.g. NewEtcdStorage) set it correctly, so be
	// careful when manipulating it manually.
	versioner storage.Versioner
	// prefix for all etcd keys
	pathPrefix string
	// if true, perform quorum read
	quorum bool
	// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
	// to resourceVersion.
	// This depends on etcd's indexes being globally unique across all objects/types. This will
	// have to be revisited if we decide to do things like multiple etcd clusters, or if etcd
	// starts supporting multi-object transactions that result in many objects with the same index.
	// The number of entries stored in the cache is fixed when the cache is constructed
	// (NewEtcdStorage passes its cacheSize argument).
	// TODO: Measure how much this cache helps after the conversion code is optimized.
	cache utilcache.Cache
}
// init registers this package's etcd metrics when the package is loaded.
func init() {
	metrics.Register()
}
// Versioner implements storage.Interface. It returns the storage.Versioner
// this helper was configured with.
func (h *etcdHelper) Versioner() storage.Versioner {
	return h.versioner
}
- // Implements storage.Interface.
- func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
- trace := util.NewTrace("etcdHelper::Create " + getTypeName(obj))
- defer trace.LogIfLong(250 * time.Millisecond)
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- key = h.prefixEtcdKey(key)
- data, err := runtime.Encode(h.codec, obj)
- trace.Step("Object encoded")
- if err != nil {
- return err
- }
- if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
- return errors.New("resourceVersion may not be set on objects to be created")
- }
- trace.Step("Version checked")
- startTime := time.Now()
- opts := etcd.SetOptions{
- TTL: time.Duration(ttl) * time.Second,
- PrevExist: etcd.PrevNoExist,
- }
- response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
- trace.Step("Object created")
- metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
- if err != nil {
- return toStorageErr(err, key, 0)
- }
- if out != nil {
- if _, err := conversion.EnforcePtr(out); err != nil {
- panic("unable to convert output object to pointer")
- }
- _, _, err = h.extractObj(response, err, out, false, false)
- }
- return err
- }
- func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
- if preconditions == nil {
- return nil
- }
- objMeta, err := api.ObjectMetaFor(out)
- if err != nil {
- return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
- }
- if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
- errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
- return storage.NewInvalidObjError(key, errMsg)
- }
- return nil
- }
- // Implements storage.Interface.
- func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- key = h.prefixEtcdKey(key)
- v, err := conversion.EnforcePtr(out)
- if err != nil {
- panic("unable to convert output object to pointer")
- }
- if preconditions == nil {
- startTime := time.Now()
- response, err := h.etcdKeysAPI.Delete(ctx, key, nil)
- metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
- if !etcdutil.IsEtcdNotFound(err) {
- // if the object that existed prior to the delete is returned by etcd, update the out object.
- if err != nil || response.PrevNode != nil {
- _, _, err = h.extractObj(response, err, out, false, true)
- }
- }
- return toStorageErr(err, key, 0)
- }
- // Check the preconditions match.
- obj := reflect.New(v.Type()).Interface().(runtime.Object)
- for {
- _, node, res, err := h.bodyAndExtractObj(ctx, key, obj, false)
- if err != nil {
- return toStorageErr(err, key, 0)
- }
- if err := checkPreconditions(key, preconditions, obj); err != nil {
- return toStorageErr(err, key, 0)
- }
- index := uint64(0)
- if node != nil {
- index = node.ModifiedIndex
- } else if res != nil {
- index = res.Index
- }
- opt := etcd.DeleteOptions{PrevIndex: index}
- startTime := time.Now()
- response, err := h.etcdKeysAPI.Delete(ctx, key, &opt)
- metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
- if etcdutil.IsEtcdTestFailed(err) {
- glog.Infof("deletion of %s failed because of a conflict, going to retry", key)
- } else {
- if !etcdutil.IsEtcdNotFound(err) {
- // if the object that existed prior to the delete is returned by etcd, update the out object.
- if err != nil || response.PrevNode != nil {
- _, _, err = h.extractObj(response, err, out, false, true)
- }
- }
- return toStorageErr(err, key, 0)
- }
- }
- }
- // Implements storage.Interface.
- func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
- if err != nil {
- return nil, err
- }
- key = h.prefixEtcdKey(key)
- w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
- go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
- return w, nil
- }
- // Implements storage.Interface.
- func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
- if err != nil {
- return nil, err
- }
- key = h.prefixEtcdKey(key)
- w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
- go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
- return w, nil
- }
- // Implements storage.Interface.
- func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- key = h.prefixEtcdKey(key)
- _, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
- return err
- }
- // bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
- // about the response, like the current etcd index and the ttl.
- func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- startTime := time.Now()
- opts := &etcd.GetOptions{
- Quorum: h.quorum,
- }
- response, err := h.etcdKeysAPI.Get(ctx, key, opts)
- metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
- if err != nil && !etcdutil.IsEtcdNotFound(err) {
- return "", nil, nil, toStorageErr(err, key, 0)
- }
- body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
- return body, node, response, toStorageErr(err, key, 0)
- }
- func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
- if response != nil {
- if prevNode {
- node = response.PrevNode
- } else {
- node = response.Node
- }
- }
- if inErr != nil || node == nil || len(node.Value) == 0 {
- if ignoreNotFound {
- v, err := conversion.EnforcePtr(objPtr)
- if err != nil {
- return "", nil, err
- }
- v.Set(reflect.Zero(v.Type()))
- return "", nil, nil
- } else if inErr != nil {
- return "", nil, inErr
- }
- return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
- }
- body = node.Value
- out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
- if err != nil {
- return body, nil, err
- }
- if out != objPtr {
- return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
- }
- // being unable to set the version does not prevent the object from being extracted
- _ = h.versioner.UpdateObject(objPtr, node.ModifiedIndex)
- return body, node, err
- }
- // Implements storage.Interface.
- func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- trace := util.NewTrace("GetToList " + getTypeName(listObj))
- listPtr, err := meta.GetItemsPtr(listObj)
- if err != nil {
- return err
- }
- key = h.prefixEtcdKey(key)
- startTime := time.Now()
- trace.Step("About to read etcd node")
- opts := &etcd.GetOptions{
- Quorum: h.quorum,
- }
- response, err := h.etcdKeysAPI.Get(ctx, key, opts)
- trace.Step("Etcd node read")
- metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
- if err != nil {
- if etcdutil.IsEtcdNotFound(err) {
- return nil
- }
- return toStorageErr(err, key, 0)
- }
- nodes := make([]*etcd.Node, 0)
- nodes = append(nodes, response.Node)
- if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
- return err
- }
- trace.Step("Object decoded")
- if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
- return err
- }
- return nil
- }
- // decodeNodeList walks the tree of each node in the list and decodes into the specified object
- func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.Filter, slicePtr interface{}) error {
- trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
- defer trace.LogIfLong(400 * time.Millisecond)
- v, err := conversion.EnforcePtr(slicePtr)
- if err != nil || v.Kind() != reflect.Slice {
- // This should not happen at runtime.
- panic("need ptr to slice")
- }
- for _, node := range nodes {
- if node.Dir {
- trace.Step("Decoding dir " + node.Key + " START")
- if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
- return err
- }
- trace.Step("Decoding dir " + node.Key + " END")
- continue
- }
- if obj, found := h.getFromCache(node.ModifiedIndex, filter); found {
- // obj != nil iff it matches the filter function.
- if obj != nil {
- v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
- }
- } else {
- obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
- if err != nil {
- return err
- }
- // being unable to set the version does not prevent the object from being extracted
- _ = h.versioner.UpdateObject(obj, node.ModifiedIndex)
- if filter.Filter(obj) {
- v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
- }
- if node.ModifiedIndex != 0 {
- h.addToCache(node.ModifiedIndex, obj)
- }
- }
- }
- trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
- return nil
- }
- // Implements storage.Interface.
- func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- trace := util.NewTrace("List " + getTypeName(listObj))
- defer trace.LogIfLong(400 * time.Millisecond)
- listPtr, err := meta.GetItemsPtr(listObj)
- if err != nil {
- return err
- }
- key = h.prefixEtcdKey(key)
- startTime := time.Now()
- trace.Step("About to list etcd node")
- nodes, index, err := h.listEtcdNode(ctx, key)
- trace.Step("Etcd node listed")
- metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
- if err != nil {
- return err
- }
- if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
- return err
- }
- trace.Step("Node list decoded")
- if err := h.versioner.UpdateList(listObj, index); err != nil {
- return err
- }
- return nil
- }
- func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- opts := etcd.GetOptions{
- Recursive: true,
- Sort: true,
- Quorum: h.quorum,
- }
- result, err := h.etcdKeysAPI.Get(ctx, key, &opts)
- if err != nil {
- var index uint64
- if etcdError, ok := err.(etcd.Error); ok {
- index = etcdError.Index
- }
- nodes := make([]*etcd.Node, 0)
- if etcdutil.IsEtcdNotFound(err) {
- return nodes, index, nil
- } else {
- return nodes, index, toStorageErr(err, key, 0)
- }
- }
- return result.Node.Nodes, result.Index, nil
- }
- // Implements storage.Interface.
- func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
- if ctx == nil {
- glog.Errorf("Context is nil")
- }
- v, err := conversion.EnforcePtr(ptrToType)
- if err != nil {
- // Panic is appropriate, because this is a programming error.
- panic("need ptr to type")
- }
- key = h.prefixEtcdKey(key)
- for {
- obj := reflect.New(v.Type()).Interface().(runtime.Object)
- origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
- if err != nil {
- return toStorageErr(err, key, 0)
- }
- if err := checkPreconditions(key, preconditions, obj); err != nil {
- return toStorageErr(err, key, 0)
- }
- meta := storage.ResponseMeta{}
- if node != nil {
- meta.TTL = node.TTL
- meta.ResourceVersion = node.ModifiedIndex
- }
- // Get the object to be written by calling tryUpdate.
- ret, newTTL, err := tryUpdate(obj, meta)
- if err != nil {
- return toStorageErr(err, key, 0)
- }
- index := uint64(0)
- ttl := uint64(0)
- if node != nil {
- index = node.ModifiedIndex
- if node.TTL != 0 {
- ttl = uint64(node.TTL)
- }
- if node.Expiration != nil && ttl == 0 {
- ttl = 1
- }
- } else if res != nil {
- index = res.Index
- }
- if newTTL != nil {
- if ttl != 0 && *newTTL == 0 {
- // TODO: remove this after we have verified this is no longer an issue
- glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
- }
- ttl = *newTTL
- }
- // Since update object may have a resourceVersion set, we need to clear it here.
- if err := h.versioner.UpdateObject(ret, 0); err != nil {
- return errors.New("resourceVersion cannot be set on objects store in etcd")
- }
- data, err := runtime.Encode(h.codec, ret)
- if err != nil {
- return err
- }
- // First time this key has been used, try creating new value.
- if index == 0 {
- startTime := time.Now()
- opts := etcd.SetOptions{
- TTL: time.Duration(ttl) * time.Second,
- PrevExist: etcd.PrevNoExist,
- }
- response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
- metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
- if etcdutil.IsEtcdNodeExist(err) {
- continue
- }
- _, _, err = h.extractObj(response, err, ptrToType, false, false)
- return toStorageErr(err, key, 0)
- }
- if string(data) == origBody {
- // If we don't send an update, we simply return the currently existing
- // version of the object.
- _, _, err := h.extractObj(res, nil, ptrToType, ignoreNotFound, false)
- return err
- }
- startTime := time.Now()
- // Swap origBody with data, if origBody is the latest etcd data.
- opts := etcd.SetOptions{
- PrevValue: origBody,
- PrevIndex: index,
- TTL: time.Duration(ttl) * time.Second,
- }
- response, err := h.etcdKeysAPI.Set(ctx, key, string(data), &opts)
- metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
- if etcdutil.IsEtcdTestFailed(err) {
- // Try again.
- continue
- }
- _, _, err = h.extractObj(response, err, ptrToType, false, false)
- return toStorageErr(err, key, int64(index))
- }
- }
- func (h *etcdHelper) prefixEtcdKey(key string) string {
- if strings.HasPrefix(key, h.pathPrefix) {
- return key
- }
- return path.Join(h.pathPrefix, key)
- }
// etcdCache defines the interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
	// getFromCache looks up the object stored under index. The boolean reports
	// whether an entry was found.
	getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool)
	// addToCache stores obj under index.
	addToCache(index uint64, obj runtime.Object)
}
// getTypeName returns the string form of obj's dynamic type (for example
// "*int" or "[]string"); it is used to label traces and latency metrics.
// A nil interface value has no dynamic type, so "<nil>" is returned for it
// instead of panicking on a nil reflect.Type.
func getTypeName(obj interface{}) string {
	t := reflect.TypeOf(obj)
	if t == nil {
		return "<nil>"
	}
	return t.String()
}
- func (h *etcdHelper) getFromCache(index uint64, filter storage.Filter) (runtime.Object, bool) {
- startTime := time.Now()
- defer func() {
- metrics.ObserveGetCache(startTime)
- }()
- obj, found := h.cache.Get(index)
- if found {
- if !filter.Filter(obj.(runtime.Object)) {
- return nil, true
- }
- // We should not return the object itself to avoid polluting the cache if someone
- // modifies returned values.
- objCopy, err := h.copier.Copy(obj.(runtime.Object))
- if err != nil {
- glog.Errorf("Error during DeepCopy of cached object: %q", err)
- // We can't return a copy, thus we report the object as not found.
- return nil, false
- }
- metrics.ObserveCacheHit()
- return objCopy.(runtime.Object), true
- }
- metrics.ObserveCacheMiss()
- return nil, false
- }
- func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
- startTime := time.Now()
- defer func() {
- metrics.ObserveAddCache(startTime)
- }()
- objCopy, err := h.copier.Copy(obj)
- if err != nil {
- glog.Errorf("Error during DeepCopy of cached object: %q", err)
- return
- }
- isOverwrite := h.cache.Add(index, objCopy)
- if !isOverwrite {
- metrics.ObserveNewEntry()
- }
- }
- func toStorageErr(err error, key string, rv int64) error {
- if err == nil {
- return nil
- }
- switch {
- case etcdutil.IsEtcdNotFound(err):
- return storage.NewKeyNotFoundError(key, rv)
- case etcdutil.IsEtcdNodeExist(err):
- return storage.NewKeyExistsError(key, rv)
- case etcdutil.IsEtcdTestFailed(err):
- return storage.NewResourceVersionConflictsError(key, rv)
- case etcdutil.IsEtcdUnreachable(err):
- return storage.NewUnreachableError(key, rv)
- default:
- return err
- }
- }
|