123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package notifications
- import (
- "expvar"
- "fmt"
- "net/http"
- "sync"
- )
// EndpointMetrics holds the counters kept for a single endpoint, mostly
// expressed as numbers of events. The goal is to export these values via
// expvar, though some other mechanism may turn out to be a better fit later.
type EndpointMetrics struct {
	// Pending is the number of events pending in the queue.
	Pending int

	// Events is the total number of incoming events.
	Events int

	// Successes is the total number of events written successfully.
	Successes int

	// Failures is the total number of events that failed.
	Failures int

	// Errors is the total number of events that errored.
	Errors int

	// Statuses is a status code histogram, counted per call event.
	Statuses map[string]int
}
- // safeMetrics guards the metrics implementation with a lock and provides a
- // safe update function.
- type safeMetrics struct {
- EndpointMetrics
- sync.Mutex // protects statuses map
- }
- // newSafeMetrics returns safeMetrics with map allocated.
- func newSafeMetrics() *safeMetrics {
- var sm safeMetrics
- sm.Statuses = make(map[string]int)
- return &sm
- }
- // httpStatusListener returns the listener for the http sink that updates the
- // relevant counters.
- func (sm *safeMetrics) httpStatusListener() httpStatusListener {
- return &endpointMetricsHTTPStatusListener{
- safeMetrics: sm,
- }
- }
- // eventQueueListener returns a listener that maintains queue related counters.
- func (sm *safeMetrics) eventQueueListener() eventQueueListener {
- return &endpointMetricsEventQueueListener{
- safeMetrics: sm,
- }
- }
- // endpointMetricsHTTPStatusListener increments counters related to http sinks
- // for the relevant events.
- type endpointMetricsHTTPStatusListener struct {
- *safeMetrics
- }
- var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
- func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
- emsl.safeMetrics.Lock()
- defer emsl.safeMetrics.Unlock()
- emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
- emsl.Successes += len(events)
- }
- func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
- emsl.safeMetrics.Lock()
- defer emsl.safeMetrics.Unlock()
- emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
- emsl.Failures += len(events)
- }
- func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
- emsl.safeMetrics.Lock()
- defer emsl.safeMetrics.Unlock()
- emsl.Errors += len(events)
- }
- // endpointMetricsEventQueueListener maintains the incoming events counter and
- // the queues pending count.
- type endpointMetricsEventQueueListener struct {
- *safeMetrics
- }
- func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
- eqc.Lock()
- defer eqc.Unlock()
- eqc.Events += len(events)
- eqc.Pending += len(events)
- }
- func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
- eqc.Lock()
- defer eqc.Unlock()
- eqc.Pending -= len(events)
- }
- // endpoints is global registry of endpoints used to report metrics to expvar
- var endpoints struct {
- registered []*Endpoint
- mu sync.Mutex
- }
- // register places the endpoint into expvar so that stats are tracked.
- func register(e *Endpoint) {
- endpoints.mu.Lock()
- defer endpoints.mu.Unlock()
- endpoints.registered = append(endpoints.registered, e)
- }
- func init() {
- // NOTE(stevvooe): Setup registry metrics structure to report to expvar.
- // Ideally, we do more metrics through logging but we need some nice
- // realtime metrics for queue state for now.
- registry := expvar.Get("registry")
- if registry == nil {
- registry = expvar.NewMap("registry")
- }
- var notifications expvar.Map
- notifications.Init()
- notifications.Set("endpoints", expvar.Func(func() interface{} {
- endpoints.mu.Lock()
- defer endpoints.mu.Unlock()
- var names []interface{}
- for _, v := range endpoints.registered {
- var epjson struct {
- Name string `json:"name"`
- URL string `json:"url"`
- EndpointConfig
- Metrics EndpointMetrics
- }
- epjson.Name = v.Name()
- epjson.URL = v.URL()
- epjson.EndpointConfig = v.EndpointConfig
- v.ReadMetrics(&epjson.Metrics)
- names = append(names, epjson)
- }
- return names
- }))
- registry.(*expvar.Map).Set("notifications", ¬ifications)
- }
|