reflector_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "errors"
  16. "fmt"
  17. "math/rand"
  18. "strconv"
  19. "testing"
  20. "time"
  21. "k8s.io/kubernetes/pkg/api"
  22. "k8s.io/kubernetes/pkg/api/unversioned"
  23. "k8s.io/kubernetes/pkg/runtime"
  24. "k8s.io/kubernetes/pkg/util/wait"
  25. "k8s.io/kubernetes/pkg/watch"
  26. )
  27. var nevererrc chan error
  28. type testLW struct {
  29. ListFunc func() (runtime.Object, error)
  30. WatchFunc func(options api.ListOptions) (watch.Interface, error)
  31. }
  32. func (t *testLW) List(options api.ListOptions) (runtime.Object, error) {
  33. return t.ListFunc()
  34. }
  35. func (t *testLW) Watch(options api.ListOptions) (watch.Interface, error) {
  36. return t.WatchFunc(options)
  37. }
  38. func TestCloseWatchChannelOnError(t *testing.T) {
  39. r := NewReflector(&testLW{}, &api.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
  40. pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
  41. fw := watch.NewFake()
  42. r.listerWatcher = &testLW{
  43. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  44. return fw, nil
  45. },
  46. ListFunc: func() (runtime.Object, error) {
  47. return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "1"}}, nil
  48. },
  49. }
  50. go r.ListAndWatch(wait.NeverStop)
  51. fw.Error(pod)
  52. select {
  53. case _, ok := <-fw.ResultChan():
  54. if ok {
  55. t.Errorf("Watch channel left open after cancellation")
  56. }
  57. case <-time.After(wait.ForeverTestTimeout):
  58. t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
  59. break
  60. }
  61. }
  62. func TestRunUntil(t *testing.T) {
  63. stopCh := make(chan struct{})
  64. store := NewStore(MetaNamespaceKeyFunc)
  65. r := NewReflector(&testLW{}, &api.Pod{}, store, 0)
  66. fw := watch.NewFake()
  67. r.listerWatcher = &testLW{
  68. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  69. return fw, nil
  70. },
  71. ListFunc: func() (runtime.Object, error) {
  72. return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "1"}}, nil
  73. },
  74. }
  75. r.RunUntil(stopCh)
  76. // Synchronously add a dummy pod into the watch channel so we
  77. // know the RunUntil go routine is in the watch handler.
  78. fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
  79. close(stopCh)
  80. select {
  81. case _, ok := <-fw.ResultChan():
  82. if ok {
  83. t.Errorf("Watch channel left open after stopping the watch")
  84. }
  85. case <-time.After(wait.ForeverTestTimeout):
  86. t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
  87. break
  88. }
  89. }
  90. func TestReflectorResyncChan(t *testing.T) {
  91. s := NewStore(MetaNamespaceKeyFunc)
  92. g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
  93. a, _ := g.resyncChan()
  94. b := time.After(wait.ForeverTestTimeout)
  95. select {
  96. case <-a:
  97. t.Logf("got timeout as expected")
  98. case <-b:
  99. t.Errorf("resyncChan() is at least 99 milliseconds late??")
  100. }
  101. }
  102. func BenchmarkReflectorResyncChanMany(b *testing.B) {
  103. s := NewStore(MetaNamespaceKeyFunc)
  104. g := NewReflector(&testLW{}, &api.Pod{}, s, 25*time.Millisecond)
  105. // The improvement to this (calling the timer's Stop() method) makes
  106. // this benchmark about 40% faster.
  107. for i := 0; i < b.N; i++ {
  108. g.resyncPeriod = time.Duration(rand.Float64() * float64(time.Millisecond) * 25)
  109. _, stop := g.resyncChan()
  110. stop()
  111. }
  112. }
  113. func TestReflectorWatchHandlerError(t *testing.T) {
  114. s := NewStore(MetaNamespaceKeyFunc)
  115. g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
  116. fw := watch.NewFake()
  117. go func() {
  118. fw.Stop()
  119. }()
  120. var resumeRV string
  121. err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
  122. if err == nil {
  123. t.Errorf("unexpected non-error")
  124. }
  125. }
  126. func TestReflectorWatchHandler(t *testing.T) {
  127. s := NewStore(MetaNamespaceKeyFunc)
  128. g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
  129. fw := watch.NewFake()
  130. s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
  131. s.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
  132. go func() {
  133. fw.Add(&api.Service{ObjectMeta: api.ObjectMeta{Name: "rejected"}})
  134. fw.Delete(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
  135. fw.Modify(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
  136. fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
  137. fw.Stop()
  138. }()
  139. var resumeRV string
  140. err := g.watchHandler(fw, &resumeRV, nevererrc, wait.NeverStop)
  141. if err != nil {
  142. t.Errorf("unexpected error %v", err)
  143. }
  144. mkPod := func(id string, rv string) *api.Pod {
  145. return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, ResourceVersion: rv}}
  146. }
  147. table := []struct {
  148. Pod *api.Pod
  149. exists bool
  150. }{
  151. {mkPod("foo", ""), false},
  152. {mkPod("rejected", ""), false},
  153. {mkPod("bar", "55"), true},
  154. {mkPod("baz", "32"), true},
  155. }
  156. for _, item := range table {
  157. obj, exists, _ := s.Get(item.Pod)
  158. if e, a := item.exists, exists; e != a {
  159. t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
  160. }
  161. if !exists {
  162. continue
  163. }
  164. if e, a := item.Pod.ResourceVersion, obj.(*api.Pod).ResourceVersion; e != a {
  165. t.Errorf("%v: expected %v, got %v", item.Pod, e, a)
  166. }
  167. }
  168. // RV should send the last version we see.
  169. if e, a := "32", resumeRV; e != a {
  170. t.Errorf("expected %v, got %v", e, a)
  171. }
  172. // last sync resource version should be the last version synced with store
  173. if e, a := "32", g.LastSyncResourceVersion(); e != a {
  174. t.Errorf("expected %v, got %v", e, a)
  175. }
  176. }
  177. func TestReflectorStopWatch(t *testing.T) {
  178. s := NewStore(MetaNamespaceKeyFunc)
  179. g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
  180. fw := watch.NewFake()
  181. var resumeRV string
  182. stopWatch := make(chan struct{}, 1)
  183. stopWatch <- struct{}{}
  184. err := g.watchHandler(fw, &resumeRV, nevererrc, stopWatch)
  185. if err != errorStopRequested {
  186. t.Errorf("expected stop error, got %q", err)
  187. }
  188. }
  189. func TestReflectorListAndWatch(t *testing.T) {
  190. createdFakes := make(chan *watch.FakeWatcher)
  191. // The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc
  192. // to get called at the beginning of the watch with 1, and again with 3 when we
  193. // inject an error.
  194. expectedRVs := []string{"1", "3"}
  195. lw := &testLW{
  196. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  197. rv := options.ResourceVersion
  198. fw := watch.NewFake()
  199. if e, a := expectedRVs[0], rv; e != a {
  200. t.Errorf("Expected rv %v, but got %v", e, a)
  201. }
  202. expectedRVs = expectedRVs[1:]
  203. // channel is not buffered because the for loop below needs to block. But
  204. // we don't want to block here, so report the new fake via a go routine.
  205. go func() { createdFakes <- fw }()
  206. return fw, nil
  207. },
  208. ListFunc: func() (runtime.Object, error) {
  209. return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "1"}}, nil
  210. },
  211. }
  212. s := NewFIFO(MetaNamespaceKeyFunc)
  213. r := NewReflector(lw, &api.Pod{}, s, 0)
  214. go r.ListAndWatch(wait.NeverStop)
  215. ids := []string{"foo", "bar", "baz", "qux", "zoo"}
  216. var fw *watch.FakeWatcher
  217. for i, id := range ids {
  218. if fw == nil {
  219. fw = <-createdFakes
  220. }
  221. sendingRV := strconv.FormatUint(uint64(i+2), 10)
  222. fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: id, ResourceVersion: sendingRV}})
  223. if sendingRV == "3" {
  224. // Inject a failure.
  225. fw.Stop()
  226. fw = nil
  227. }
  228. }
  229. // Verify we received the right ids with the right resource versions.
  230. for i, id := range ids {
  231. pod := Pop(s).(*api.Pod)
  232. if e, a := id, pod.Name; e != a {
  233. t.Errorf("%v: Expected %v, got %v", i, e, a)
  234. }
  235. if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a {
  236. t.Errorf("%v: Expected %v, got %v", i, e, a)
  237. }
  238. }
  239. if len(expectedRVs) != 0 {
  240. t.Error("called watchStarter an unexpected number of times")
  241. }
  242. }
  243. func TestReflectorListAndWatchWithErrors(t *testing.T) {
  244. mkPod := func(id string, rv string) *api.Pod {
  245. return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, ResourceVersion: rv}}
  246. }
  247. mkList := func(rv string, pods ...*api.Pod) *api.PodList {
  248. list := &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: rv}}
  249. for _, pod := range pods {
  250. list.Items = append(list.Items, *pod)
  251. }
  252. return list
  253. }
  254. table := []struct {
  255. list *api.PodList
  256. listErr error
  257. events []watch.Event
  258. watchErr error
  259. }{
  260. {
  261. list: mkList("1"),
  262. events: []watch.Event{
  263. {Type: watch.Added, Object: mkPod("foo", "2")},
  264. {Type: watch.Added, Object: mkPod("bar", "3")},
  265. },
  266. }, {
  267. list: mkList("3", mkPod("foo", "2"), mkPod("bar", "3")),
  268. events: []watch.Event{
  269. {Type: watch.Deleted, Object: mkPod("foo", "4")},
  270. {Type: watch.Added, Object: mkPod("qux", "5")},
  271. },
  272. }, {
  273. listErr: fmt.Errorf("a list error"),
  274. }, {
  275. list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
  276. watchErr: fmt.Errorf("a watch error"),
  277. }, {
  278. list: mkList("5", mkPod("bar", "3"), mkPod("qux", "5")),
  279. events: []watch.Event{
  280. {Type: watch.Added, Object: mkPod("baz", "6")},
  281. },
  282. }, {
  283. list: mkList("6", mkPod("bar", "3"), mkPod("qux", "5"), mkPod("baz", "6")),
  284. },
  285. }
  286. s := NewFIFO(MetaNamespaceKeyFunc)
  287. for line, item := range table {
  288. if item.list != nil {
  289. // Test that the list is what currently exists in the store.
  290. current := s.List()
  291. checkMap := map[string]string{}
  292. for _, item := range current {
  293. pod := item.(*api.Pod)
  294. checkMap[pod.Name] = pod.ResourceVersion
  295. }
  296. for _, pod := range item.list.Items {
  297. if e, a := pod.ResourceVersion, checkMap[pod.Name]; e != a {
  298. t.Errorf("%v: expected %v, got %v for pod %v", line, e, a, pod.Name)
  299. }
  300. }
  301. if e, a := len(item.list.Items), len(checkMap); e != a {
  302. t.Errorf("%v: expected %v, got %v", line, e, a)
  303. }
  304. }
  305. watchRet, watchErr := item.events, item.watchErr
  306. lw := &testLW{
  307. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  308. if watchErr != nil {
  309. return nil, watchErr
  310. }
  311. watchErr = fmt.Errorf("second watch")
  312. fw := watch.NewFake()
  313. go func() {
  314. for _, e := range watchRet {
  315. fw.Action(e.Type, e.Object)
  316. }
  317. fw.Stop()
  318. }()
  319. return fw, nil
  320. },
  321. ListFunc: func() (runtime.Object, error) {
  322. return item.list, item.listErr
  323. },
  324. }
  325. r := NewReflector(lw, &api.Pod{}, s, 0)
  326. r.ListAndWatch(wait.NeverStop)
  327. }
  328. }
  329. func TestReflectorResync(t *testing.T) {
  330. iteration := 0
  331. stopCh := make(chan struct{})
  332. rerr := errors.New("expected resync reached")
  333. s := &FakeCustomStore{
  334. ResyncFunc: func() error {
  335. iteration++
  336. if iteration == 2 {
  337. return rerr
  338. }
  339. return nil
  340. },
  341. }
  342. lw := &testLW{
  343. WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
  344. fw := watch.NewFake()
  345. return fw, nil
  346. },
  347. ListFunc: func() (runtime.Object, error) {
  348. return &api.PodList{ListMeta: unversioned.ListMeta{ResourceVersion: "0"}}, nil
  349. },
  350. }
  351. resyncPeriod := 1 * time.Millisecond
  352. r := NewReflector(lw, &api.Pod{}, s, resyncPeriod)
  353. if err := r.ListAndWatch(stopCh); err != nil {
  354. // error from Resync is not propaged up to here.
  355. t.Errorf("expected error %v", err)
  356. }
  357. if iteration != 2 {
  358. t.Errorf("exactly 2 iterations were expected, got: %v", iteration)
  359. }
  360. }