123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- /*
- Copyright 2014 The Kubernetes Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package wait
- import (
- "errors"
- "math/rand"
- "time"
- )
- // For any test of the style:
- // ...
- // <- time.After(timeout):
- // t.Errorf("Timed out")
- // The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s
- // is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine
- // (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test.
- var ForeverTestTimeout = time.Second * 30
- // NeverStop may be passed to Until to make it never stop.
- var NeverStop <-chan struct{} = make(chan struct{})
- // Forever is syntactic sugar on top of Until
- func Forever(f func(), period time.Duration) {
- Until(f, period, NeverStop)
- }
- // Until loops until stop channel is closed, running f every period.
- // Until is syntactic sugar on top of JitterUntil with zero jitter
- // factor, with sliding = true (which means the timer for period
- // starts after the f completes).
- func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
- JitterUntil(f, period, 0.0, true, stopCh)
- }
- // NonSlidingUntil loops until stop channel is closed, running f every
- // period. NonSlidingUntil is syntactic sugar on top of JitterUntil
- // with zero jitter factor, with sliding = false (meaning the timer for
- // period starts at the same time as the function starts).
- func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) {
- JitterUntil(f, period, 0.0, false, stopCh)
- }
- // JitterUntil loops until stop channel is closed, running f every period.
- // If jitterFactor is positive, the period is jittered before every run of f.
- // If jitterFactor is not positive, the period is unchanged.
- // Catches any panics, and keeps going. f may not be invoked if
- // stop channel is already closed. Pass NeverStop to Until if you
- // don't want it stop.
- func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
- for {
- select {
- case <-stopCh:
- return
- default:
- }
- jitteredPeriod := period
- if jitterFactor > 0.0 {
- jitteredPeriod = Jitter(period, jitterFactor)
- }
- var t *time.Timer
- if !sliding {
- t = time.NewTimer(jitteredPeriod)
- }
- func() {
- f()
- }()
- if sliding {
- t = time.NewTimer(jitteredPeriod)
- }
- // NOTE: b/c there is no priority selection in golang
- // it is possible for this to race, meaning we could
- // trigger t.C and stopCh, and t.C select falls through.
- // In order to mitigate we re-check stopCh at the beginning
- // of every loop to prevent extra executions of f().
- select {
- case <-stopCh:
- return
- case <-t.C:
- }
- }
- }
- // Jitter returns a time.Duration between duration and duration + maxFactor * duration,
- // to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a
- // suggested default value will be chosen.
- func Jitter(duration time.Duration, maxFactor float64) time.Duration {
- if maxFactor <= 0.0 {
- maxFactor = 1.0
- }
- wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
- return wait
- }
- // ErrWaitTimeout is returned when the condition exited without success
- var ErrWaitTimeout = errors.New("timed out waiting for the condition")
- // ConditionFunc returns true if the condition is satisfied, or an error
- // if the loop should be aborted.
- type ConditionFunc func() (done bool, err error)
- // Backoff is parameters applied to a Backoff function.
- type Backoff struct {
- Duration time.Duration
- Factor float64
- Jitter float64
- Steps int
- }
- // ExponentialBackoff repeats a condition check up to steps times, increasing the wait
- // by multipling the previous duration by factor. If jitter is greater than zero,
- // a random amount of each duration is added (between duration and duration*(1+jitter)).
- // If the condition never returns true, ErrWaitTimeout is returned. All other errors
- // terminate immediately.
- func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error {
- duration := backoff.Duration
- for i := 0; i < backoff.Steps; i++ {
- if i != 0 {
- adjusted := duration
- if backoff.Jitter > 0.0 {
- adjusted = Jitter(duration, backoff.Jitter)
- }
- time.Sleep(adjusted)
- duration = time.Duration(float64(duration) * backoff.Factor)
- }
- if ok, err := condition(); err != nil || ok {
- return err
- }
- }
- return ErrWaitTimeout
- }
- // Poll tries a condition func until it returns true, an error, or the timeout
- // is reached. condition will always be invoked at least once but some intervals
- // may be missed if the condition takes too long or the time window is too short.
- // If you want to Poll something forever, see PollInfinite.
- // Poll always waits the interval before the first check of the condition.
- func Poll(interval, timeout time.Duration, condition ConditionFunc) error {
- return pollInternal(poller(interval, timeout), condition)
- }
- func pollInternal(wait WaitFunc, condition ConditionFunc) error {
- done := make(chan struct{})
- defer close(done)
- return WaitFor(wait, condition, done)
- }
- // PollImmediate is identical to Poll, except that it performs the first check
- // immediately, not waiting interval beforehand.
- func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error {
- return pollImmediateInternal(poller(interval, timeout), condition)
- }
- func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error {
- done, err := condition()
- if err != nil {
- return err
- }
- if done {
- return nil
- }
- return pollInternal(wait, condition)
- }
- // PollInfinite polls forever.
- func PollInfinite(interval time.Duration, condition ConditionFunc) error {
- done := make(chan struct{})
- defer close(done)
- return WaitFor(poller(interval, 0), condition, done)
- }
- // WaitFunc creates a channel that receives an item every time a test
- // should be executed and is closed when the last test should be invoked.
- type WaitFunc func(done <-chan struct{}) <-chan struct{}
- // WaitFor gets a channel from wait(), and then invokes fn once for every value
- // placed on the channel and once more when the channel is closed. If fn
- // returns an error the loop ends and that error is returned, and if fn returns
- // true the loop ends and nil is returned. ErrWaitTimeout will be returned if
- // the channel is closed without fn ever returning true.
- func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error {
- c := wait(done)
- for {
- _, open := <-c
- ok, err := fn()
- if err != nil {
- return err
- }
- if ok {
- return nil
- }
- if !open {
- break
- }
- }
- return ErrWaitTimeout
- }
- // poller returns a WaitFunc that will send to the channel every
- // interval until timeout has elapsed and then close the channel.
- // Over very short intervals you may receive no ticks before
- // the channel is closed. If timeout is 0, the channel
- // will never be closed.
- func poller(interval, timeout time.Duration) WaitFunc {
- return WaitFunc(func(done <-chan struct{}) <-chan struct{} {
- ch := make(chan struct{})
- go func() {
- defer close(ch)
- tick := time.NewTicker(interval)
- defer tick.Stop()
- var after <-chan time.Time
- if timeout != 0 {
- // time.After is more convenient, but it
- // potentially leaves timers around much longer
- // than necessary if we exit early.
- timer := time.NewTimer(timeout)
- after = timer.C
- defer timer.Stop()
- }
- for {
- select {
- case <-tick.C:
- // If the consumer isn't ready for this signal drop it and
- // check the other channels.
- select {
- case ch <- struct{}{}:
- default:
- }
- case <-after:
- return
- case <-done:
- return
- }
- }
- }()
- return ch
- })
- }
|