sinks.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package notifications
  2. import (
  3. "container/list"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "github.com/Sirupsen/logrus"
  8. )
// NOTE(stevvooe): This file contains definitions for several utility sinks.
// Typically, the broadcaster is the only sink that should be required
// externally, but the others are suitable for export if the need arises.
// That said, their tight integration with endpoint metrics should be removed.
  13. // Broadcaster sends events to multiple, reliable Sinks. The goal of this
  14. // component is to dispatch events to configured endpoints. Reliability can be
  15. // provided by wrapping incoming sinks.
  16. type Broadcaster struct {
  17. sinks []Sink
  18. events chan []Event
  19. closed chan chan struct{}
  20. }
  21. // NewBroadcaster ...
  22. // Add appends one or more sinks to the list of sinks. The broadcaster
  23. // behavior will be affected by the properties of the sink. Generally, the
  24. // sink should accept all messages and deal with reliability on its own. Use
  25. // of EventQueue and RetryingSink should be used here.
  26. func NewBroadcaster(sinks ...Sink) *Broadcaster {
  27. b := Broadcaster{
  28. sinks: sinks,
  29. events: make(chan []Event),
  30. closed: make(chan chan struct{}),
  31. }
  32. // Start the broadcaster
  33. go b.run()
  34. return &b
  35. }
  36. // Write accepts a block of events to be dispatched to all sinks. This method
  37. // will never fail and should never block (hopefully!). The caller cedes the
  38. // slice memory to the broadcaster and should not modify it after calling
  39. // write.
  40. func (b *Broadcaster) Write(events ...Event) error {
  41. select {
  42. case b.events <- events:
  43. case <-b.closed:
  44. return ErrSinkClosed
  45. }
  46. return nil
  47. }
  48. // Close the broadcaster, ensuring that all messages are flushed to the
  49. // underlying sink before returning.
  50. func (b *Broadcaster) Close() error {
  51. logrus.Infof("broadcaster: closing")
  52. select {
  53. case <-b.closed:
  54. // already closed
  55. return fmt.Errorf("broadcaster: already closed")
  56. default:
  57. // do a little chan handoff dance to synchronize closing
  58. closed := make(chan struct{})
  59. b.closed <- closed
  60. close(b.closed)
  61. <-closed
  62. return nil
  63. }
  64. }
  65. // run is the main broadcast loop, started when the broadcaster is created.
  66. // Under normal conditions, it waits for events on the event channel. After
  67. // Close is called, this goroutine will exit.
  68. func (b *Broadcaster) run() {
  69. for {
  70. select {
  71. case block := <-b.events:
  72. for _, sink := range b.sinks {
  73. if err := sink.Write(block...); err != nil {
  74. logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
  75. }
  76. }
  77. case closing := <-b.closed:
  78. // close all the underlying sinks
  79. for _, sink := range b.sinks {
  80. if err := sink.Close(); err != nil {
  81. logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
  82. }
  83. }
  84. closing <- struct{}{}
  85. logrus.Debugf("broadcaster: closed")
  86. return
  87. }
  88. }
  89. }
  90. // eventQueue accepts all messages into a queue for asynchronous consumption
  91. // by a sink. It is unbounded and thread safe but the sink must be reliable or
  92. // events will be dropped.
  93. type eventQueue struct {
  94. sink Sink
  95. events *list.List
  96. listeners []eventQueueListener
  97. cond *sync.Cond
  98. mu sync.Mutex
  99. closed bool
  100. }
  101. // eventQueueListener is called when various events happen on the queue.
  102. type eventQueueListener interface {
  103. ingress(events ...Event)
  104. egress(events ...Event)
  105. }
  106. // newEventQueue returns a queue to the provided sink. If the updater is non-
  107. // nil, it will be called to update pending metrics on ingress and egress.
  108. func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
  109. eq := eventQueue{
  110. sink: sink,
  111. events: list.New(),
  112. listeners: listeners,
  113. }
  114. eq.cond = sync.NewCond(&eq.mu)
  115. go eq.run()
  116. return &eq
  117. }
  118. // Write accepts the events into the queue, only failing if the queue has
  119. // beend closed.
  120. func (eq *eventQueue) Write(events ...Event) error {
  121. eq.mu.Lock()
  122. defer eq.mu.Unlock()
  123. if eq.closed {
  124. return ErrSinkClosed
  125. }
  126. for _, listener := range eq.listeners {
  127. listener.ingress(events...)
  128. }
  129. eq.events.PushBack(events)
  130. eq.cond.Signal() // signal waiters
  131. return nil
  132. }
  133. // Close shutsdown the event queue, flushing
  134. func (eq *eventQueue) Close() error {
  135. eq.mu.Lock()
  136. defer eq.mu.Unlock()
  137. if eq.closed {
  138. return fmt.Errorf("eventqueue: already closed")
  139. }
  140. // set closed flag
  141. eq.closed = true
  142. eq.cond.Signal() // signal flushes queue
  143. eq.cond.Wait() // wait for signal from last flush
  144. return eq.sink.Close()
  145. }
  146. // run is the main goroutine to flush events to the target sink.
  147. func (eq *eventQueue) run() {
  148. for {
  149. block := eq.next()
  150. if block == nil {
  151. return // nil block means event queue is closed.
  152. }
  153. if err := eq.sink.Write(block...); err != nil {
  154. logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
  155. }
  156. for _, listener := range eq.listeners {
  157. listener.egress(block...)
  158. }
  159. }
  160. }
  161. // next encompasses the critical section of the run loop. When the queue is
  162. // empty, it will block on the condition. If new data arrives, it will wake
  163. // and return a block. When closed, a nil slice will be returned.
  164. func (eq *eventQueue) next() []Event {
  165. eq.mu.Lock()
  166. defer eq.mu.Unlock()
  167. for eq.events.Len() < 1 {
  168. if eq.closed {
  169. eq.cond.Broadcast()
  170. return nil
  171. }
  172. eq.cond.Wait()
  173. }
  174. front := eq.events.Front()
  175. block := front.Value.([]Event)
  176. eq.events.Remove(front)
  177. return block
  178. }
  179. // retryingSink retries the write until success or an ErrSinkClosed is
  180. // returned. Underlying sink must have p > 0 of succeeding or the sink will
  181. // block. Internally, it is a circuit breaker retries to manage reset.
  182. // Concurrent calls to a retrying sink are serialized through the sink,
  183. // meaning that if one is in-flight, another will not proceed.
  184. type retryingSink struct {
  185. mu sync.Mutex
  186. sink Sink
  187. closed bool
  188. // circuit breaker heuristics
  189. failures struct {
  190. threshold int
  191. recent int
  192. last time.Time
  193. backoff time.Duration // time after which we retry after failure.
  194. }
  195. }
  196. type retryingSinkListener interface {
  197. active(events ...Event)
  198. retry(events ...Event)
  199. }
  200. // TODO(stevvooe): We are using circuit break here, which actually doesn't
  201. // make a whole lot of sense for this use case, since we always retry. Move
  202. // this to use bounded exponential backoff.
  203. // newRetryingSink returns a sink that will retry writes to a sink, backing
  204. // off on failure. Parameters threshold and backoff adjust the behavior of the
  205. // circuit breaker.
  206. func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
  207. rs := &retryingSink{
  208. sink: sink,
  209. }
  210. rs.failures.threshold = threshold
  211. rs.failures.backoff = backoff
  212. return rs
  213. }
  214. // Write attempts to flush the events to the downstream sink until it succeeds
  215. // or the sink is closed.
  216. func (rs *retryingSink) Write(events ...Event) error {
  217. rs.mu.Lock()
  218. defer rs.mu.Unlock()
  219. retry:
  220. if rs.closed {
  221. return ErrSinkClosed
  222. }
  223. if !rs.proceed() {
  224. logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
  225. rs.wait(rs.failures.backoff)
  226. goto retry
  227. }
  228. if err := rs.write(events...); err != nil {
  229. if err == ErrSinkClosed {
  230. // terminal!
  231. return err
  232. }
  233. logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
  234. goto retry
  235. }
  236. return nil
  237. }
  238. // Close closes the sink and the underlying sink.
  239. func (rs *retryingSink) Close() error {
  240. rs.mu.Lock()
  241. defer rs.mu.Unlock()
  242. if rs.closed {
  243. return fmt.Errorf("retryingsink: already closed")
  244. }
  245. rs.closed = true
  246. return rs.sink.Close()
  247. }
  248. // write provides a helper that dispatches failure and success properly. Used
  249. // by write as the single-flight write call.
  250. func (rs *retryingSink) write(events ...Event) error {
  251. if err := rs.sink.Write(events...); err != nil {
  252. rs.failure()
  253. return err
  254. }
  255. rs.reset()
  256. return nil
  257. }
  258. // wait backoff time against the sink, unlocking so others can proceed. Should
  259. // only be called by methods that currently have the mutex.
  260. func (rs *retryingSink) wait(backoff time.Duration) {
  261. rs.mu.Unlock()
  262. defer rs.mu.Lock()
  263. // backoff here
  264. time.Sleep(backoff)
  265. }
  266. // reset marks a successful call.
  267. func (rs *retryingSink) reset() {
  268. rs.failures.recent = 0
  269. rs.failures.last = time.Time{}
  270. }
  271. // failure records a failure.
  272. func (rs *retryingSink) failure() {
  273. rs.failures.recent++
  274. rs.failures.last = time.Now().UTC()
  275. }
  276. // proceed returns true if the call should proceed based on circuit breaker
  277. // heuristics.
  278. func (rs *retryingSink) proceed() bool {
  279. return rs.failures.recent < rs.failures.threshold ||
  280. time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
  281. }