123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package storage_test
- import (
- "fmt"
- "reflect"
- goruntime "runtime"
- "strconv"
- "testing"
- "time"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/api/errors"
- "k8s.io/kubernetes/pkg/api/meta"
- "k8s.io/kubernetes/pkg/api/testapi"
- apitesting "k8s.io/kubernetes/pkg/api/testing"
- "k8s.io/kubernetes/pkg/api/unversioned"
- "k8s.io/kubernetes/pkg/labels"
- "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/storage"
- etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
- "k8s.io/kubernetes/pkg/storage/etcd/etcdtest"
- etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
- "k8s.io/kubernetes/pkg/util/sets"
- "k8s.io/kubernetes/pkg/util/wait"
- "k8s.io/kubernetes/pkg/watch"
- "golang.org/x/net/context"
- )
- func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
- server := etcdtesting.NewEtcdTestClientServer(t)
- storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix, false, etcdtest.DeserializationCacheSize)
- return server, storage
- }
- func newTestCacher(s storage.Interface) *storage.Cacher {
- prefix := "pods"
- config := storage.CacherConfig{
- CacheCapacity: 10,
- Storage: s,
- Versioner: etcdstorage.APIObjectVersioner{},
- Type: &api.Pod{},
- ResourcePrefix: prefix,
- KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
- NewListFunc: func() runtime.Object { return &api.PodList{} },
- Codec: testapi.Default.Codec(),
- }
- return storage.NewCacherFromConfig(config)
- }
- func makeTestPod(name string) *api.Pod {
- return &api.Pod{
- ObjectMeta: api.ObjectMeta{Namespace: "ns", Name: name},
- Spec: apitesting.DeepEqualSafePodSpec(),
- }
- }
- func updatePod(t *testing.T, s storage.Interface, obj, old *api.Pod) *api.Pod {
- updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
- newObj, err := api.Scheme.DeepCopy(obj)
- if err != nil {
- t.Errorf("unexpected error: %v", err)
- return nil, nil, err
- }
- return newObj.(*api.Pod), nil, nil
- }
- key := etcdtest.AddPrefix("pods/" + obj.Namespace + "/" + obj.Name)
- if err := s.GuaranteedUpdate(context.TODO(), key, &api.Pod{}, old == nil, nil, updateFn); err != nil {
- t.Errorf("unexpected error: %v", err)
- }
- obj.ResourceVersion = ""
- result := &api.Pod{}
- if err := s.Get(context.TODO(), key, result, false); err != nil {
- t.Errorf("unexpected error: %v", err)
- }
- return result
- }
- func TestList(t *testing.T) {
- server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
- defer server.Terminate(t)
- cacher := newTestCacher(etcdStorage)
- defer cacher.Stop()
- podFoo := makeTestPod("foo")
- podBar := makeTestPod("bar")
- podBaz := makeTestPod("baz")
- podFooPrime := makeTestPod("foo")
- podFooPrime.Spec.NodeName = "fakeNode"
- fooCreated := updatePod(t, etcdStorage, podFoo, nil)
- _ = updatePod(t, etcdStorage, podBar, nil)
- _ = updatePod(t, etcdStorage, podBaz, nil)
- _ = updatePod(t, etcdStorage, podFooPrime, fooCreated)
- // Create a pod in a namespace that contains "ns" as a prefix
- // Make sure it is not returned in a watch of "ns"
- podFooNS2 := makeTestPod("foo")
- podFooNS2.Namespace += "2"
- updatePod(t, etcdStorage, podFooNS2, nil)
- deleted := api.Pod{}
- if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/bar"), &deleted, nil); err != nil {
- t.Errorf("Unexpected error: %v", err)
- }
- // We first List directly from etcd by passing empty resourceVersion,
- // to get the current etcd resourceVersion.
- rvResult := &api.PodList{}
- if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
- t.Errorf("Unexpected error: %v", err)
- }
- deletedPodRV := rvResult.ListMeta.ResourceVersion
- result := &api.PodList{}
- // We pass the current etcd ResourceVersion received from the above List() operation,
- // since there is not easy way to get ResourceVersion of barPod deletion operation.
- if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
- t.Errorf("Unexpected error: %v", err)
- }
- if result.ListMeta.ResourceVersion != deletedPodRV {
- t.Errorf("Incorrect resource version: %v", result.ListMeta.ResourceVersion)
- }
- if len(result.Items) != 2 {
- t.Errorf("Unexpected list result: %d", len(result.Items))
- }
- keys := sets.String{}
- for _, item := range result.Items {
- keys.Insert(item.Name)
- }
- if !keys.HasAll("foo", "baz") {
- t.Errorf("Unexpected list result: %#v", result)
- }
- for _, item := range result.Items {
- // unset fields that are set by the infrastructure
- item.ResourceVersion = ""
- item.CreationTimestamp = unversioned.Time{}
- if item.Namespace != "ns" {
- t.Errorf("Unexpected namespace: %s", item.Namespace)
- }
- var expected *api.Pod
- switch item.Name {
- case "foo":
- expected = podFooPrime
- case "baz":
- expected = podBaz
- default:
- t.Errorf("Unexpected item: %v", item)
- }
- if e, a := *expected, item; !reflect.DeepEqual(e, a) {
- t.Errorf("Expected: %#v, got: %#v", e, a)
- }
- }
- }
- func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
- _, _, line, _ := goruntime.Caller(1)
- select {
- case event := <-w.ResultChan():
- if e, a := eventType, event.Type; e != a {
- t.Logf("(called from line %d)", line)
- t.Errorf("Expected: %s, got: %s", eventType, event.Type)
- }
- if e, a := eventObject, event.Object; !api.Semantic.DeepDerivative(e, a) {
- t.Logf("(called from line %d)", line)
- t.Errorf("Expected (%s): %#v, got: %#v", eventType, e, a)
- }
- case <-time.After(wait.ForeverTestTimeout):
- t.Logf("(called from line %d)", line)
- t.Errorf("Timed out waiting for an event")
- }
- }
- type injectListError struct {
- errors int
- storage.Interface
- }
- func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, filter storage.Filter, listObj runtime.Object) error {
- if self.errors > 0 {
- self.errors--
- return fmt.Errorf("injected error")
- }
- return self.Interface.List(ctx, key, resourceVersion, filter, listObj)
- }
- func TestWatch(t *testing.T) {
- server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
- // Inject one list error to make sure we test the relist case.
- etcdStorage = &injectListError{errors: 1, Interface: etcdStorage}
- defer server.Terminate(t)
- cacher := newTestCacher(etcdStorage)
- defer cacher.Stop()
- podFoo := makeTestPod("foo")
- podBar := makeTestPod("bar")
- podFooPrime := makeTestPod("foo")
- podFooPrime.Spec.NodeName = "fakeNode"
- podFooBis := makeTestPod("foo")
- podFooBis.Spec.NodeName = "anotherFakeNode"
- podFooNS2 := makeTestPod("foo")
- podFooNS2.Namespace += "2"
- // initialVersion is used to initate the watcher at the beginning of the world,
- // which is not defined precisely in etcd.
- initialVersion, err := cacher.LastSyncResourceVersion()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- startVersion := strconv.Itoa(int(initialVersion))
- // Set up Watch for object "podFoo".
- watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer watcher.Stop()
- // Create in another namespace first to make sure events from other namespaces don't get delivered
- updatePod(t, etcdStorage, podFooNS2, nil)
- fooCreated := updatePod(t, etcdStorage, podFoo, nil)
- _ = updatePod(t, etcdStorage, podBar, nil)
- fooUpdated := updatePod(t, etcdStorage, podFooPrime, fooCreated)
- verifyWatchEvent(t, watcher, watch.Added, podFoo)
- verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
- // Check whether we get too-old error via the watch channel
- tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
- if err != nil {
- t.Fatalf("Expected no direct error, got %v", err)
- }
- defer tooOldWatcher.Stop()
- // Ensure we get a "Gone" error
- expectedGoneError := errors.NewGone("").ErrStatus
- verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)
- initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer initialWatcher.Stop()
- verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
- // Now test watch from "now".
- nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer nowWatcher.Stop()
- verifyWatchEvent(t, nowWatcher, watch.Added, podFooPrime)
- _ = updatePod(t, etcdStorage, podFooBis, fooUpdated)
- verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
- }
- func TestWatcherTimeout(t *testing.T) {
- server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
- defer server.Terminate(t)
- cacher := newTestCacher(etcdStorage)
- defer cacher.Stop()
- // initialVersion is used to initate the watcher at the beginning of the world,
- // which is not defined precisely in etcd.
- initialVersion, err := cacher.LastSyncResourceVersion()
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- startVersion := strconv.Itoa(int(initialVersion))
- // Create a watcher that will not be reading any result.
- watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer watcher.Stop()
- // Create a second watcher that will be reading result.
- readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer readingWatcher.Stop()
- for i := 1; i <= 22; i++ {
- pod := makeTestPod(strconv.Itoa(i))
- _ = updatePod(t, etcdStorage, pod, nil)
- verifyWatchEvent(t, readingWatcher, watch.Added, pod)
- }
- }
- func TestFiltering(t *testing.T) {
- server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
- defer server.Terminate(t)
- cacher := newTestCacher(etcdStorage)
- defer cacher.Stop()
- // Ensure that the cacher is initialized, before creating any pods,
- // so that we are sure that all events will be present in cacher.
- syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- syncWatcher.Stop()
- podFoo := makeTestPod("foo")
- podFoo.Labels = map[string]string{"filter": "foo"}
- podFooFiltered := makeTestPod("foo")
- podFooPrime := makeTestPod("foo")
- podFooPrime.Labels = map[string]string{"filter": "foo"}
- podFooPrime.Spec.NodeName = "fakeNode"
- podFooNS2 := makeTestPod("foo")
- podFooNS2.Namespace += "2"
- podFooNS2.Labels = map[string]string{"filter": "foo"}
- // Create in another namespace first to make sure events from other namespaces don't get delivered
- updatePod(t, etcdStorage, podFooNS2, nil)
- fooCreated := updatePod(t, etcdStorage, podFoo, nil)
- fooFiltered := updatePod(t, etcdStorage, podFooFiltered, fooCreated)
- fooUnfiltered := updatePod(t, etcdStorage, podFoo, fooFiltered)
- _ = updatePod(t, etcdStorage, podFooPrime, fooUnfiltered)
- deleted := api.Pod{}
- if err := etcdStorage.Delete(context.TODO(), etcdtest.AddPrefix("pods/ns/foo"), &deleted, nil); err != nil {
- t.Errorf("Unexpected error: %v", err)
- }
- // Set up Watch for object "podFoo" with label filter set.
- selector := labels.SelectorFromSet(labels.Set{"filter": "foo"})
- filterFunc := func(obj runtime.Object) bool {
- metadata, err := meta.Accessor(obj)
- if err != nil {
- t.Errorf("Unexpected error: %v", err)
- return false
- }
- return selector.Matches(labels.Set(metadata.GetLabels()))
- }
- filter := storage.NewSimpleFilter(filterFunc, storage.NoTriggerFunc)
- watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, filter)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer watcher.Stop()
- verifyWatchEvent(t, watcher, watch.Deleted, podFooFiltered)
- verifyWatchEvent(t, watcher, watch.Added, podFoo)
- verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
- verifyWatchEvent(t, watcher, watch.Deleted, podFooPrime)
- }
- func TestStartingResourceVersion(t *testing.T) {
- server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
- defer server.Terminate(t)
- cacher := newTestCacher(etcdStorage)
- defer cacher.Stop()
- // add 1 object
- podFoo := makeTestPod("foo")
- fooCreated := updatePod(t, etcdStorage, podFoo, nil)
- // Set up Watch starting at fooCreated.ResourceVersion + 10
- rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- rv += 10
- startVersion := strconv.Itoa(int(rv))
- watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
- if err != nil {
- t.Fatalf("Unexpected error: %v", err)
- }
- defer watcher.Stop()
- lastFoo := fooCreated
- for i := 0; i < 11; i++ {
- podFooForUpdate := makeTestPod("foo")
- podFooForUpdate.Labels = map[string]string{"foo": strconv.Itoa(i)}
- lastFoo = updatePod(t, etcdStorage, podFooForUpdate, lastFoo)
- }
- select {
- case e := <-watcher.ResultChan():
- pod := e.Object.(*api.Pod)
- podRV, err := storage.ParseWatchResourceVersion(pod.ResourceVersion)
- if err != nil {
- t.Fatalf("unexpected error: %v", err)
- }
- // event should have at least rv + 1, since we're starting the watch at rv
- if podRV <= rv {
- t.Errorf("expected event with resourceVersion of at least %d, got %d", rv+1, podRV)
- }
- case <-time.After(wait.ForeverTestTimeout):
- t.Errorf("timed out waiting for event")
- }
- }
|