wait_test.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package wait
  14. import (
  15. "errors"
  16. "fmt"
  17. "sync"
  18. "sync/atomic"
  19. "testing"
  20. "time"
  21. )
  22. func TestUntil(t *testing.T) {
  23. ch := make(chan struct{})
  24. close(ch)
  25. Until(func() {
  26. t.Fatal("should not have been invoked")
  27. }, 0, ch)
  28. ch = make(chan struct{})
  29. called := make(chan struct{})
  30. go func() {
  31. Until(func() {
  32. called <- struct{}{}
  33. }, 0, ch)
  34. close(called)
  35. }()
  36. <-called
  37. close(ch)
  38. <-called
  39. }
  40. func TestNonSlidingUntil(t *testing.T) {
  41. ch := make(chan struct{})
  42. close(ch)
  43. NonSlidingUntil(func() {
  44. t.Fatal("should not have been invoked")
  45. }, 0, ch)
  46. ch = make(chan struct{})
  47. called := make(chan struct{})
  48. go func() {
  49. NonSlidingUntil(func() {
  50. called <- struct{}{}
  51. }, 0, ch)
  52. close(called)
  53. }()
  54. <-called
  55. close(ch)
  56. <-called
  57. }
  58. func TestUntilReturnsImmediately(t *testing.T) {
  59. now := time.Now()
  60. ch := make(chan struct{})
  61. Until(func() {
  62. close(ch)
  63. }, 30*time.Second, ch)
  64. if now.Add(25 * time.Second).Before(time.Now()) {
  65. t.Errorf("Until did not return immediately when the stop chan was closed inside the func")
  66. }
  67. }
  68. func TestJitterUntil(t *testing.T) {
  69. ch := make(chan struct{})
  70. // if a channel is closed JitterUntil never calls function f
  71. // and returns immediately
  72. close(ch)
  73. JitterUntil(func() {
  74. t.Fatal("should not have been invoked")
  75. }, 0, 1.0, true, ch)
  76. ch = make(chan struct{})
  77. called := make(chan struct{})
  78. go func() {
  79. JitterUntil(func() {
  80. called <- struct{}{}
  81. }, 0, 1.0, true, ch)
  82. close(called)
  83. }()
  84. <-called
  85. close(ch)
  86. <-called
  87. }
  88. func TestJitterUntilReturnsImmediately(t *testing.T) {
  89. now := time.Now()
  90. ch := make(chan struct{})
  91. JitterUntil(func() {
  92. close(ch)
  93. }, 30*time.Second, 1.0, true, ch)
  94. if now.Add(25 * time.Second).Before(time.Now()) {
  95. t.Errorf("JitterUntil did not return immediately when the stop chan was closed inside the func")
  96. }
  97. }
  98. func TestJitterUntilNegativeFactor(t *testing.T) {
  99. now := time.Now()
  100. ch := make(chan struct{})
  101. called := make(chan struct{})
  102. received := make(chan struct{})
  103. go func() {
  104. JitterUntil(func() {
  105. called <- struct{}{}
  106. <-received
  107. }, time.Second, -30.0, true, ch)
  108. }()
  109. // first loop
  110. <-called
  111. received <- struct{}{}
  112. // second loop
  113. <-called
  114. close(ch)
  115. received <- struct{}{}
  116. // it should take at most 2 seconds + some overhead, not 3
  117. if now.Add(3 * time.Second).Before(time.Now()) {
  118. t.Errorf("JitterUntil did not returned after predefined period with negative jitter factor when the stop chan was closed inside the func")
  119. }
  120. }
  121. func TestExponentialBackoff(t *testing.T) {
  122. opts := Backoff{Factor: 1.0, Steps: 3}
  123. // waits up to steps
  124. i := 0
  125. err := ExponentialBackoff(opts, func() (bool, error) {
  126. i++
  127. return false, nil
  128. })
  129. if err != ErrWaitTimeout || i != opts.Steps {
  130. t.Errorf("unexpected error: %v", err)
  131. }
  132. // returns immediately
  133. i = 0
  134. err = ExponentialBackoff(opts, func() (bool, error) {
  135. i++
  136. return true, nil
  137. })
  138. if err != nil || i != 1 {
  139. t.Errorf("unexpected error: %v", err)
  140. }
  141. // returns immediately on error
  142. testErr := fmt.Errorf("some other error")
  143. err = ExponentialBackoff(opts, func() (bool, error) {
  144. return false, testErr
  145. })
  146. if err != testErr {
  147. t.Errorf("unexpected error: %v", err)
  148. }
  149. // invoked multiple times
  150. i = 1
  151. err = ExponentialBackoff(opts, func() (bool, error) {
  152. if i < opts.Steps {
  153. i++
  154. return false, nil
  155. }
  156. return true, nil
  157. })
  158. if err != nil || i != opts.Steps {
  159. t.Errorf("unexpected error: %v", err)
  160. }
  161. }
  162. func TestPoller(t *testing.T) {
  163. done := make(chan struct{})
  164. defer close(done)
  165. w := poller(time.Millisecond, 2*time.Millisecond)
  166. ch := w(done)
  167. count := 0
  168. DRAIN:
  169. for {
  170. select {
  171. case _, open := <-ch:
  172. if !open {
  173. break DRAIN
  174. }
  175. count++
  176. case <-time.After(ForeverTestTimeout):
  177. t.Errorf("unexpected timeout after poll")
  178. }
  179. }
  180. if count > 3 {
  181. t.Errorf("expected up to three values, got %d", count)
  182. }
  183. }
  184. type fakePoller struct {
  185. max int
  186. used int32 // accessed with atomics
  187. wg sync.WaitGroup
  188. }
  189. func fakeTicker(max int, used *int32, doneFunc func()) WaitFunc {
  190. return func(done <-chan struct{}) <-chan struct{} {
  191. ch := make(chan struct{})
  192. go func() {
  193. defer doneFunc()
  194. defer close(ch)
  195. for i := 0; i < max; i++ {
  196. select {
  197. case ch <- struct{}{}:
  198. case <-done:
  199. return
  200. }
  201. if used != nil {
  202. atomic.AddInt32(used, 1)
  203. }
  204. }
  205. }()
  206. return ch
  207. }
  208. }
  209. func (fp *fakePoller) GetWaitFunc() WaitFunc {
  210. fp.wg.Add(1)
  211. return fakeTicker(fp.max, &fp.used, fp.wg.Done)
  212. }
  213. func TestPoll(t *testing.T) {
  214. invocations := 0
  215. f := ConditionFunc(func() (bool, error) {
  216. invocations++
  217. return true, nil
  218. })
  219. fp := fakePoller{max: 1}
  220. if err := pollInternal(fp.GetWaitFunc(), f); err != nil {
  221. t.Fatalf("unexpected error %v", err)
  222. }
  223. fp.wg.Wait()
  224. if invocations != 1 {
  225. t.Errorf("Expected exactly one invocation, got %d", invocations)
  226. }
  227. used := atomic.LoadInt32(&fp.used)
  228. if used != 1 {
  229. t.Errorf("Expected exactly one tick, got %d", used)
  230. }
  231. }
  232. func TestPollError(t *testing.T) {
  233. expectedError := errors.New("Expected error")
  234. f := ConditionFunc(func() (bool, error) {
  235. return false, expectedError
  236. })
  237. fp := fakePoller{max: 1}
  238. if err := pollInternal(fp.GetWaitFunc(), f); err == nil || err != expectedError {
  239. t.Fatalf("Expected error %v, got none %v", expectedError, err)
  240. }
  241. fp.wg.Wait()
  242. used := atomic.LoadInt32(&fp.used)
  243. if used != 1 {
  244. t.Errorf("Expected exactly one tick, got %d", used)
  245. }
  246. }
  247. func TestPollImmediate(t *testing.T) {
  248. invocations := 0
  249. f := ConditionFunc(func() (bool, error) {
  250. invocations++
  251. return true, nil
  252. })
  253. fp := fakePoller{max: 0}
  254. if err := pollImmediateInternal(fp.GetWaitFunc(), f); err != nil {
  255. t.Fatalf("unexpected error %v", err)
  256. }
  257. // We don't need to wait for fp.wg, as pollImmediate shouldn't call WaitFunc at all.
  258. if invocations != 1 {
  259. t.Errorf("Expected exactly one invocation, got %d", invocations)
  260. }
  261. used := atomic.LoadInt32(&fp.used)
  262. if used != 0 {
  263. t.Errorf("Expected exactly zero ticks, got %d", used)
  264. }
  265. }
  266. func TestPollImmediateError(t *testing.T) {
  267. expectedError := errors.New("Expected error")
  268. f := ConditionFunc(func() (bool, error) {
  269. return false, expectedError
  270. })
  271. fp := fakePoller{max: 0}
  272. if err := pollImmediateInternal(fp.GetWaitFunc(), f); err == nil || err != expectedError {
  273. t.Fatalf("Expected error %v, got none %v", expectedError, err)
  274. }
  275. // We don't need to wait for fp.wg, as pollImmediate shouldn't call WaitFunc at all.
  276. used := atomic.LoadInt32(&fp.used)
  277. if used != 0 {
  278. t.Errorf("Expected exactly zero ticks, got %d", used)
  279. }
  280. }
  281. func TestPollForever(t *testing.T) {
  282. ch := make(chan struct{})
  283. done := make(chan struct{}, 1)
  284. complete := make(chan struct{})
  285. go func() {
  286. f := ConditionFunc(func() (bool, error) {
  287. ch <- struct{}{}
  288. select {
  289. case <-done:
  290. return true, nil
  291. default:
  292. }
  293. return false, nil
  294. })
  295. if err := PollInfinite(time.Microsecond, f); err != nil {
  296. t.Fatalf("unexpected error %v", err)
  297. }
  298. close(ch)
  299. complete <- struct{}{}
  300. }()
  301. // ensure the condition is opened
  302. <-ch
  303. // ensure channel sends events
  304. for i := 0; i < 10; i++ {
  305. select {
  306. case _, open := <-ch:
  307. if !open {
  308. t.Fatalf("did not expect channel to be closed")
  309. }
  310. case <-time.After(ForeverTestTimeout):
  311. t.Fatalf("channel did not return at least once within the poll interval")
  312. }
  313. }
  314. // at most one poll notification should be sent once we return from the condition
  315. done <- struct{}{}
  316. go func() {
  317. for i := 0; i < 2; i++ {
  318. _, open := <-ch
  319. if !open {
  320. return
  321. }
  322. }
  323. t.Fatalf("expected closed channel after two iterations")
  324. }()
  325. <-complete
  326. }
  327. func TestWaitFor(t *testing.T) {
  328. var invocations int
  329. testCases := map[string]struct {
  330. F ConditionFunc
  331. Ticks int
  332. Invoked int
  333. Err bool
  334. }{
  335. "invoked once": {
  336. ConditionFunc(func() (bool, error) {
  337. invocations++
  338. return true, nil
  339. }),
  340. 2,
  341. 1,
  342. false,
  343. },
  344. "invoked and returns a timeout": {
  345. ConditionFunc(func() (bool, error) {
  346. invocations++
  347. return false, nil
  348. }),
  349. 2,
  350. 3, // the contract of WaitFor() says the func is called once more at the end of the wait
  351. true,
  352. },
  353. "returns immediately on error": {
  354. ConditionFunc(func() (bool, error) {
  355. invocations++
  356. return false, errors.New("test")
  357. }),
  358. 2,
  359. 1,
  360. true,
  361. },
  362. }
  363. for k, c := range testCases {
  364. invocations = 0
  365. ticker := fakeTicker(c.Ticks, nil, func() {})
  366. err := func() error {
  367. done := make(chan struct{})
  368. defer close(done)
  369. return WaitFor(ticker, c.F, done)
  370. }()
  371. switch {
  372. case c.Err && err == nil:
  373. t.Errorf("%s: Expected error, got nil", k)
  374. continue
  375. case !c.Err && err != nil:
  376. t.Errorf("%s: Expected no error, got: %#v", k, err)
  377. continue
  378. }
  379. if invocations != c.Invoked {
  380. t.Errorf("%s: Expected %d invocations, got %d", k, c.Invoked, invocations)
  381. }
  382. }
  383. }
  384. func TestWaitForWithDelay(t *testing.T) {
  385. done := make(chan struct{})
  386. defer close(done)
  387. WaitFor(poller(time.Millisecond, ForeverTestTimeout), func() (bool, error) {
  388. time.Sleep(10 * time.Millisecond)
  389. return true, nil
  390. }, done)
  391. // If polling goroutine doesn't see the done signal it will leak timers.
  392. select {
  393. case done <- struct{}{}:
  394. case <-time.After(ForeverTestTimeout):
  395. t.Errorf("expected an ack of the done signal.")
  396. }
  397. }