store.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455
  1. /*
  2. Copyright 2016 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd3
  14. import (
  15. "bytes"
  16. "errors"
  17. "fmt"
  18. "path"
  19. "reflect"
  20. "strings"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/api/meta"
  23. "k8s.io/kubernetes/pkg/conversion"
  24. "k8s.io/kubernetes/pkg/runtime"
  25. "k8s.io/kubernetes/pkg/storage"
  26. "k8s.io/kubernetes/pkg/storage/etcd"
  27. "k8s.io/kubernetes/pkg/watch"
  28. "github.com/coreos/etcd/clientv3"
  29. "github.com/golang/glog"
  30. "golang.org/x/net/context"
  31. )
  32. type store struct {
  33. client *clientv3.Client
  34. codec runtime.Codec
  35. versioner storage.Versioner
  36. pathPrefix string
  37. watcher *watcher
  38. }
  // elemForDecode pairs one raw stored value with the revision that decodeList
  // stamps onto the object decoded from it.
  type elemForDecode struct {
  	data []byte // encoded object bytes as read from etcd
  	rev  uint64 // revision applied to the decoded object
  }
  43. type objState struct {
  44. obj runtime.Object
  45. meta *storage.ResponseMeta
  46. rev int64
  47. data []byte
  48. }
  49. // New returns an etcd3 implementation of storage.Interface.
  50. func New(c *clientv3.Client, codec runtime.Codec, prefix string) storage.Interface {
  51. return newStore(c, codec, prefix)
  52. }
  53. func newStore(c *clientv3.Client, codec runtime.Codec, prefix string) *store {
  54. versioner := etcd.APIObjectVersioner{}
  55. return &store{
  56. client: c,
  57. versioner: versioner,
  58. codec: codec,
  59. pathPrefix: prefix,
  60. watcher: newWatcher(c, codec, versioner),
  61. }
  62. }
  63. // Versioner implements storage.Interface.Versioner.
  64. func (s *store) Versioner() storage.Versioner {
  65. return s.versioner
  66. }
  67. // Get implements storage.Interface.Get.
  68. func (s *store) Get(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool) error {
  69. key = keyWithPrefix(s.pathPrefix, key)
  70. getResp, err := s.client.KV.Get(ctx, key)
  71. if err != nil {
  72. return err
  73. }
  74. if len(getResp.Kvs) == 0 {
  75. if ignoreNotFound {
  76. return runtime.SetZeroValue(out)
  77. }
  78. return storage.NewKeyNotFoundError(key, 0)
  79. }
  80. kv := getResp.Kvs[0]
  81. return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
  82. }
  83. // Create implements storage.Interface.Create.
  84. func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
  85. if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
  86. return errors.New("resourceVersion should not be set on objects to be created")
  87. }
  88. data, err := runtime.Encode(s.codec, obj)
  89. if err != nil {
  90. return err
  91. }
  92. key = keyWithPrefix(s.pathPrefix, key)
  93. opts, err := s.ttlOpts(ctx, int64(ttl))
  94. if err != nil {
  95. return err
  96. }
  97. txnResp, err := s.client.KV.Txn(ctx).If(
  98. notFound(key),
  99. ).Then(
  100. clientv3.OpPut(key, string(data), opts...),
  101. ).Commit()
  102. if err != nil {
  103. return err
  104. }
  105. if !txnResp.Succeeded {
  106. return storage.NewKeyExistsError(key, 0)
  107. }
  108. if out != nil {
  109. putResp := txnResp.Responses[0].GetResponsePut()
  110. return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
  111. }
  112. return nil
  113. }
  114. // Delete implements storage.Interface.Delete.
  115. func (s *store) Delete(ctx context.Context, key string, out runtime.Object, precondtions *storage.Preconditions) error {
  116. v, err := conversion.EnforcePtr(out)
  117. if err != nil {
  118. panic("unable to convert output object to pointer")
  119. }
  120. key = keyWithPrefix(s.pathPrefix, key)
  121. if precondtions == nil {
  122. return s.unconditionalDelete(ctx, key, out)
  123. }
  124. return s.conditionalDelete(ctx, key, out, v, precondtions)
  125. }
  126. func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
  127. // We need to do get and delete in single transaction in order to
  128. // know the value and revision before deleting it.
  129. txnResp, err := s.client.KV.Txn(ctx).If().Then(
  130. clientv3.OpGet(key),
  131. clientv3.OpDelete(key),
  132. ).Commit()
  133. if err != nil {
  134. return err
  135. }
  136. getResp := txnResp.Responses[0].GetResponseRange()
  137. if len(getResp.Kvs) == 0 {
  138. return storage.NewKeyNotFoundError(key, 0)
  139. }
  140. kv := getResp.Kvs[0]
  141. return decode(s.codec, s.versioner, kv.Value, out, kv.ModRevision)
  142. }
  143. func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, precondtions *storage.Preconditions) error {
  144. getResp, err := s.client.KV.Get(ctx, key)
  145. if err != nil {
  146. return err
  147. }
  148. for {
  149. origState, err := s.getState(getResp, key, v, false)
  150. if err != nil {
  151. return err
  152. }
  153. if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
  154. return err
  155. }
  156. txnResp, err := s.client.KV.Txn(ctx).If(
  157. clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
  158. ).Then(
  159. clientv3.OpDelete(key),
  160. ).Else(
  161. clientv3.OpGet(key),
  162. ).Commit()
  163. if err != nil {
  164. return err
  165. }
  166. if !txnResp.Succeeded {
  167. getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
  168. glog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
  169. continue
  170. }
  171. return decode(s.codec, s.versioner, origState.data, out, origState.rev)
  172. }
  173. }
  174. // GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
  175. func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Object, ignoreNotFound bool, precondtions *storage.Preconditions, tryUpdate storage.UpdateFunc) error {
  176. v, err := conversion.EnforcePtr(out)
  177. if err != nil {
  178. panic("unable to convert output object to pointer")
  179. }
  180. key = keyWithPrefix(s.pathPrefix, key)
  181. getResp, err := s.client.KV.Get(ctx, key)
  182. if err != nil {
  183. return err
  184. }
  185. for {
  186. origState, err := s.getState(getResp, key, v, ignoreNotFound)
  187. if err != nil {
  188. return err
  189. }
  190. if err := checkPreconditions(key, precondtions, origState.obj); err != nil {
  191. return err
  192. }
  193. ret, ttl, err := s.updateState(origState, tryUpdate)
  194. if err != nil {
  195. return err
  196. }
  197. data, err := runtime.Encode(s.codec, ret)
  198. if err != nil {
  199. return err
  200. }
  201. if bytes.Equal(data, origState.data) {
  202. return decode(s.codec, s.versioner, origState.data, out, origState.rev)
  203. }
  204. opts, err := s.ttlOpts(ctx, int64(ttl))
  205. if err != nil {
  206. return err
  207. }
  208. txnResp, err := s.client.KV.Txn(ctx).If(
  209. clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
  210. ).Then(
  211. clientv3.OpPut(key, string(data), opts...),
  212. ).Else(
  213. clientv3.OpGet(key),
  214. ).Commit()
  215. if err != nil {
  216. return err
  217. }
  218. if !txnResp.Succeeded {
  219. getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
  220. glog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
  221. continue
  222. }
  223. putResp := txnResp.Responses[0].GetResponsePut()
  224. return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
  225. }
  226. }
  227. // GetToList implements storage.Interface.GetToList.
  228. func (s *store) GetToList(ctx context.Context, key string, filter storage.Filter, listObj runtime.Object) error {
  229. listPtr, err := meta.GetItemsPtr(listObj)
  230. if err != nil {
  231. return err
  232. }
  233. key = keyWithPrefix(s.pathPrefix, key)
  234. getResp, err := s.client.KV.Get(ctx, key)
  235. if err != nil {
  236. return err
  237. }
  238. if len(getResp.Kvs) == 0 {
  239. return nil
  240. }
  241. elems := []*elemForDecode{{
  242. data: getResp.Kvs[0].Value,
  243. rev: uint64(getResp.Kvs[0].ModRevision),
  244. }}
  245. if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
  246. return err
  247. }
  248. // update version with cluster level revision
  249. return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
  250. }
  251. // List implements storage.Interface.List.
  252. func (s *store) List(ctx context.Context, key, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
  253. listPtr, err := meta.GetItemsPtr(listObj)
  254. if err != nil {
  255. return err
  256. }
  257. key = keyWithPrefix(s.pathPrefix, key)
  258. // We need to make sure the key ended with "/" so that we only get children "directories".
  259. // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
  260. // while with prefix "/a/" will return only "/a/b" which is the correct answer.
  261. if !strings.HasSuffix(key, "/") {
  262. key += "/"
  263. }
  264. getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix())
  265. if err != nil {
  266. return err
  267. }
  268. elems := make([]*elemForDecode, len(getResp.Kvs))
  269. for i, kv := range getResp.Kvs {
  270. elems[i] = &elemForDecode{
  271. data: kv.Value,
  272. rev: uint64(kv.ModRevision),
  273. }
  274. }
  275. if err := decodeList(elems, filter, listPtr, s.codec, s.versioner); err != nil {
  276. return err
  277. }
  278. // update version with cluster level revision
  279. return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
  280. }
  281. // Watch implements storage.Interface.Watch.
  282. func (s *store) Watch(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
  283. return s.watch(ctx, key, resourceVersion, filter, false)
  284. }
  285. // WatchList implements storage.Interface.WatchList.
  286. func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.Filter) (watch.Interface, error) {
  287. return s.watch(ctx, key, resourceVersion, filter, true)
  288. }
  289. func (s *store) watch(ctx context.Context, key string, rv string, filter storage.Filter, recursive bool) (watch.Interface, error) {
  290. rev, err := storage.ParseWatchResourceVersion(rv)
  291. if err != nil {
  292. return nil, err
  293. }
  294. key = keyWithPrefix(s.pathPrefix, key)
  295. return s.watcher.Watch(ctx, key, int64(rev), recursive, filter)
  296. }
  297. func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
  298. state := &objState{
  299. obj: reflect.New(v.Type()).Interface().(runtime.Object),
  300. meta: &storage.ResponseMeta{},
  301. }
  302. if len(getResp.Kvs) == 0 {
  303. if !ignoreNotFound {
  304. return nil, storage.NewKeyNotFoundError(key, 0)
  305. }
  306. if err := runtime.SetZeroValue(state.obj); err != nil {
  307. return nil, err
  308. }
  309. } else {
  310. state.rev = getResp.Kvs[0].ModRevision
  311. state.meta.ResourceVersion = uint64(state.rev)
  312. state.data = getResp.Kvs[0].Value
  313. if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
  314. return nil, err
  315. }
  316. }
  317. return state, nil
  318. }
  319. func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
  320. ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
  321. if err != nil {
  322. return nil, 0, err
  323. }
  324. version, err := s.versioner.ObjectResourceVersion(ret)
  325. if err != nil {
  326. return nil, 0, err
  327. }
  328. if version != 0 {
  329. // We cannot store object with resourceVersion in etcd. We need to reset it.
  330. if err := s.versioner.UpdateObject(ret, 0); err != nil {
  331. return nil, 0, fmt.Errorf("UpdateObject failed: %v", err)
  332. }
  333. }
  334. var ttl uint64
  335. if ttlPtr != nil {
  336. ttl = *ttlPtr
  337. }
  338. return ret, ttl, nil
  339. }
  340. // ttlOpts returns client options based on given ttl.
  341. // ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
  342. func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
  343. if ttl == 0 {
  344. return nil, nil
  345. }
  346. // TODO: one lease per ttl key is expensive. Based on current use case, we can have a long window to
  347. // put keys within into same lease. We shall benchmark this and optimize the performance.
  348. lcr, err := s.client.Lease.Grant(ctx, ttl)
  349. if err != nil {
  350. return nil, err
  351. }
  352. return []clientv3.OpOption{clientv3.WithLease(clientv3.LeaseID(lcr.ID))}, nil
  353. }
  // keyWithPrefix returns key located under prefix.
  //
  // A key that already lives under prefix (it equals prefix, or continues past
  // prefix at a "/" boundary) is returned unchanged; any other key is joined
  // onto prefix with path.Join. The boundary check matters: a plain string
  // prefix test would treat "/registryfoo/x" as already being under
  // "/registry" and let it escape the prefix. An empty prefix leaves key
  // unchanged.
  func keyWithPrefix(prefix, key string) string {
  	if prefix == "" || key == prefix {
  		return key
  	}
  	// Compare against the prefix as a whole path segment.
  	dir := prefix
  	if !strings.HasSuffix(dir, "/") {
  		dir += "/"
  	}
  	if strings.HasPrefix(key, dir) {
  		return key
  	}
  	return path.Join(prefix, key)
  }
  360. // decode decodes value of bytes into object. It will also set the object resource version to rev.
  361. // On success, objPtr would be set to the object.
  362. func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
  363. if _, err := conversion.EnforcePtr(objPtr); err != nil {
  364. panic("unable to convert output object to pointer")
  365. }
  366. _, _, err := codec.Decode(value, nil, objPtr)
  367. if err != nil {
  368. return err
  369. }
  370. // being unable to set the version does not prevent the object from being extracted
  371. versioner.UpdateObject(objPtr, uint64(rev))
  372. return nil
  373. }
  374. // decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
  375. // On success, ListPtr would be set to the list of objects.
  376. func decodeList(elems []*elemForDecode, filter storage.Filter, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
  377. v, err := conversion.EnforcePtr(ListPtr)
  378. if err != nil || v.Kind() != reflect.Slice {
  379. panic("need ptr to slice")
  380. }
  381. for _, elem := range elems {
  382. obj, _, err := codec.Decode(elem.data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
  383. if err != nil {
  384. return err
  385. }
  386. // being unable to set the version does not prevent the object from being extracted
  387. versioner.UpdateObject(obj, elem.rev)
  388. if filter.Filter(obj) {
  389. v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
  390. }
  391. }
  392. return nil
  393. }
  394. func checkPreconditions(key string, preconditions *storage.Preconditions, out runtime.Object) error {
  395. if preconditions == nil {
  396. return nil
  397. }
  398. objMeta, err := api.ObjectMetaFor(out)
  399. if err != nil {
  400. return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
  401. }
  402. if preconditions.UID != nil && *preconditions.UID != objMeta.UID {
  403. errMsg := fmt.Sprintf("Precondition failed: UID in precondition: %v, UID in object meta: %v", preconditions.UID, objMeta.UID)
  404. return storage.NewInvalidObjError(key, errMsg)
  405. }
  406. return nil
  407. }
  408. func notFound(key string) clientv3.Cmp {
  409. return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
  410. }