123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- /*
- Copyright 2015 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package framework_test
- import (
- "fmt"
- "math/rand"
- "sync"
- "testing"
- "time"
- "k8s.io/kubernetes/pkg/api"
- "k8s.io/kubernetes/pkg/client/cache"
- "k8s.io/kubernetes/pkg/controller/framework"
- "k8s.io/kubernetes/pkg/runtime"
- "k8s.io/kubernetes/pkg/util/sets"
- "k8s.io/kubernetes/pkg/util/wait"
- "k8s.io/kubernetes/pkg/watch"
- "github.com/google/gofuzz"
- )
- type testLW struct {
- ListFunc func(options api.ListOptions) (runtime.Object, error)
- WatchFunc func(options api.ListOptions) (watch.Interface, error)
- }
- func (t *testLW) List(options api.ListOptions) (runtime.Object, error) {
- return t.ListFunc(options)
- }
- func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
- return t.WatchFunc(options)
- }
- func Example() {
- // source simulates an apiserver object endpoint.
- source := framework.NewFakeControllerSource()
- // This will hold the downstream state, as we know it.
- downstream := cache.NewStore(framework.DeletionHandlingMetaNamespaceKeyFunc)
- // This will hold incoming changes. Note how we pass downstream in as a
- // KeyLister, that way resync operations will result in the correct set
- // of update/delete deltas.
- fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, downstream)
- // Let's do threadsafe output to get predictable test results.
- deletionCounter := make(chan string, 1000)
- cfg := &framework.Config{
- Queue: fifo,
- ListerWatcher: source,
- ObjectType: &api.Pod{},
- FullResyncPeriod: time.Millisecond * 100,
- RetryOnError: false,
- // Let's implement a simple controller that just deletes
- // everything that comes in.
- Process: func(obj interface{}) error {
- // Obj is from the Pop method of the Queue we make above.
- newest := obj.(cache.Deltas).Newest()
- if newest.Type != cache.Deleted {
- // Update our downstream store.
- err := downstream.Add(newest.Object)
- if err != nil {
- return err
- }
- // Delete this object.
- source.Delete(newest.Object.(runtime.Object))
- } else {
- // Update our downstream store.
- err := downstream.Delete(newest.Object)
- if err != nil {
- return err
- }
- // fifo's KeyOf is easiest, because it handles
- // DeletedFinalStateUnknown markers.
- key, err := fifo.KeyOf(newest.Object)
- if err != nil {
- return err
- }
- // Report this deletion.
- deletionCounter <- key
- }
- return nil
- },
- }
- // Create the controller and run it until we close stop.
- stop := make(chan struct{})
- defer close(stop)
- go framework.New(cfg).Run(stop)
- // Let's add a few objects to the source.
- testIDs := []string{"a-hello", "b-controller", "c-framework"}
- for _, name := range testIDs {
- // Note that these pods are not valid-- the fake source doesn't
- // call validation or anything.
- source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
- }
- // Let's wait for the controller to process the things we just added.
- outputSet := sets.String{}
- for i := 0; i < len(testIDs); i++ {
- outputSet.Insert(<-deletionCounter)
- }
- for _, key := range outputSet.List() {
- fmt.Println(key)
- }
- // Output:
- // a-hello
- // b-controller
- // c-framework
- }
- func ExampleInformer() {
- // source simulates an apiserver object endpoint.
- source := framework.NewFakeControllerSource()
- // Let's do threadsafe output to get predictable test results.
- deletionCounter := make(chan string, 1000)
- // Make a controller that immediately deletes anything added to it, and
- // logs anything deleted.
- _, controller := framework.NewInformer(
- source,
- &api.Pod{},
- time.Millisecond*100,
- framework.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) {
- source.Delete(obj.(runtime.Object))
- },
- DeleteFunc: func(obj interface{}) {
- key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
- if err != nil {
- key = "oops something went wrong with the key"
- }
- // Report this deletion.
- deletionCounter <- key
- },
- },
- )
- // Run the controller and run it until we close stop.
- stop := make(chan struct{})
- defer close(stop)
- go controller.Run(stop)
- // Let's add a few objects to the source.
- testIDs := []string{"a-hello", "b-controller", "c-framework"}
- for _, name := range testIDs {
- // Note that these pods are not valid-- the fake source doesn't
- // call validation or anything.
- source.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: name}})
- }
- // Let's wait for the controller to process the things we just added.
- outputSet := sets.String{}
- for i := 0; i < len(testIDs); i++ {
- outputSet.Insert(<-deletionCounter)
- }
- for _, key := range outputSet.List() {
- fmt.Println(key)
- }
- // Output:
- // a-hello
- // b-controller
- // c-framework
- }
- func TestHammerController(t *testing.T) {
- // This test executes a bunch of requests through the fake source and
- // controller framework to make sure there's no locking/threading
- // errors. If an error happens, it should hang forever or trigger the
- // race detector.
- // source simulates an apiserver object endpoint.
- source := framework.NewFakeControllerSource()
- // Let's do threadsafe output to get predictable test results.
- outputSetLock := sync.Mutex{}
- // map of key to operations done on the key
- outputSet := map[string][]string{}
- recordFunc := func(eventType string, obj interface{}) {
- key, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj)
- if err != nil {
- t.Errorf("something wrong with key: %v", err)
- key = "oops something went wrong with the key"
- }
- // Record some output when items are deleted.
- outputSetLock.Lock()
- defer outputSetLock.Unlock()
- outputSet[key] = append(outputSet[key], eventType)
- }
- // Make a controller which just logs all the changes it gets.
- _, controller := framework.NewInformer(
- source,
- &api.Pod{},
- time.Millisecond*100,
- framework.ResourceEventHandlerFuncs{
- AddFunc: func(obj interface{}) { recordFunc("add", obj) },
- UpdateFunc: func(oldObj, newObj interface{}) { recordFunc("update", newObj) },
- DeleteFunc: func(obj interface{}) { recordFunc("delete", obj) },
- },
- )
- if controller.HasSynced() {
- t.Errorf("Expected HasSynced() to return false before we started the controller")
- }
- // Run the controller and run it until we close stop.
- stop := make(chan struct{})
- go controller.Run(stop)
- // Let's wait for the controller to do its initial sync
- wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
- return controller.HasSynced(), nil
- })
- if !controller.HasSynced() {
- t.Errorf("Expected HasSynced() to return true after the initial sync")
- }
- wg := sync.WaitGroup{}
- const threads = 3
- wg.Add(threads)
- for i := 0; i < threads; i++ {
- go func() {
- defer wg.Done()
- // Let's add a few objects to the source.
- currentNames := sets.String{}
- rs := rand.NewSource(rand.Int63())
- f := fuzz.New().NilChance(.5).NumElements(0, 2).RandSource(rs)
- r := rand.New(rs) // Mustn't use r and f concurrently!
- for i := 0; i < 100; i++ {
- var name string
- var isNew bool
- if currentNames.Len() == 0 || r.Intn(3) == 1 {
- f.Fuzz(&name)
- isNew = true
- } else {
- l := currentNames.List()
- name = l[r.Intn(len(l))]
- }
- pod := &api.Pod{}
- f.Fuzz(pod)
- pod.ObjectMeta.Name = name
- pod.ObjectMeta.Namespace = "default"
- // Add, update, or delete randomly.
- // Note that these pods are not valid-- the fake source doesn't
- // call validation or perform any other checking.
- if isNew {
- currentNames.Insert(name)
- source.Add(pod)
- continue
- }
- switch r.Intn(2) {
- case 0:
- currentNames.Insert(name)
- source.Modify(pod)
- case 1:
- currentNames.Delete(name)
- source.Delete(pod)
- }
- }
- }()
- }
- wg.Wait()
- // Let's wait for the controller to finish processing the things we just added.
- // TODO: look in the queue to see how many items need to be processed.
- time.Sleep(100 * time.Millisecond)
- close(stop)
- outputSetLock.Lock()
- t.Logf("got: %#v", outputSet)
- }
- func TestUpdate(t *testing.T) {
- // This test is going to exercise the various paths that result in a
- // call to update.
- // source simulates an apiserver object endpoint.
- source := framework.NewFakeControllerSource()
- const (
- FROM = "from"
- TO = "to"
- )
- // These are the transitions we expect to see; because this is
- // asynchronous, there are a lot of valid possibilities.
- type pair struct{ from, to string }
- allowedTransitions := map[pair]bool{
- pair{FROM, TO}: true,
- // Because a resync can happen when we've already observed one
- // of the above but before the item is deleted.
- pair{TO, TO}: true,
- // Because a resync could happen before we observe an update.
- pair{FROM, FROM}: true,
- }
- pod := func(name, check string, final bool) *api.Pod {
- p := &api.Pod{
- ObjectMeta: api.ObjectMeta{
- Name: name,
- Labels: map[string]string{"check": check},
- },
- }
- if final {
- p.Labels["final"] = "true"
- }
- return p
- }
- deletePod := func(p *api.Pod) bool {
- return p.Labels["final"] == "true"
- }
- tests := []func(string){
- func(name string) {
- name = "a-" + name
- source.Add(pod(name, FROM, false))
- source.Modify(pod(name, TO, true))
- },
- }
- const threads = 3
- var testDoneWG sync.WaitGroup
- testDoneWG.Add(threads * len(tests))
- // Make a controller that deletes things once it observes an update.
- // It calls Done() on the wait group on deletions so we can tell when
- // everything we've added has been deleted.
- watchCh := make(chan struct{})
- _, controller := framework.NewInformer(
- &testLW{
- WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
- watch, err := source.Watch(options)
- close(watchCh)
- return watch, err
- },
- ListFunc: func(options api.ListOptions) (runtime.Object, error) {
- return source.List(options)
- },
- },
- &api.Pod{},
- 0,
- framework.ResourceEventHandlerFuncs{
- UpdateFunc: func(oldObj, newObj interface{}) {
- o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
- from, to := o.Labels["check"], n.Labels["check"]
- if !allowedTransitions[pair{from, to}] {
- t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
- }
- if deletePod(n) {
- source.Delete(n)
- }
- },
- DeleteFunc: func(obj interface{}) {
- testDoneWG.Done()
- },
- },
- )
- // Run the controller and run it until we close stop.
- // Once Run() is called, calls to testDoneWG.Done() might start, so
- // all testDoneWG.Add() calls must happen before this point
- stop := make(chan struct{})
- go controller.Run(stop)
- <-watchCh
- // run every test a few times, in parallel
- var wg sync.WaitGroup
- wg.Add(threads * len(tests))
- for i := 0; i < threads; i++ {
- for j, f := range tests {
- go func(name string, f func(string)) {
- defer wg.Done()
- f(name)
- }(fmt.Sprintf("%v-%v", i, j), f)
- }
- }
- wg.Wait()
- // Let's wait for the controller to process the things we just added.
- testDoneWG.Wait()
- close(stop)
- }
|