123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455 |
- /*
- Copyright 2016 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package etcd3
- import (
- "bytes"
- "errors"
- "fmt"
- "path"
- "reflect"
- "strings"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/meta"
- "k8s.io/kubernetes/pkg/conversion"
- "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/storage"
- "k8s.io/kubernetes/pkg/storage/etcd"
- "k8s.io/kubernetes/pkg/watch"
- "github.com/coreos/etcd/clientv3"
- "github.com/golang/glog"
- "golang.org/x/net/context"
- )
- type store struct {
- client *clientv3.Client
- codec runtime.Codec
- versioner storage.Versioner
- pathPrefix string
- watcher *watcher
- }
// elemForDecode pairs one raw stored value with the revision that decodeList
// stamps onto the object decoded from it.
type elemForDecode struct {
	data []byte // encoded object bytes as read from etcd
	rev  uint64 // ModRevision to set as the decoded object's resource version
}
- type objState struct {
- obj runtime.Object
- meta *storage.ResponseMeta
- rev int64
- data []byte
- }
- // New returns an etcd3 implementation of storage.Interface.
- func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
- return newStore(c, codec, prefix)
- }
- func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
- versioner := etcd.APIObjectVersioner{}
- return &store{
- client: c,
- versioner: versioner,
- codec: codec,
- pathPrefix: prefix,
- watcher: newWatcher(c, codec, versioner),
- }
- }
- // Versioner implements storage.Interface.Versioner.
- func (s *store) Versioner() storage.Versioner {
- return s.versioner
- }
- // Get implements storage.Interface.Get.
- func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
- key = keyWithPrefix(s.pathPrefix, key)
- getResp, err := s.client.KV.Get(ctx, key)
- if err != nil {
- return err
- }
- if len(getResp.Kvs) == 0 {
- if ignoreNotFound {
- return runtime.SetZeroValue(out)
- }
- return storage.NewKeyNotFoundError(key, 0)
- }
- kv := getResp.Kvs[0]
- return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
- }
- // Create implements storage.Interface.Create.
- func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
- if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
- return errors.New("resourceVersion should not be set on objects to be created")
- }
- data, err := runtime.Encode(s.codec, obj)
- if err != nil {
- return err
- }
- key = keyWithPrefix(s.pathPrefix, key)
- opts, err := s.ttlOpts(ctx, int64(ttl))
- if err != nil {
- return err
- }
- txnResp, err := s.client.KV.Txn(ctx).If(
- notFound(key),
- ).Then(
- clientv3.OpPut(key, string(data), opts...),
- ).Commit()
- if err != nil {
- return err
- }
- if !txnResp.Succeeded {
- return storage.NewKeyExistsError(key, 0)
- }
- if out != nil {
- putResp := txnResp.Responses[0].GetResponsePut()
- return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
- }
- return nil
- }
- // Delete implements storage.Interface.Delete.
- func (s *store) Delete(ctx context.Context, key string, out runtime.Object, precondtions *storage.Preconditions) error {
- v, err := conversion.EnforcePtr(out)
- if err != nil {
- panic("unable to convert output object to pointer")
- }
- key = keyWithPrefix(s.pathPrefix, key)
- if precondtions == nil {
- return s.unconditionalDelete(ctx, key, out)
- }
- return s.conditionalDelete(ctx, key, out, v, precondtions)
- }
- func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
- // We need to do get and delete in single transaction in order to
- // know the value and revision before deleting it.
- txnResp, err := s.client.KV.Txn(ctx).If().Then(
- clientv3.OpGet(key),
- clientv3.OpDelete(key),
- ).Commit()
- if err != nil {
- return err
- }
- getResp := txnResp.Responses[0].GetResponseRange()
- if len(getResp.Kvs) == 0 {
- return storage.NewKeyNotFoundError(key, 0)
- }
- kv := getResp.Kvs[0]
- return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
- }
- func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
- getResp, err := s.client.KV.Get(ctx, key)
- if err != nil {
- return err
- }
- for {
- origState, err := s.getState(getResp, key, v, false)
- if err != nil {
- return err
- }
- if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
- return err
- }
- txnResp, err := s.client.KV.Txn(ctx).If(
- clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
- ).Then(
- clientv3.OpDelete(key),
- ).Else(
- clientv3.OpGet(key),
- ).Commit()
- if err != nil {
- return err
- }
- if !txnResp.Succeeded {
- getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
- glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
- continue
- }
- return decode(s.codec, s.versioner, origState.data, out, origState.rev)
- }
- }
- // GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
- func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
- v, err := conversion.EnforcePtr(out)
- if err != nil {
- panic("unable to convert output object to pointer")
- }
- key = keyWithPrefix(s.pathPrefix, key)
- getResp, err := s.client.KV.Get(ctx, key)
- if err != nil {
- return err
- }
- for {
- origState, err := s.getState(getResp, key, v, ignoreNotFound)
- if err != nil {
- return err
- }
- if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
- return err
- }
- ret, ttl, err := s.updateState(origState, tryUpdate)
- if err != nil {
- return err
- }
- data, err := runtime.Encode(s.codec, ret)
- if err != nil {
- return err
- }
- if bytes.Equal(data, origState.data) {
- return decode(s.codec, s.versioner, origState.data, out, origState.rev)
- }
- opts, err := s.ttlOpts(ctx, int64(ttl))
- if err != nil {
- return err
- }
- txnResp, err := s.client.KV.Txn(ctx).If(
- clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
- ).Then(
- clientv3.OpPut(key, string(data), opts...),
- ).Else(
- clientv3.OpGet(key),
- ).Commit()
- if err != nil {
- return err
- }
- if !txnResp.Succeeded {
- getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
- glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
- continue
- }
- putResp := txnResp.Responses[0].GetResponsePut()
- return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
- }
- }
- // GetToList implements storage.Interface.GetToList.
- func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
- listPtr, err := meta.GetItemsPtr(listObj)
- if err != nil {
- return err
- }
- key = keyWithPrefix(s.pathPrefix, key)
- getResp, err := s.client.KV.Get(ctx, key)
- if err != nil {
- return err
- }
- if len(getResp.Kvs) == 0 {
- return nil
- }
- elems := []*elemForDecode{{
- data: getResp.Kvs[0].Value,
- rev: uint64(getResp.Kvs[0].ModRevision),
- }}
- if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
- return err
- }
- // update version with cluster level revision
- return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
- }
- // List implements storage.Interface.List.
- func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
- listPtr, err := meta.GetItemsPtr(listObj)
- if err != nil {
- return err
- }
- key = keyWithPrefix(s.pathPrefix, key)
- // We need to make sure the key ended with "/" so that we only get children "directories".
- // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
- // while with prefix "/a/" will return only "/a/b" which is the correct answer.
- if !strings.HasSuffix(key, "/") {
- key += "/"
- }
- getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix())
- if err != nil {
- return err
- }
- elems := make([]*elemForDecode, len(getResp.Kvs))
- for i, kv := range getResp.Kvs {
- elems[i] = &elemForDecode{
- data: kv.Value,
- rev: uint64(kv.ModRevision),
- }
- }
- if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
- return err
- }
- // update version with cluster level revision
- return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
- }
- // Watch implements storage.Interface.Watch.
- func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
- return s.watch(ctx, key, resourceVersion, filter, false)
- }
- // WatchList implements storage.Interface.WatchList.
- func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
- return s.watch(ctx, key, resourceVersion, filter, true)
- }
- func (s *store) watch(ctx context.Context, key string, rv string, filter storage.Filter, recursive bool) (watch.Interface, error) {
- rev, err := storage.ParseWatchResourceVersion(rv)
- if err != nil {
- return nil, err
- }
- key = keyWithPrefix(s.pathPrefix, key)
- return s.watcher.Watch(ctx, key, int64(rev), recursive, filter)
- }
- func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
- state := &objState{
- obj: reflect.New(v.Type()).Interface().(runtime.Object),
- meta: &storage.ResponseMeta{},
- }
- if len(getResp.Kvs) == 0 {
- if !ignoreNotFound {
- return nil, storage.NewKeyNotFoundError(key, 0)
- }
- if err := runtime.SetZeroValue(state.obj); err != nil {
- return nil, err
- }
- } else {
- state.rev = getResp.Kvs[0].ModRevision
- state.meta.ResourceVersion = uint64(state.rev)
- state.data = getResp.Kvs[0].Value
- if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
- return nil, err
- }
- }
- return state, nil
- }
- func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
- ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
- if err != nil {
- return nil, 0, err
- }
- version, err := s.versioner.ObjectResourceVersion(ret)
- if err != nil {
- return nil, 0, err
- }
- if version != 0 {
- // We cannot store object with resourceVersion in etcd. We need to reset it.
- if err := s.versioner.UpdateObject(ret, 0); err != nil {
- return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
- }
- }
- var ttl uint64
- if ttlPtr != nil {
- ttl = *ttlPtr
- }
- return ret, ttl, nil
- }
- // ttlOpts returns client options based on given ttl.
- // ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
- func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
- if ttl == 0 {
- return nil, nil
- }
- // TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
- // put keys within into same lease. We shall benchmark this and optimize the performance.
- lcr, err := s.client.Lease.Grant(ctx, ttl)
- if err != nil {
- return nil, err
- }
- return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
- }
// keyWithPrefix roots key under prefix. A key that already starts with prefix
// is returned unchanged. Any other key is combined with prefix via path.Join,
// which also cleans the result (collapsing duplicate slashes and dropping a
// trailing slash).
func keyWithPrefix(prefix, key string) string {
	if !strings.HasPrefix(key, prefix) {
		return path.Join(prefix, key)
	}
	return key
}
- // decode decodes value of bytes into object. It will also set the object resource version to rev.
- // On success, objPtr would be set to the object.
- func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
- if _, err := conversion.EnforcePtr(objPtr); err != nil {
- panic("unable to convert output object to pointer")
- }
- _, _, err := codec.Decode(value, nil, objPtr)
- if err != nil {
- return err
- }
- // being unable to set the version does not prevent the object from being extracted
- versioner.UpdateObject(objPtr, uint64(rev))
- return nil
- }
- // decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
- // On success, ListPtr would be set to the list of objects.
- func decodeList(elems []*elemForDecode, filter storage.Filter, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
- v, err := conversion.EnforcePtr(ListPtr)
- if err != nil || v.Kind() != reflect.Slice {
- panic("need ptr to slice")
- }
- for _, elem := range elems {
- obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
- if err != nil {
- return err
- }
- // being unable to set the version does not prevent the object from being extracted
- versioner.UpdateObject(obj, elem.rev)
- if filter.Filter(obj) {
- v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
- }
- }
- return nil
- }
- func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
- if preconditions == nil {
- return nil
- }
- objMeta, err := api.ObjectMetaFor(out)
- if err != nil {
- return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
- }
- if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
- errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
- return storage.NewInvalidObjError(key, errMsg)
- }
- return nil
- }
- func notFound(key string) clientv3.Cmp {
- return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
- }
|