sinks_test.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package notifications
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "sync"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. "testing"
  9. )
  10. func TestBroadcaster(t *testing.T) {
  11. const nEvents = 1000
  12. var sinks []Sink
  13. for i := 0; i < 10; i++ {
  14. sinks = append(sinks, &testSink{})
  15. }
  16. b := NewBroadcaster(sinks...)
  17. var block []Event
  18. var wg sync.WaitGroup
  19. for i := 1; i <= nEvents; i++ {
  20. block = append(block, createTestEvent("push", "library/test", "blob"))
  21. if i%10 == 0 && i > 0 {
  22. wg.Add(1)
  23. go func(block ...Event) {
  24. if err := b.Write(block...); err != nil {
  25. t.Fatalf("error writing block of length %d: %v", len(block), err)
  26. }
  27. wg.Done()
  28. }(block...)
  29. block = nil
  30. }
  31. }
  32. wg.Wait() // Wait until writes complete
  33. checkClose(t, b)
  34. // Iterate through the sinks and check that they all have the expected length.
  35. for _, sink := range sinks {
  36. ts := sink.(*testSink)
  37. ts.mu.Lock()
  38. defer ts.mu.Unlock()
  39. if len(ts.events) != nEvents {
  40. t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
  41. }
  42. if !ts.closed {
  43. t.Fatalf("sink should have been closed")
  44. }
  45. }
  46. }
  47. func TestEventQueue(t *testing.T) {
  48. const nevents = 1000
  49. var ts testSink
  50. metrics := newSafeMetrics()
  51. eq := newEventQueue(
  52. // delayed sync simulates destination slower than channel comms
  53. &delayedSink{
  54. Sink: &ts,
  55. delay: time.Millisecond * 1,
  56. }, metrics.eventQueueListener())
  57. var wg sync.WaitGroup
  58. var block []Event
  59. for i := 1; i <= nevents; i++ {
  60. block = append(block, createTestEvent("push", "library/test", "blob"))
  61. if i%10 == 0 && i > 0 {
  62. wg.Add(1)
  63. go func(block ...Event) {
  64. if err := eq.Write(block...); err != nil {
  65. t.Fatalf("error writing event block: %v", err)
  66. }
  67. wg.Done()
  68. }(block...)
  69. block = nil
  70. }
  71. }
  72. wg.Wait()
  73. checkClose(t, eq)
  74. ts.mu.Lock()
  75. defer ts.mu.Unlock()
  76. metrics.Lock()
  77. defer metrics.Unlock()
  78. if len(ts.events) != nevents {
  79. t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
  80. }
  81. if !ts.closed {
  82. t.Fatalf("sink should have been closed")
  83. }
  84. if metrics.Events != nevents {
  85. t.Fatalf("unexpected ingress count: %d != %d", metrics.Events, nevents)
  86. }
  87. if metrics.Pending != 0 {
  88. t.Fatalf("unexpected egress count: %d != %d", metrics.Pending, 0)
  89. }
  90. }
  91. func TestRetryingSink(t *testing.T) {
  92. // Make a sync that fails most of the time, ensuring that all the events
  93. // make it through.
  94. var ts testSink
  95. flaky := &flakySink{
  96. rate: 1.0, // start out always failing.
  97. Sink: &ts,
  98. }
  99. s := newRetryingSink(flaky, 3, 10*time.Millisecond)
  100. var wg sync.WaitGroup
  101. var block []Event
  102. for i := 1; i <= 100; i++ {
  103. block = append(block, createTestEvent("push", "library/test", "blob"))
  104. // Above 50, set the failure rate lower
  105. if i > 50 {
  106. s.mu.Lock()
  107. flaky.rate = 0.90
  108. s.mu.Unlock()
  109. }
  110. if i%10 == 0 && i > 0 {
  111. wg.Add(1)
  112. go func(block ...Event) {
  113. defer wg.Done()
  114. if err := s.Write(block...); err != nil {
  115. t.Fatalf("error writing event block: %v", err)
  116. }
  117. }(block...)
  118. block = nil
  119. }
  120. }
  121. wg.Wait()
  122. checkClose(t, s)
  123. ts.mu.Lock()
  124. defer ts.mu.Unlock()
  125. if len(ts.events) != 100 {
  126. t.Fatalf("events not propagated: %d != %d", len(ts.events), 100)
  127. }
  128. }
  129. type testSink struct {
  130. events []Event
  131. mu sync.Mutex
  132. closed bool
  133. }
  134. func (ts *testSink) Write(events ...Event) error {
  135. ts.mu.Lock()
  136. defer ts.mu.Unlock()
  137. ts.events = append(ts.events, events...)
  138. return nil
  139. }
  140. func (ts *testSink) Close() error {
  141. ts.mu.Lock()
  142. defer ts.mu.Unlock()
  143. ts.closed = true
  144. logrus.Infof("closing testSink")
  145. return nil
  146. }
  147. type delayedSink struct {
  148. Sink
  149. delay time.Duration
  150. }
  151. func (ds *delayedSink) Write(events ...Event) error {
  152. time.Sleep(ds.delay)
  153. return ds.Sink.Write(events...)
  154. }
  155. type flakySink struct {
  156. Sink
  157. rate float64
  158. }
  159. func (fs *flakySink) Write(events ...Event) error {
  160. if rand.Float64() < fs.rate {
  161. return fmt.Errorf("error writing %d events", len(events))
  162. }
  163. return fs.Sink.Write(events...)
  164. }
  165. func checkClose(t *testing.T, sink Sink) {
  166. if err := sink.Close(); err != nil {
  167. t.Fatalf("unexpected error closing: %v", err)
  168. }
  169. // second close should not crash but should return an error.
  170. if err := sink.Close(); err == nil {
  171. t.Fatalf("no error on double close")
  172. }
  173. // Write after closed should be an error
  174. if err := sink.Write([]Event{}...); err == nil {
  175. t.Fatalf("write after closed did not have an error")
  176. } else if err != ErrSinkClosed {
  177. t.Fatalf("error should be ErrSinkClosed")
  178. }
  179. }