metrics.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package notifications
  2. import (
  3. "expvar"
  4. "fmt"
  5. "net/http"
  6. "sync"
  7. )
  // EndpointMetrics tracks various actions taken by the endpoint, typically by
  // number of events. The goal of this is to export it via expvar, but we may
  // find some other future solution to be better.
  //
  // The struct carries no lock of its own; the listeners in this file update it
  // through safeMetrics, which pairs it with a mutex.
  type EndpointMetrics struct {
  	Pending   int            // events pending in queue; raised on ingress, lowered on egress
  	Events    int            // total events incoming
  	Successes int            // total events written successfully
  	Failures  int            // total events failed
  	Errors    int            // total events errored
  	Statuses  map[string]int // status code histogram, per call event; keys are "<code> <status text>", e.g. "200 OK"
  }
  // safeMetrics guards the metrics implementation with a lock. The listener
  // types in this file embed a *safeMetrics and take the mutex around every
  // update they make to the embedded EndpointMetrics.
  type safeMetrics struct {
  	EndpointMetrics
  	sync.Mutex // protects every EndpointMetrics field: the counters as well as the Statuses map
  }
  25. // newSafeMetrics returns safeMetrics with map allocated.
  26. func newSafeMetrics() *safeMetrics {
  27. var sm safeMetrics
  28. sm.Statuses = make(map[string]int)
  29. return &sm
  30. }
  31. // httpStatusListener returns the listener for the http sink that updates the
  32. // relevant counters.
  33. func (sm *safeMetrics) httpStatusListener() httpStatusListener {
  34. return &endpointMetricsHTTPStatusListener{
  35. safeMetrics: sm,
  36. }
  37. }
  38. // eventQueueListener returns a listener that maintains queue related counters.
  39. func (sm *safeMetrics) eventQueueListener() eventQueueListener {
  40. return &endpointMetricsEventQueueListener{
  41. safeMetrics: sm,
  42. }
  43. }
  44. // endpointMetricsHTTPStatusListener increments counters related to http sinks
  45. // for the relevant events.
  46. type endpointMetricsHTTPStatusListener struct {
  47. *safeMetrics
  48. }
  49. var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
  50. func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
  51. emsl.safeMetrics.Lock()
  52. defer emsl.safeMetrics.Unlock()
  53. emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
  54. emsl.Successes += len(events)
  55. }
  56. func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
  57. emsl.safeMetrics.Lock()
  58. defer emsl.safeMetrics.Unlock()
  59. emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
  60. emsl.Failures += len(events)
  61. }
  62. func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
  63. emsl.safeMetrics.Lock()
  64. defer emsl.safeMetrics.Unlock()
  65. emsl.Errors += len(events)
  66. }
  // endpointMetricsEventQueueListener maintains the incoming events counter and
  // the queue's pending count. It embeds a *safeMetrics, so its methods lock and
  // update the shared metrics directly.
  type endpointMetricsEventQueueListener struct {
  	*safeMetrics
  }
  72. func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
  73. eqc.Lock()
  74. defer eqc.Unlock()
  75. eqc.Events += len(events)
  76. eqc.Pending += len(events)
  77. }
  78. func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
  79. eqc.Lock()
  80. defer eqc.Unlock()
  81. eqc.Pending -= len(events)
  82. }
  // endpoints is the global registry of endpoints used to report metrics to
  // expvar.
  var endpoints struct {
  	registered []*Endpoint // endpoints added through register, in registration order
  	mu         sync.Mutex  // guards registered
  }
  88. // register places the endpoint into expvar so that stats are tracked.
  89. func register(e *Endpoint) {
  90. endpoints.mu.Lock()
  91. defer endpoints.mu.Unlock()
  92. endpoints.registered = append(endpoints.registered, e)
  93. }
  94. func init() {
  95. // NOTE(stevvooe): Setup registry metrics structure to report to expvar.
  96. // Ideally, we do more metrics through logging but we need some nice
  97. // realtime metrics for queue state for now.
  98. registry := expvar.Get("registry")
  99. if registry == nil {
  100. registry = expvar.NewMap("registry")
  101. }
  102. var notifications expvar.Map
  103. notifications.Init()
  104. notifications.Set("endpoints", expvar.Func(func() interface{} {
  105. endpoints.mu.Lock()
  106. defer endpoints.mu.Unlock()
  107. var names []interface{}
  108. for _, v := range endpoints.registered {
  109. var epjson struct {
  110. Name string `json:"name"`
  111. URL string `json:"url"`
  112. EndpointConfig
  113. Metrics EndpointMetrics
  114. }
  115. epjson.Name = v.Name()
  116. epjson.URL = v.URL()
  117. epjson.EndpointConfig = v.EndpointConfig
  118. v.ReadMetrics(&epjson.Metrics)
  119. names = append(names, epjson)
  120. }
  121. return names
  122. }))
  123. registry.(*expvar.Map).Set("notifications", &notifications)
  124. }