rates.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. /*
  2. Copyright 2019 The Vitess Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package stats
  14. import (
  15. "encoding/json"
  16. "math"
  17. "sync"
  18. "time"
  19. )
  20. var timeNow = time.Now
  21. // CountTracker defines the interface that needs to
  22. // be supported by a variable for being tracked by
  23. // Rates.
  24. type CountTracker interface {
  25. // Counts returns a map which maps each category to a count.
  26. // Subsequent calls must return a monotonously increasing count for the same
  27. // category.
  28. // Optionally, an implementation may include the "All" category which has
  29. // the total count across all categories (e.g. timing.go does this).
  30. Counts() map[string]int64
  31. }
  32. // wrappedCountTracker implements the CountTracker interface.
  33. // It is used in multidimensional.go to publish specific, one-dimensional
  34. // counters.
  35. type wrappedCountTracker struct {
  36. f func() map[string]int64
  37. }
  38. func (t wrappedCountTracker) Counts() map[string]int64 { return t.f() }
  39. // Rates is capable of reporting the rate (typically QPS)
  40. // for any variable that satisfies the CountTracker interface.
  41. type Rates struct {
  42. // mu guards all fields.
  43. mu sync.Mutex
  44. timeStamps *RingInt64
  45. counts map[string]*RingInt64
  46. countTracker CountTracker
  47. samples int
  48. interval time.Duration
  49. // previousTotalCount is the total number of counts (across all categories)
  50. // seen in the last sampling interval.
  51. // It's used to calculate the latest total rate.
  52. previousTotalCount int64
  53. // timestampLastSampling is the time the periodic sampling was run last.
  54. timestampLastSampling time.Time
  55. // totalRate is the rate of total counts per second seen in the latest
  56. // sampling interval e.g. 100 queries / 5 seconds sampling interval = 20 QPS.
  57. totalRate float64
  58. }
  59. // NewRates reports rolling rate information for countTracker. samples specifies
  60. // the number of samples to report, and interval specifies the time interval
  61. // between samples. The minimum interval is 1 second.
  62. // If passing the special value of -1s as interval, we don't snapshot.
  63. // (use this for tests).
  64. func NewRates(name string, countTracker CountTracker, samples int, interval time.Duration) *Rates {
  65. if interval < 1*time.Second && interval != -1*time.Second {
  66. panic("interval too small")
  67. }
  68. rt := &Rates{
  69. timeStamps: NewRingInt64(samples + 1),
  70. counts: make(map[string]*RingInt64),
  71. countTracker: countTracker,
  72. samples: samples + 1,
  73. interval: interval,
  74. timestampLastSampling: timeNow(),
  75. }
  76. if name != "" {
  77. publish(name, rt)
  78. }
  79. if interval > 0 {
  80. go rt.track()
  81. }
  82. return rt
  83. }
  84. func (rt *Rates) track() {
  85. for {
  86. rt.snapshot()
  87. <-time.After(rt.interval)
  88. }
  89. }
  90. func (rt *Rates) snapshot() {
  91. rt.mu.Lock()
  92. defer rt.mu.Unlock()
  93. now := timeNow()
  94. rt.timeStamps.Add(now.UnixNano())
  95. // Record current count for each category.
  96. var totalCount int64
  97. for k, v := range rt.countTracker.Counts() {
  98. if k != "All" {
  99. // Include call categories except "All" (which is returned by the
  100. // "Timer.Counts()" implementation) to avoid double counting.
  101. totalCount += v
  102. }
  103. if values, ok := rt.counts[k]; ok {
  104. values.Add(v)
  105. } else {
  106. rt.counts[k] = NewRingInt64(rt.samples)
  107. rt.counts[k].Add(0)
  108. rt.counts[k].Add(v)
  109. }
  110. }
  111. // Calculate current total rate.
  112. // NOTE: We assume that every category with a non-zero value, which was
  113. // tracked in "rt.previousTotalCount" in a previous sampling interval, is
  114. // tracked in the current sampling interval in "totalCount" as well.
  115. // (I.e. categories and their count must not "disappear" in
  116. // "rt.countTracker.Counts()".)
  117. durationSeconds := now.Sub(rt.timestampLastSampling).Seconds()
  118. rate := float64(totalCount-rt.previousTotalCount) / durationSeconds
  119. // Round rate with a precision of 0.1.
  120. rt.totalRate = math.Floor(rate*10+0.5) / 10
  121. rt.previousTotalCount = totalCount
  122. rt.timestampLastSampling = now
  123. }
  124. // Get returns for each category (string) its latest rates (up to X values
  125. // where X is the configured number of samples of the Rates struct).
  126. // Rates are ordered from least recent (index 0) to most recent (end of slice).
  127. func (rt *Rates) Get() (rateMap map[string][]float64) {
  128. rt.mu.Lock()
  129. defer rt.mu.Unlock()
  130. rateMap = make(map[string][]float64)
  131. timeStamps := rt.timeStamps.Values()
  132. if len(timeStamps) <= 1 {
  133. return
  134. }
  135. for k, v := range rt.counts {
  136. rateMap[k] = make([]float64, len(timeStamps)-1)
  137. values := v.Values()
  138. valueIndex := len(values) - 1
  139. for i := len(timeStamps) - 1; i > 0; i-- {
  140. if valueIndex <= 0 {
  141. rateMap[k][i-1] = 0
  142. continue
  143. }
  144. elapsed := float64((timeStamps[i] - timeStamps[i-1]) / 1e9)
  145. rateMap[k][i-1] = float64(values[valueIndex]-values[valueIndex-1]) / elapsed
  146. valueIndex--
  147. }
  148. }
  149. return
  150. }
  151. // TotalRate returns the current total rate (counted across categories).
  152. func (rt *Rates) TotalRate() float64 {
  153. rt.mu.Lock()
  154. defer rt.mu.Unlock()
  155. return rt.totalRate
  156. }
  157. func (rt *Rates) String() string {
  158. data, err := json.Marshal(rt.Get())
  159. if err != nil {
  160. data, _ = json.Marshal(err.Error())
  161. }
  162. return string(data)
  163. }
  164. type RatesFunc struct {
  165. F func() map[string][]float64
  166. help string
  167. }
  168. func NewRateFunc(name string, help string, f func() map[string][]float64) *RatesFunc {
  169. c := &RatesFunc{
  170. F: f,
  171. help: help,
  172. }
  173. if name != "" {
  174. publish(name, c)
  175. }
  176. return c
  177. }
  178. func (rf *RatesFunc) Help() string {
  179. return rf.help
  180. }
  181. func (rf *RatesFunc) String() string {
  182. data, err := json.Marshal(rf.F())
  183. if err != nil {
  184. data, _ = json.Marshal(err.Error())
  185. }
  186. return string(data)
  187. }