delta_fifo_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. /*
  2. Copyright 2014 The Kubernetes Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package cache
  14. import (
  15. "fmt"
  16. "reflect"
  17. "testing"
  18. "time"
  19. )
  20. // helper function to reduce stuttering
  21. func testPop(f *DeltaFIFO) testFifoObject {
  22. return Pop(f).(Deltas).Newest().Object.(testFifoObject)
  23. }
  24. // keyLookupFunc adapts a raw function to be a KeyLookup.
  25. type keyLookupFunc func() []testFifoObject
  26. // ListKeys just calls kl.
  27. func (kl keyLookupFunc) ListKeys() []string {
  28. result := []string{}
  29. for _, fifoObj := range kl() {
  30. result = append(result, fifoObj.name)
  31. }
  32. return result
  33. }
  34. // GetByKey returns the key if it exists in the list returned by kl.
  35. func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
  36. for _, v := range kl() {
  37. if v.name == key {
  38. return v, true, nil
  39. }
  40. }
  41. return nil, false, nil
  42. }
  43. func TestDeltaFIFO_basic(t *testing.T) {
  44. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  45. const amount = 500
  46. go func() {
  47. for i := 0; i < amount; i++ {
  48. f.Add(mkFifoObj(string([]rune{'a', rune(i)}), i+1))
  49. }
  50. }()
  51. go func() {
  52. for u := uint64(0); u < amount; u++ {
  53. f.Add(mkFifoObj(string([]rune{'b', rune(u)}), u+1))
  54. }
  55. }()
  56. lastInt := int(0)
  57. lastUint := uint64(0)
  58. for i := 0; i < amount*2; i++ {
  59. switch obj := testPop(f).val.(type) {
  60. case int:
  61. if obj <= lastInt {
  62. t.Errorf("got %v (int) out of order, last was %v", obj, lastInt)
  63. }
  64. lastInt = obj
  65. case uint64:
  66. if obj <= lastUint {
  67. t.Errorf("got %v (uint) out of order, last was %v", obj, lastUint)
  68. } else {
  69. lastUint = obj
  70. }
  71. default:
  72. t.Fatalf("unexpected type %#v", obj)
  73. }
  74. }
  75. }
  76. func TestDeltaFIFO_requeueOnPop(t *testing.T) {
  77. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  78. f.Add(mkFifoObj("foo", 10))
  79. _, err := f.Pop(func(obj interface{}) error {
  80. if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
  81. t.Fatalf("unexpected object: %#v", obj)
  82. }
  83. return ErrRequeue{Err: nil}
  84. })
  85. if err != nil {
  86. t.Fatalf("unexpected error: %v", err)
  87. }
  88. if _, ok, err := f.GetByKey("foo"); !ok || err != nil {
  89. t.Fatalf("object should have been requeued: %t %v", ok, err)
  90. }
  91. _, err = f.Pop(func(obj interface{}) error {
  92. if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
  93. t.Fatalf("unexpected object: %#v", obj)
  94. }
  95. return ErrRequeue{Err: fmt.Errorf("test error")}
  96. })
  97. if err == nil || err.Error() != "test error" {
  98. t.Fatalf("unexpected error: %v", err)
  99. }
  100. if _, ok, err := f.GetByKey("foo"); !ok || err != nil {
  101. t.Fatalf("object should have been requeued: %t %v", ok, err)
  102. }
  103. _, err = f.Pop(func(obj interface{}) error {
  104. if obj.(Deltas)[0].Object.(testFifoObject).name != "foo" {
  105. t.Fatalf("unexpected object: %#v", obj)
  106. }
  107. return nil
  108. })
  109. if err != nil {
  110. t.Fatalf("unexpected error: %v", err)
  111. }
  112. if _, ok, err := f.GetByKey("foo"); ok || err != nil {
  113. t.Fatalf("object should have been removed: %t %v", ok, err)
  114. }
  115. }
  116. func TestDeltaFIFO_compressorWorks(t *testing.T) {
  117. oldestTypes := []DeltaType{}
  118. f := NewDeltaFIFO(
  119. testFifoObjectKeyFunc,
  120. // This function just keeps the most recent delta
  121. // and puts deleted ones in the list.
  122. DeltaCompressorFunc(func(d Deltas) Deltas {
  123. if n := len(d); n > 1 {
  124. oldestTypes = append(oldestTypes, d[0].Type)
  125. d = d[1:]
  126. }
  127. return d
  128. }),
  129. nil,
  130. )
  131. f.Add(mkFifoObj("foo", 10))
  132. f.Update(mkFifoObj("foo", 12))
  133. f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
  134. f.Delete(mkFifoObj("foo", 22))
  135. f.Add(mkFifoObj("foo", 25)) // flush the last one out
  136. expect := []DeltaType{Added, Updated, Sync, Deleted}
  137. if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) {
  138. t.Errorf("Expected %#v, got %#v", e, a)
  139. }
  140. if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), Pop(f).(Deltas); !reflect.DeepEqual(e, a) {
  141. t.Fatalf("Expected %#v, got %#v", e, a)
  142. }
  143. }
  144. func TestDeltaFIFO_addUpdate(t *testing.T) {
  145. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  146. f.Add(mkFifoObj("foo", 10))
  147. f.Update(mkFifoObj("foo", 12))
  148. f.Delete(mkFifoObj("foo", 15))
  149. if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
  150. t.Errorf("Expected %+v, got %+v", e, a)
  151. }
  152. if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
  153. t.Errorf("Expected %+v, got %+v", e, a)
  154. }
  155. got := make(chan testFifoObject, 2)
  156. go func() {
  157. for {
  158. obj := testPop(f)
  159. t.Logf("got a thing %#v", obj)
  160. t.Logf("D len: %v", len(f.queue))
  161. got <- obj
  162. }
  163. }()
  164. first := <-got
  165. if e, a := 15, first.val; e != a {
  166. t.Errorf("Didn't get updated value (%v), got %v", e, a)
  167. }
  168. select {
  169. case unexpected := <-got:
  170. t.Errorf("Got second value %v", unexpected.val)
  171. case <-time.After(50 * time.Millisecond):
  172. }
  173. _, exists, _ := f.Get(mkFifoObj("foo", ""))
  174. if exists {
  175. t.Errorf("item did not get removed")
  176. }
  177. }
  178. func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
  179. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  180. f.Add(mkFifoObj("foo", 10))
  181. f.Update(mkFifoObj("bar", 15))
  182. f.Add(mkFifoObj("qux", 17))
  183. f.Delete(mkFifoObj("qux", 18))
  184. // This delete does not enqueue anything because baz doesn't exist.
  185. f.Delete(mkFifoObj("baz", 20))
  186. expectList := []int{10, 15, 18}
  187. for _, expect := range expectList {
  188. if e, a := expect, testPop(f).val; e != a {
  189. t.Errorf("Didn't get updated value (%v), got %v", e, a)
  190. }
  191. }
  192. if e, a := 0, len(f.items); e != a {
  193. t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.items)
  194. }
  195. }
  196. func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
  197. f := NewDeltaFIFO(
  198. testFifoObjectKeyFunc,
  199. nil,
  200. keyLookupFunc(func() []testFifoObject {
  201. return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
  202. }),
  203. )
  204. f.Add(mkFifoObj("foo", 10))
  205. f.Update(mkFifoObj("bar", 15))
  206. // This delete does enqueue the deletion, because "baz" is in the key lister.
  207. f.Delete(mkFifoObj("baz", 20))
  208. expectList := []int{10, 15, 20}
  209. for _, expect := range expectList {
  210. if e, a := expect, testPop(f).val; e != a {
  211. t.Errorf("Didn't get updated value (%v), got %v", e, a)
  212. }
  213. }
  214. if e, a := 0, len(f.items); e != a {
  215. t.Errorf("queue unexpectedly not empty: %v != %v", e, a)
  216. }
  217. }
  218. func TestDeltaFIFO_addReplace(t *testing.T) {
  219. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  220. f.Add(mkFifoObj("foo", 10))
  221. f.Replace([]interface{}{mkFifoObj("foo", 15)}, "0")
  222. got := make(chan testFifoObject, 2)
  223. go func() {
  224. for {
  225. got <- testPop(f)
  226. }
  227. }()
  228. first := <-got
  229. if e, a := 15, first.val; e != a {
  230. t.Errorf("Didn't get updated value (%v), got %v", e, a)
  231. }
  232. select {
  233. case unexpected := <-got:
  234. t.Errorf("Got second value %v", unexpected.val)
  235. case <-time.After(50 * time.Millisecond):
  236. }
  237. _, exists, _ := f.Get(mkFifoObj("foo", ""))
  238. if exists {
  239. t.Errorf("item did not get removed")
  240. }
  241. }
  242. func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
  243. f := NewDeltaFIFO(
  244. testFifoObjectKeyFunc,
  245. nil,
  246. keyLookupFunc(func() []testFifoObject {
  247. return []testFifoObject{mkFifoObj("foo", 5)}
  248. }),
  249. )
  250. f.Delete(mkFifoObj("foo", 10))
  251. f.Resync()
  252. deltas := f.items["foo"]
  253. if len(deltas) != 1 {
  254. t.Fatalf("unexpected deltas length: %v", deltas)
  255. }
  256. if deltas[0].Type != Deleted {
  257. t.Errorf("unexpected delta: %v", deltas[0])
  258. }
  259. }
  260. func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
  261. f := NewDeltaFIFO(
  262. testFifoObjectKeyFunc,
  263. nil,
  264. keyLookupFunc(func() []testFifoObject {
  265. return []testFifoObject{}
  266. }),
  267. )
  268. f.Add(mkFifoObj("foo", 5))
  269. f.Delete(mkFifoObj("foo", 6))
  270. deltas := f.items["foo"]
  271. if len(deltas) != 2 {
  272. t.Fatalf("unexpected deltas length: %v", deltas)
  273. }
  274. if deltas[len(deltas)-1].Type != Deleted {
  275. t.Errorf("unexpected delta: %v", deltas[len(deltas)-1])
  276. }
  277. }
  278. func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
  279. f := NewDeltaFIFO(
  280. testFifoObjectKeyFunc,
  281. nil,
  282. keyLookupFunc(func() []testFifoObject {
  283. return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
  284. }),
  285. )
  286. f.Delete(mkFifoObj("baz", 10))
  287. f.Replace([]interface{}{mkFifoObj("foo", 5)}, "0")
  288. expectedList := []Deltas{
  289. {{Deleted, mkFifoObj("baz", 10)}},
  290. {{Sync, mkFifoObj("foo", 5)}},
  291. // Since "bar" didn't have a delete event and wasn't in the Replace list
  292. // it should get a tombstone key with the right Obj.
  293. {{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
  294. }
  295. for _, expected := range expectedList {
  296. cur := Pop(f).(Deltas)
  297. if e, a := expected, cur; !reflect.DeepEqual(e, a) {
  298. t.Errorf("Expected %#v, got %#v", e, a)
  299. }
  300. }
  301. }
  302. func TestDeltaFIFO_detectLineJumpers(t *testing.T) {
  303. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  304. f.Add(mkFifoObj("foo", 10))
  305. f.Add(mkFifoObj("bar", 1))
  306. f.Add(mkFifoObj("foo", 11))
  307. f.Add(mkFifoObj("foo", 13))
  308. f.Add(mkFifoObj("zab", 30))
  309. if e, a := 13, testPop(f).val; a != e {
  310. t.Fatalf("expected %d, got %d", e, a)
  311. }
  312. f.Add(mkFifoObj("foo", 14)) // ensure foo doesn't jump back in line
  313. if e, a := 1, testPop(f).val; a != e {
  314. t.Fatalf("expected %d, got %d", e, a)
  315. }
  316. if e, a := 30, testPop(f).val; a != e {
  317. t.Fatalf("expected %d, got %d", e, a)
  318. }
  319. if e, a := 14, testPop(f).val; a != e {
  320. t.Fatalf("expected %d, got %d", e, a)
  321. }
  322. }
  323. func TestDeltaFIFO_addIfNotPresent(t *testing.T) {
  324. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  325. f.Add(mkFifoObj("b", 3))
  326. b3 := Pop(f)
  327. f.Add(mkFifoObj("c", 4))
  328. c4 := Pop(f)
  329. if e, a := 0, len(f.items); e != a {
  330. t.Fatalf("Expected %v, got %v items in queue", e, a)
  331. }
  332. f.Add(mkFifoObj("a", 1))
  333. f.Add(mkFifoObj("b", 2))
  334. f.AddIfNotPresent(b3)
  335. f.AddIfNotPresent(c4)
  336. if e, a := 3, len(f.items); a != e {
  337. t.Fatalf("expected queue length %d, got %d", e, a)
  338. }
  339. expectedValues := []int{1, 2, 4}
  340. for _, expected := range expectedValues {
  341. if actual := testPop(f).val; actual != expected {
  342. t.Fatalf("expected value %d, got %d", expected, actual)
  343. }
  344. }
  345. }
  346. func TestDeltaFIFO_KeyOf(t *testing.T) {
  347. f := DeltaFIFO{keyFunc: testFifoObjectKeyFunc}
  348. table := []struct {
  349. obj interface{}
  350. key string
  351. }{
  352. {obj: testFifoObject{name: "A"}, key: "A"},
  353. {obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"},
  354. {obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"},
  355. {obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"},
  356. }
  357. for _, item := range table {
  358. got, err := f.KeyOf(item.obj)
  359. if err != nil {
  360. t.Errorf("Unexpected error for %q: %v", item.obj, err)
  361. continue
  362. }
  363. if e, a := item.key, got; e != a {
  364. t.Errorf("Expected %v, got %v", e, a)
  365. }
  366. }
  367. }
  368. func TestDeltaFIFO_HasSynced(t *testing.T) {
  369. tests := []struct {
  370. actions []func(f *DeltaFIFO)
  371. expectedSynced bool
  372. }{
  373. {
  374. actions: []func(f *DeltaFIFO){},
  375. expectedSynced: false,
  376. },
  377. {
  378. actions: []func(f *DeltaFIFO){
  379. func(f *DeltaFIFO) { f.Add(mkFifoObj("a", 1)) },
  380. },
  381. expectedSynced: true,
  382. },
  383. {
  384. actions: []func(f *DeltaFIFO){
  385. func(f *DeltaFIFO) { f.Replace([]interface{}{}, "0") },
  386. },
  387. expectedSynced: true,
  388. },
  389. {
  390. actions: []func(f *DeltaFIFO){
  391. func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
  392. },
  393. expectedSynced: false,
  394. },
  395. {
  396. actions: []func(f *DeltaFIFO){
  397. func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
  398. func(f *DeltaFIFO) { Pop(f) },
  399. },
  400. expectedSynced: false,
  401. },
  402. {
  403. actions: []func(f *DeltaFIFO){
  404. func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
  405. func(f *DeltaFIFO) { Pop(f) },
  406. func(f *DeltaFIFO) { Pop(f) },
  407. },
  408. expectedSynced: true,
  409. },
  410. }
  411. for i, test := range tests {
  412. f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
  413. for _, action := range test.actions {
  414. action(f)
  415. }
  416. if e, a := test.expectedSynced, f.HasSynced(); a != e {
  417. t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
  418. }
  419. }
  420. }