123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package wait
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- )
- func TestUntil(t *testing.T) {
- ch := make(chan struct{})
- close(ch)
- Until(func() {
- t.Fatal("should not have been invoked")
- }, 0, ch)
- ch = make(chan struct{})
- called := make(chan struct{})
- go func() {
- Until(func() {
- called <- struct{}{}
- }, 0, ch)
- close(called)
- }()
- <-called
- close(ch)
- <-called
- }
- func TestNonSlidingUntil(t *testing.T) {
- ch := make(chan struct{})
- close(ch)
- NonSlidingUntil(func() {
- t.Fatal("should not have been invoked")
- }, 0, ch)
- ch = make(chan struct{})
- called := make(chan struct{})
- go func() {
- NonSlidingUntil(func() {
- called <- struct{}{}
- }, 0, ch)
- close(called)
- }()
- <-called
- close(ch)
- <-called
- }
- func TestUntilReturnsImmediately(t *testing.T) {
- now := time.Now()
- ch := make(chan struct{})
- Until(func() {
- close(ch)
- }, 30*time.Second, ch)
- if now.Add(25 * time.Second).Before(time.Now()) {
- t.Errorf("Until did not return immediately when the stop chan was closed inside the func")
- }
- }
- func TestJitterUntil(t *testing.T) {
- ch := make(chan struct{})
- // if a channel is closed JitterUntil never calls function f
- // and returns immediately
- close(ch)
- JitterUntil(func() {
- t.Fatal("should not have been invoked")
- }, 0, 1.0, true, ch)
- ch = make(chan struct{})
- called := make(chan struct{})
- go func() {
- JitterUntil(func() {
- called <- struct{}{}
- }, 0, 1.0, true, ch)
- close(called)
- }()
- <-called
- close(ch)
- <-called
- }
- func TestJitterUntilReturnsImmediately(t *testing.T) {
- now := time.Now()
- ch := make(chan struct{})
- JitterUntil(func() {
- close(ch)
- }, 30*time.Second, 1.0, true, ch)
- if now.Add(25 * time.Second).Before(time.Now()) {
- t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func")
- }
- }
- func TestJitterUntilNegativeFactor(t *testing.T) {
- now := time.Now()
- ch := make(chan struct{})
- called := make(chan struct{})
- received := make(chan struct{})
- go func() {
- JitterUntil(func() {
- called <- struct{}{}
- <-received
- }, time.Second, -30.0, true, ch)
- }()
- // first loop
- <-called
- received <- struct{}{}
- // second loop
- <-called
- close(ch)
- received <- struct{}{}
- // it should take at most 2 seconds + some overhead, not 3
- if now.Add(3 * time.Second).Before(time.Now()) {
- t.Errorf("JitterUntil did not returned after predefined period with negative jitter factor when the stop chan was closed inside the func")
- }
- }
- func TestExponentialBackoff(t *testing.T) {
- opts := Backoff{Factor: 1.0, Steps: 3}
- // waits up to steps
- i := 0
- err := ExponentialBackoff(opts, func() (bool, error) {
- i++
- return false, nil
- })
- if err != ErrWaitTimeout || i != opts.Steps {
- t.Errorf("unexpected error: %v", err)
- }
- // returns immediately
- i = 0
- err = ExponentialBackoff(opts, func() (bool, error) {
- i++
- return true, nil
- })
- if err != nil || i != 1 {
- t.Errorf("unexpected error: %v", err)
- }
- // returns immediately on error
- testErr := fmt.Errorf("some other error")
- err = ExponentialBackoff(opts, func() (bool, error) {
- return false, testErr
- })
- if err != testErr {
- t.Errorf("unexpected error: %v", err)
- }
- // invoked multiple times
- i = 1
- err = ExponentialBackoff(opts, func() (bool, error) {
- if i < opts.Steps {
- i++
- return false, nil
- }
- return true, nil
- })
- if err != nil || i != opts.Steps {
- t.Errorf("unexpected error: %v", err)
- }
- }
- func TestPoller(t *testing.T) {
- done := make(chan struct{})
- defer close(done)
- w := poller(time.Millisecond, 2*time.Millisecond)
- ch := w(done)
- count := 0
- DRAIN:
- for {
- select {
- case _, open := <-ch:
- if !open {
- break DRAIN
- }
- count++
- case <-time.After(ForeverTestTimeout):
- t.Errorf("unexpected timeout after poll")
- }
- }
- if count > 3 {
- t.Errorf("expected up to three values, got %d", count)
- }
- }
- type fakePoller struct {
- max int
- used int32 // accessed with atomics
- wg sync.WaitGroup
- }
- func fakeTicker(max int, used *int32, doneFunc func()) WaitFunc {
- return func(done <-chan struct{}) <-chan struct{} {
- ch := make(chan struct{})
- go func() {
- defer doneFunc()
- defer close(ch)
- for i := 0; i < max; i++ {
- select {
- case ch <- struct{}{}:
- case <-done:
- return
- }
- if used != nil {
- atomic.AddInt32(used, 1)
- }
- }
- }()
- return ch
- }
- }
- func (fp *fakePoller) GetWaitFunc() WaitFunc {
- fp.wg.Add(1)
- return fakeTicker(fp.max, &fp.used, fp.wg.Done)
- }
- func TestPoll(t *testing.T) {
- invocations := 0
- f := ConditionFunc(func() (bool, error) {
- invocations++
- return true, nil
- })
- fp := fakePoller{max: 1}
- if err := pollInternal(fp.GetWaitFunc(), f); err != nil {
- t.Fatalf("unexpected error %v", err)
- }
- fp.wg.Wait()
- if invocations != 1 {
- t.Errorf("Expected exactly one invocation, got %d", invocations)
- }
- used := atomic.LoadInt32(&fp.used)
- if used != 1 {
- t.Errorf("Expected exactly one tick, got %d", used)
- }
- }
- func TestPollError(t *testing.T) {
- expectedError := errors.New("Expected error")
- f := ConditionFunc(func() (bool, error) {
- return false, expectedError
- })
- fp := fakePoller{max: 1}
- if err := pollInternal(fp.GetWaitFunc(), f); err == nil || err != expectedError {
- t.Fatalf("Expected error %v, got none %v", expectedError, err)
- }
- fp.wg.Wait()
- used := atomic.LoadInt32(&fp.used)
- if used != 1 {
- t.Errorf("Expected exactly one tick, got %d", used)
- }
- }
- func TestPollImmediate(t *testing.T) {
- invocations := 0
- f := ConditionFunc(func() (bool, error) {
- invocations++
- return true, nil
- })
- fp := fakePoller{max: 0}
- if err := pollImmediateInternal(fp.GetWaitFunc(), f); err != nil {
- t.Fatalf("unexpected error %v", err)
- }
- // We don't need to wait for fp.wg, as pollImmediate shouldn't call WaitFunc at all.
- if invocations != 1 {
- t.Errorf("Expected exactly one invocation, got %d", invocations)
- }
- used := atomic.LoadInt32(&fp.used)
- if used != 0 {
- t.Errorf("Expected exactly zero ticks, got %d", used)
- }
- }
- func TestPollImmediateError(t *testing.T) {
- expectedError := errors.New("Expected error")
- f := ConditionFunc(func() (bool, error) {
- return false, expectedError
- })
- fp := fakePoller{max: 0}
- if err := pollImmediateInternal(fp.GetWaitFunc(), f); err == nil || err != expectedError {
- t.Fatalf("Expected error %v, got none %v", expectedError, err)
- }
- // We don't need to wait for fp.wg, as pollImmediate shouldn't call WaitFunc at all.
- used := atomic.LoadInt32(&fp.used)
- if used != 0 {
- t.Errorf("Expected exactly zero ticks, got %d", used)
- }
- }
- func TestPollForever(t *testing.T) {
- ch := make(chan struct{})
- done := make(chan struct{}, 1)
- complete := make(chan struct{})
- go func() {
- f := ConditionFunc(func() (bool, error) {
- ch <- struct{}{}
- select {
- case <-done:
- return true, nil
- default:
- }
- return false, nil
- })
- if err := PollInfinite(time.Microsecond, f); err != nil {
- t.Fatalf("unexpected error %v", err)
- }
- close(ch)
- complete <- struct{}{}
- }()
- // ensure the condition is opened
- <-ch
- // ensure channel sends events
- for i := 0; i < 10; i++ {
- select {
- case _, open := <-ch:
- if !open {
- t.Fatalf("did not expect channel to be closed")
- }
- case <-time.After(ForeverTestTimeout):
- t.Fatalf("channel did not return at least once within the poll interval")
- }
- }
- // at most one poll notification should be sent once we return from the condition
- done <- struct{}{}
- go func() {
- for i := 0; i < 2; i++ {
- _, open := <-ch
- if !open {
- return
- }
- }
- t.Fatalf("expected closed channel after two iterations")
- }()
- <-complete
- }
- func TestWaitFor(t *testing.T) {
- var invocations int
- testCases := map[string]struct {
- F ConditionFunc
- Ticks int
- Invoked int
- Err bool
- }{
- "invoked once": {
- ConditionFunc(func() (bool, error) {
- invocations++
- return true, nil
- }),
- 2,
- 1,
- false,
- },
- "invoked and returns a timeout": {
- ConditionFunc(func() (bool, error) {
- invocations++
- return false, nil
- }),
- 2,
- 3, // the contract of WaitFor() says the func is called once more at the end of the wait
- true,
- },
- "returns immediately on error": {
- ConditionFunc(func() (bool, error) {
- invocations++
- return false, errors.New("test")
- }),
- 2,
- 1,
- true,
- },
- }
- for k, c := range testCases {
- invocations = 0
- ticker := fakeTicker(c.Ticks, nil, func() {})
- err := func() error {
- done := make(chan struct{})
- defer close(done)
- return WaitFor(ticker, c.F, done)
- }()
- switch {
- case c.Err && err == nil:
- t.Errorf("%s: Expected error, got nil", k)
- continue
- case !c.Err && err != nil:
- t.Errorf("%s: Expected no error, got: %#v", k, err)
- continue
- }
- if invocations != c.Invoked {
- t.Errorf("%s: Expected %d invocations, got %d", k, c.Invoked, invocations)
- }
- }
- }
- func TestWaitForWithDelay(t *testing.T) {
- done := make(chan struct{})
- defer close(done)
- WaitFor(poller(time.Millisecond, ForeverTestTimeout), func() (bool, error) {
- time.Sleep(10 * time.Millisecond)
- return true, nil
- }, done)
- // If polling goroutine doesn't see the done signal it will leak timers.
- select {
- case done <- struct{}{}:
- case <-time.After(ForeverTestTimeout):
- t.Errorf("expected an ack of the done signal.")
- }
- }
|