cacher_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. /*
  2. Copyright 2015 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package storage_test
  14. import (
  15. "fmt"
  16. "reflect"
  17. goruntime "runtime"
  18. "strconv"
  19. "testing"
  20. "time"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/api/errors"
  23. "k8s.io/kubernetes/pkg/api/meta"
  24. "k8s.io/kubernetes/pkg/api/testapi"
  25. apitesting "k8s.io/kubernetes/pkg/api/testing"
  26. "k8s.io/kubernetes/pkg/api/unversioned"
  27. "k8s.io/kubernetes/pkg/labels"
  28. "k8s.io/kubernetes/pkg/runtime"
  29. "k8s.io/kubernetes/pkg/storage"
  30. etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
  31. "k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
  32. etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
  33. "k8s.io/kubernetes/pkg/util/sets"
  34. "k8s.io/kubernetes/pkg/util/wait"
  35. "k8s.io/kubernetes/pkg/watch"
  36. "golang.org/x/net/context"
  37. )
  38. func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
  39. server := etcdtesting.NewEtcdTestClientServer(t)
  40. storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix, false, etcdtest.DeserializationCacheSize)
  41. return server, storage
  42. }
  43. func newTestCacher(s storage.Interface) *storage.Cacher {
  44. prefix := "pods"
  45. config := storage.CacherConfig{
  46. CacheCapacity: 10,
  47. Storage: s,
  48. Versioner: etcdstorage.APIObjectVersioner{},
  49. Type: &api.Pod{},
  50. ResourcePrefix: prefix,
  51. KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
  52. NewListFunc: func() runtime.Object { return &api.PodList{} },
  53. Codec: testapi.Default.Codec(),
  54. }
  55. return storage.NewCacherFromConfig(config)
  56. }
  57. func makeTestPod(name string) *api.Pod {
  58. return &api.Pod{
  59. ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: name},
  60. Spec: apitesting.DeepEqualSafePodSpec(),
  61. }
  62. }
  63. func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
  64. updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
  65. newObj, err := api.Scheme.DeepCopy(obj)
  66. if err != nil {
  67. t.Errorf("unexpected error: %v", err)
  68. return nil, nil, err
  69. }
  70. return newObj.(*api.Pod), nil, nil
  71. }
  72. key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name)
  73. if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
  74. t.Errorf("unexpected error: %v", err)
  75. }
  76. obj.ResourceVersion = ""
  77. result := &api.Pod{}
  78. if err := s.Get(context.TODO(), key, result, false); err != nil {
  79. t.Errorf("unexpected error: %v", err)
  80. }
  81. return result
  82. }
  83. func TestList(t *testing.T) {
  84. server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
  85. defer server.Terminate(t)
  86. cacher := newTestCacher(etcdStorage)
  87. defer cacher.Stop()
  88. podFoo := makeTestPod("foo")
  89. podBar := makeTestPod("bar")
  90. podBaz := makeTestPod("baz")
  91. podFooPrime := makeTestPod("foo")
  92. podFooPrime.Spec.NodeName = "fakeNode"
  93. fooCreated := updatePod(t, etcdStorage, podFoo, nil)
  94. _ = updatePod(t, etcdStorage, podBar, nil)
  95. _ = updatePod(t, etcdStorage, podBaz, nil)
  96. _ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
  97. // Create a pod in a namespace that contains "ns" as a prefix
  98. // Make sure it is not returned in a watch of "ns"
  99. podFooNS2 := makeTestPod("foo")
  100. podFooNS2.Namespace += "2"
  101. updatePod(t, etcdStorage, podFooNS2, nil)
  102. deleted := api.Pod{}
  103. if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil {
  104. t.Errorf("Unexpected error: %v", err)
  105. }
  106. // We first List directly from etcd by passing empty resourceVersion,
  107. // to get the current etcd resourceVersion.
  108. rvResult := &api.PodList{}
  109. if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
  110. t.Errorf("Unexpected error: %v", err)
  111. }
  112. deletedPodRV := rvResult.ListMeta.ResourceVersion
  113. result := &api.PodList{}
  114. // We pass the current etcd ResourceVersion received from the above List() operation,
  115. // since there is not easy way to get ResourceVersion of barPod deletion operation.
  116. if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
  117. t.Errorf("Unexpected error: %v", err)
  118. }
  119. if result.ListMeta.ResourceVersion != deletedPodRV {
  120. t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion)
  121. }
  122. if len(result.Items) != 2 {
  123. t.Errorf("Unexpected list result: %d", len(result.Items))
  124. }
  125. keys := sets.String{}
  126. for _, item := range result.Items {
  127. keys.Insert(item.Name)
  128. }
  129. if !keys.HasAll("foo", "baz") {
  130. t.Errorf("Unexpected list result: %#v", result)
  131. }
  132. for _, item := range result.Items {
  133. // unset fields that are set by the infrastructure
  134. item.ResourceVersion = ""
  135. item.CreationTimestamp = unversioned.Time{}
  136. if item.Namespace != "ns" {
  137. t.Errorf("Unexpected namespace: %s", item.Namespace)
  138. }
  139. var expected *api.Pod
  140. switch item.Name {
  141. case "foo":
  142. expected = podFooPrime
  143. case "baz":
  144. expected = podBaz
  145. default:
  146. t.Errorf("Unexpected item: %v", item)
  147. }
  148. if e, a := *expected, item; !reflect.DeepEqual(e, a) {
  149. t.Errorf("Expected: %#v, got: %#v", e, a)
  150. }
  151. }
  152. }
  153. func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
  154. _, _, line, _ := goruntime.Caller(1)
  155. select {
  156. case event := <-w.ResultChan():
  157. if e, a := eventType, event.Type; e != a {
  158. t.Logf("(called from line %d)", line)
  159. t.Errorf("Expected: %s, got: %s", eventType, event.Type)
  160. }
  161. if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
  162. t.Logf("(called from line %d)", line)
  163. t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
  164. }
  165. case <-time.After(wait.ForeverTestTimeout):
  166. t.Logf("(called from line %d)", line)
  167. t.Errorf("Timed out waiting for an event")
  168. }
  169. }
  170. type injectListError struct {
  171. errors int
  172. storage.Interface
  173. }
  174. func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
  175. if self.errors > 0 {
  176. self.errors--
  177. return fmt.Errorf("injected error")
  178. }
  179. return self.Interface.List(ctx, key, resourceVersion, filter, listObj)
  180. }
  181. func TestWatch(t *testing.T) {
  182. server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
  183. // Inject one list error to make sure we test the relist case.
  184. etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
  185. defer server.Terminate(t)
  186. cacher := newTestCacher(etcdStorage)
  187. defer cacher.Stop()
  188. podFoo := makeTestPod("foo")
  189. podBar := makeTestPod("bar")
  190. podFooPrime := makeTestPod("foo")
  191. podFooPrime.Spec.NodeName = "fakeNode"
  192. podFooBis := makeTestPod("foo")
  193. podFooBis.Spec.NodeName = "anotherFakeNode"
  194. podFooNS2 := makeTestPod("foo")
  195. podFooNS2.Namespace += "2"
  196. // initialVersion is used to initate the watcher at the beginning of the world,
  197. // which is not defined precisely in etcd.
  198. initialVersion, err := cacher.LastSyncResourceVersion()
  199. if err != nil {
  200. t.Fatalf("Unexpected error: %v", err)
  201. }
  202. startVersion := strconv.Itoa(int(initialVersion))
  203. // Set up Watch for object "podFoo".
  204. watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
  205. if err != nil {
  206. t.Fatalf("Unexpected error: %v", err)
  207. }
  208. defer watcher.Stop()
  209. // Create in another namespace first to make sure events from other namespaces don't get delivered
  210. updatePod(t, etcdStorage, podFooNS2, nil)
  211. fooCreated := updatePod(t, etcdStorage, podFoo, nil)
  212. _ = updatePod(t, etcdStorage, podBar, nil)
  213. fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
  214. verifyWatchEvent(t, watcher, watch.Added, podFoo)
  215. verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
  216. // Check whether we get too-old error via the watch channel
  217. tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
  218. if err != nil {
  219. t.Fatalf("Expected no direct error, got %v", err)
  220. }
  221. defer tooOldWatcher.Stop()
  222. // Ensure we get a "Gone" error
  223. expectedGoneError := errors.NewGone("").ErrStatus
  224. verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
  225. initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
  226. if err != nil {
  227. t.Fatalf("Unexpected error: %v", err)
  228. }
  229. defer initialWatcher.Stop()
  230. verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
  231. // Now test watch from "now".
  232. nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
  233. if err != nil {
  234. t.Fatalf("Unexpected error: %v", err)
  235. }
  236. defer nowWatcher.Stop()
  237. verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
  238. _ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
  239. verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
  240. }
  241. func TestWatcherTimeout(t *testing.T) {
  242. server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
  243. defer server.Terminate(t)
  244. cacher := newTestCacher(etcdStorage)
  245. defer cacher.Stop()
  246. // initialVersion is used to initate the watcher at the beginning of the world,
  247. // which is not defined precisely in etcd.
  248. initialVersion, err := cacher.LastSyncResourceVersion()
  249. if err != nil {
  250. t.Fatalf("Unexpected error: %v", err)
  251. }
  252. startVersion := strconv.Itoa(int(initialVersion))
  253. // Create a watcher that will not be reading any result.
  254. watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
  255. if err != nil {
  256. t.Fatalf("Unexpected error: %v", err)
  257. }
  258. defer watcher.Stop()
  259. // Create a second watcher that will be reading result.
  260. readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
  261. if err != nil {
  262. t.Fatalf("Unexpected error: %v", err)
  263. }
  264. defer readingWatcher.Stop()
  265. for i := 1; i <= 22; i++ {
  266. pod := makeTestPod(strconv.Itoa(i))
  267. _ = updatePod(t, etcdStorage, pod, nil)
  268. verifyWatchEvent(t, readingWatcher, watch.Added, pod)
  269. }
  270. }
  271. func TestFiltering(t *testing.T) {
  272. server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
  273. defer server.Terminate(t)
  274. cacher := newTestCacher(etcdStorage)
  275. defer cacher.Stop()
  276. // Ensure that the cacher is initialized, before creating any pods,
  277. // so that we are sure that all events will be present in cacher.
  278. syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
  279. if err != nil {
  280. t.Fatalf("Unexpected error: %v", err)
  281. }
  282. syncWatcher.Stop()
  283. podFoo := makeTestPod("foo")
  284. podFoo.Labels = map[string]string{"filter": "foo"}
  285. podFooFiltered := makeTestPod("foo")
  286. podFooPrime := makeTestPod("foo")
  287. podFooPrime.Labels = map[string]string{"filter": "foo"}
  288. podFooPrime.Spec.NodeName = "fakeNode"
  289. podFooNS2 := makeTestPod("foo")
  290. podFooNS2.Namespace += "2"
  291. podFooNS2.Labels = map[string]string{"filter": "foo"}
  292. // Create in another namespace first to make sure events from other namespaces don't get delivered
  293. updatePod(t, etcdStorage, podFooNS2, nil)
  294. fooCreated := updatePod(t, etcdStorage, podFoo, nil)
  295. fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
  296. fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
  297. _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
  298. deleted := api.Pod{}
  299. if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted, nil); err != nil {
  300. t.Errorf("Unexpected error: %v", err)
  301. }
  302. // Set up Watch for object "podFoo" with label filter set.
  303. selector := labels.SelectorFromSet(labels.Set{"filter": "foo"})
  304. filterFunc := func(obj runtime.Object) bool {
  305. metadata, err := meta.Accessor(obj)
  306. if err != nil {
  307. t.Errorf("Unexpected error: %v", err)
  308. return false
  309. }
  310. return selector.Matches(labels.Set(metadata.GetLabels()))
  311. }
  312. filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
  313. watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter)
  314. if err != nil {
  315. t.Fatalf("Unexpected error: %v", err)
  316. }
  317. defer watcher.Stop()
  318. verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
  319. verifyWatchEvent(t, watcher, watch.Added, podFoo)
  320. verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
  321. verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
  322. }
  323. func TestStartingResourceVersion(t *testing.T) {
  324. server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
  325. defer server.Terminate(t)
  326. cacher := newTestCacher(etcdStorage)
  327. defer cacher.Stop()
  328. // add 1 object
  329. podFoo := makeTestPod("foo")
  330. fooCreated := updatePod(t, etcdStorage, podFoo, nil)
  331. // Set up Watch starting at fooCreated.ResourceVersion + 10
  332. rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
  333. if err != nil {
  334. t.Fatalf("Unexpected error: %v", err)
  335. }
  336. rv += 10
  337. startVersion := strconv.Itoa(int(rv))
  338. watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
  339. if err != nil {
  340. t.Fatalf("Unexpected error: %v", err)
  341. }
  342. defer watcher.Stop()
  343. lastFoo := fooCreated
  344. for i := 0; i < 11; i++ {
  345. podFooForUpdate := makeTestPod("foo")
  346. podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
  347. lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
  348. }
  349. select {
  350. case e := <-watcher.ResultChan():
  351. pod := e.Object.(*api.Pod)
  352. podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
  353. if err != nil {
  354. t.Fatalf("unexpected error: %v", err)
  355. }
  356. // event should have at least rv + 1, since we're starting the watch at rv
  357. if podRV <= rv {
  358. t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
  359. }
  360. case <-time.After(wait.ForeverTestTimeout):
  361. t.Errorf("timed out waiting for event")
  362. }
  363. }