acker_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. // Copyright 2016 Google Inc. All Rights Reserved.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package pubsub
  15. import (
  16. "reflect"
  17. "sort"
  18. "testing"
  19. "time"
  20. "golang.org/x/net/context"
  21. )
  22. func TestAcker(t *testing.T) {
  23. tick := make(chan time.Time)
  24. s := &testService{acknowledgeCalled: make(chan acknowledgeCall)}
  25. c := &Client{projectID: "projid", s: s}
  26. processed := make(chan string, 10)
  27. acker := &acker{
  28. Client: c,
  29. Ctx: context.Background(),
  30. Sub: "subname",
  31. AckTick: tick,
  32. Notify: func(ackID string) { processed <- ackID },
  33. }
  34. acker.Start()
  35. checkAckProcessed := func(ackIDs []string) {
  36. got := <-s.acknowledgeCalled
  37. sort.Strings(got.ackIDs)
  38. want := acknowledgeCall{
  39. subName: "subname",
  40. ackIDs: ackIDs,
  41. }
  42. if !reflect.DeepEqual(got, want) {
  43. t.Errorf("acknowledge: got:\n%v\nwant:\n%v", got, want)
  44. }
  45. }
  46. acker.Ack("a")
  47. acker.Ack("b")
  48. tick <- time.Time{}
  49. checkAckProcessed([]string{"a", "b"})
  50. acker.Ack("c")
  51. tick <- time.Time{}
  52. checkAckProcessed([]string{"c"})
  53. acker.Stop()
  54. // all IDS should have been sent to processed.
  55. close(processed)
  56. processedIDs := []string{}
  57. for id := range processed {
  58. processedIDs = append(processedIDs, id)
  59. }
  60. sort.Strings(processedIDs)
  61. want := []string{"a", "b", "c"}
  62. if !reflect.DeepEqual(processedIDs, want) {
  63. t.Errorf("acker processed: got:\n%v\nwant:\n%v", processedIDs, want)
  64. }
  65. }
  66. // TestAckerStop checks that Stop blocks until all ackIDs have been acked.
  67. func TestAckerStop(t *testing.T) {
  68. tick := make(chan time.Time)
  69. s := &testService{acknowledgeCalled: make(chan acknowledgeCall, 10)}
  70. c := &Client{projectID: "projid", s: s}
  71. processed := make(chan string)
  72. acker := &acker{
  73. Client: c,
  74. Ctx: context.Background(),
  75. Sub: "subname",
  76. AckTick: tick,
  77. Notify: func(ackID string) { processed <- ackID },
  78. }
  79. acker.Start()
  80. stopped := make(chan struct{})
  81. // Add an ackID so that acker.Stop will not return immediately.
  82. acker.Ack("a")
  83. go func() {
  84. acker.Stop()
  85. stopped <- struct{}{}
  86. }()
  87. // If acker,Stop fails to block, stopped should have been written to by the time
  88. // this sleep completes.
  89. time.Sleep(time.Millisecond)
  90. // Receiving from processed should cause Stop to subsequently return,
  91. // so it should never be possible to read from stopped before
  92. // processed.
  93. select {
  94. case <-processed:
  95. case <-stopped:
  96. t.Errorf("acker.Stop returned before cleanup was complete")
  97. case <-time.After(time.Millisecond):
  98. t.Errorf("send to processed never arrived")
  99. }
  100. select {
  101. case <-stopped:
  102. case <-time.After(time.Millisecond):
  103. t.Errorf("acker.Stop never returned")
  104. }
  105. }