123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- /*
- Copyright 2019 The Vitess Authors.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package stats
- import (
- "encoding/json"
- "math"
- "sync"
- "time"
- )
- var timeNow = time.Now
- // CountTracker defines the interface that needs to
- // be supported by a variable for being tracked by
- // Rates.
- type CountTracker interface {
- // Counts returns a map which maps each category to a count.
- // Subsequent calls must return a monotonously increasing count for the same
- // category.
- // Optionally, an implementation may include the "All" category which has
- // the total count across all categories (e.g. timing.go does this).
- Counts() map[string]int64
- }
- // wrappedCountTracker implements the CountTracker interface.
- // It is used in multidimensional.go to publish specific, one-dimensional
- // counters.
- type wrappedCountTracker struct {
- f func() map[string]int64
- }
- func (t wrappedCountTracker) Counts() map[string]int64 { return t.f() }
- // Rates is capable of reporting the rate (typically QPS)
- // for any variable that satisfies the CountTracker interface.
- type Rates struct {
- // mu guards all fields.
- mu sync.Mutex
- timeStamps *RingInt64
- counts map[string]*RingInt64
- countTracker CountTracker
- samples int
- interval time.Duration
- // previousTotalCount is the total number of counts (across all categories)
- // seen in the last sampling interval.
- // It's used to calculate the latest total rate.
- previousTotalCount int64
- // timestampLastSampling is the time the periodic sampling was run last.
- timestampLastSampling time.Time
- // totalRate is the rate of total counts per second seen in the latest
- // sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS.
- totalRate float64
- }
- // NewRates reports rolling rate information for countTracker. samples specifies
- // the number of samples to report, and interval specifies the time interval
- // between samples. The minimum interval is 1 second.
- // If passing the special value of -1s as interval, we don't snapshot.
- // (use this for tests).
- func NewRates(name string, countTracker CountTracker, samples int, interval time.Duration) *Rates {
- if interval < 1*time.Second && interval != -1*time.Second {
- panic("interval too small")
- }
- rt := &Rates{
- timeStamps: NewRingInt64(samples + 1),
- counts: make(map[string]*RingInt64),
- countTracker: countTracker,
- samples: samples + 1,
- interval: interval,
- timestampLastSampling: timeNow(),
- }
- if name != "" {
- publish(name, rt)
- }
- if interval > 0 {
- go rt.track()
- }
- return rt
- }
- func (rt *Rates) track() {
- for {
- rt.snapshot()
- <-time.After(rt.interval)
- }
- }
- func (rt *Rates) snapshot() {
- rt.mu.Lock()
- defer rt.mu.Unlock()
- now := timeNow()
- rt.timeStamps.Add(now.UnixNano())
- // Record current count for each category.
- var totalCount int64
- for k, v := range rt.countTracker.Counts() {
- if k != "All" {
- // Include call categories except "All" (which is returned by the
- // "Timer.Counts()" implementation) to avoid double counting.
- totalCount += v
- }
- if values, ok := rt.counts[k]; ok {
- values.Add(v)
- } else {
- rt.counts[k] = NewRingInt64(rt.samples)
- rt.counts[k].Add(0)
- rt.counts[k].Add(v)
- }
- }
- // Calculate current total rate.
- // NOTE: We assume that every category with a non-zero value, which was
- // tracked in "rt.previousTotalCount" in a previous sampling interval, is
- // tracked in the current sampling interval in "totalCount" as well.
- // (I.e. categories and their count must not "disappear" in
- // "rt.countTracker.Counts()".)
- durationSeconds := now.Sub(rt.timestampLastSampling).Seconds()
- rate := float64(totalCount-rt.previousTotalCount) / durationSeconds
- // Round rate with a precision of 0.1.
- rt.totalRate = math.Floor(rate*10+0.5) / 10
- rt.previousTotalCount = totalCount
- rt.timestampLastSampling = now
- }
- // Get returns for each category (string) its latest rates (up to X values
- // where X is the configured number of samples of the Rates struct).
- // Rates are ordered from least recent (index 0) to most recent (end of slice).
- func (rt *Rates) Get() (rateMap map[string][]float64) {
- rt.mu.Lock()
- defer rt.mu.Unlock()
- rateMap = make(map[string][]float64)
- timeStamps := rt.timeStamps.Values()
- if len(timeStamps) <= 1 {
- return
- }
- for k, v := range rt.counts {
- rateMap[k] = make([]float64, len(timeStamps)-1)
- values := v.Values()
- valueIndex := len(values) - 1
- for i := len(timeStamps) - 1; i > 0; i-- {
- if valueIndex <= 0 {
- rateMap[k][i-1] = 0
- continue
- }
- elapsed := float64((timeStamps[i] - timeStamps[i-1]) / 1e9)
- rateMap[k][i-1] = float64(values[valueIndex]-values[valueIndex-1]) / elapsed
- valueIndex--
- }
- }
- return
- }
- // TotalRate returns the current total rate (counted across categories).
- func (rt *Rates) TotalRate() float64 {
- rt.mu.Lock()
- defer rt.mu.Unlock()
- return rt.totalRate
- }
- func (rt *Rates) String() string {
- data, err := json.Marshal(rt.Get())
- if err != nil {
- data, _ = json.Marshal(err.Error())
- }
- return string(data)
- }
- type RatesFunc struct {
- F func() map[string][]float64
- help string
- }
- func NewRateFunc(name string, help string, f func() map[string][]float64) *RatesFunc {
- c := &RatesFunc{
- F: f,
- help: help,
- }
- if name != "" {
- publish(name, c)
- }
- return c
- }
- func (rf *RatesFunc) Help() string {
- return rf.help
- }
- func (rf *RatesFunc) String() string {
- data, err := json.Marshal(rf.F())
- if err != nil {
- data, _ = json.Marshal(err.Error())
- }
- return string(data)
- }
|