endpoint.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package notifications
  2. import (
  3. "net/http"
  4. "time"
  5. )
  6. // EndpointConfig covers the optional configuration parameters for an active
  7. // endpoint.
  8. type EndpointConfig struct {
  9. Headers http.Header
  10. Timeout time.Duration
  11. Threshold int
  12. Backoff time.Duration
  13. }
  14. // defaults set any zero-valued fields to a reasonable default.
  15. func (ec *EndpointConfig) defaults() {
  16. if ec.Timeout <= 0 {
  17. ec.Timeout = time.Second
  18. }
  19. if ec.Threshold <= 0 {
  20. ec.Threshold = 10
  21. }
  22. if ec.Backoff <= 0 {
  23. ec.Backoff = time.Second
  24. }
  25. }
  26. // Endpoint is a reliable, queued, thread-safe sink that notify external http
  27. // services when events are written. Writes are non-blocking and always
  28. // succeed for callers but events may be queued internally.
  29. type Endpoint struct {
  30. Sink
  31. url string
  32. name string
  33. EndpointConfig
  34. metrics *safeMetrics
  35. }
  36. // NewEndpoint returns a running endpoint, ready to receive events.
  37. func NewEndpoint(name, url string, config EndpointConfig) *Endpoint {
  38. var endpoint Endpoint
  39. endpoint.name = name
  40. endpoint.url = url
  41. endpoint.EndpointConfig = config
  42. endpoint.defaults()
  43. endpoint.metrics = newSafeMetrics()
  44. // Configures the inmemory queue, retry, http pipeline.
  45. endpoint.Sink = newHTTPSink(
  46. endpoint.url, endpoint.Timeout, endpoint.Headers,
  47. endpoint.metrics.httpStatusListener())
  48. endpoint.Sink = newRetryingSink(endpoint.Sink, endpoint.Threshold, endpoint.Backoff)
  49. endpoint.Sink = newEventQueue(endpoint.Sink, endpoint.metrics.eventQueueListener())
  50. register(&endpoint)
  51. return &endpoint
  52. }
  53. // Name returns the name of the endpoint, generally used for debugging.
  54. func (e *Endpoint) Name() string {
  55. return e.name
  56. }
  57. // URL returns the url of the endpoint.
  58. func (e *Endpoint) URL() string {
  59. return e.url
  60. }
  61. // ReadMetrics populates em with metrics from the endpoint.
  62. func (e *Endpoint) ReadMetrics(em *EndpointMetrics) {
  63. e.metrics.Lock()
  64. defer e.metrics.Unlock()
  65. *em = e.metrics.EndpointMetrics
  66. // Map still need to copied in a threadsafe manner.
  67. em.Statuses = make(map[string]int)
  68. for k, v := range e.metrics.Statuses {
  69. em.Statuses[k] = v
  70. }
  71. }