opentsdb.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. /*
  2. Copyright 2019 The Vitess Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. // Package opentsdb adds support for pushing stats to opentsdb.
  14. package opentsdb
  import (
  	"bytes"
  	"encoding/json"
  	"expvar"
  	"flag"
  	"fmt"
  	"io"
  	"net/http"
  	"sort"
  	"strings"
  	"time"
  	"unicode"

  	"git.nspix.com/golang/micro/stats"
  )
  // openTsdbURI holds the value of the -opentsdb_uri command line flag: the URI
  // of the opentsdb /api/put method. It defaults to the empty string.
  var openTsdbURI = flag.String("opentsdb_uri", "", "URI of opentsdb /api/put method")
  // dataPoint is one OpenTSDB measurement in the shape it is serialized to JSON.
  type dataPoint struct {
  	// Metric is the metric name, for example "sys.cpu.nice".
  	Metric string `json:"metric"`
  	// Timestamp is the sample time, in seconds or milliseconds since the
  	// unix epoch.
  	Timestamp float64 `json:"timestamp"`
  	// Value is the sampled value.
  	Value float64 `json:"value"`
  	// Tags holds the tag name -> tag value pairs attached to the sample.
  	Tags map[string]string `json:"tags"`
  }
  39. // sendDataPoints pushes a list of data points to openTSDB.
  40. // All other code in this file is just to support getting this function called
  41. // with all stats represented as data points.
  42. func sendDataPoints(data []dataPoint) error {
  43. json, err := json.Marshal(data)
  44. if err != nil {
  45. return err
  46. }
  47. resp, err := http.Post(*openTsdbURI, "application/json", bytes.NewReader(json))
  48. if err != nil {
  49. return err
  50. }
  51. resp.Body.Close()
  52. return nil
  53. }
  // openTSDBBackend implements stats.PushBackend
  type openTSDBBackend struct {
  	// prefix is prepended to the name of every reported stat. It is the name
  	// of the binary (vtgate, vttablet, etc.).
  	prefix string
  	// commonTags are attached to every data point. When a data point carries
  	// a tag with the same name as a common tag, the data point's own value
  	// wins.
  	commonTags map[string]string
  }
  64. // dataCollector tracks state for a single pass of stats reporting / data collection.
  65. type dataCollector struct {
  66. settings *openTSDBBackend
  67. timestamp int64
  68. dataPoints []dataPoint
  69. }
  70. // InitWithoutServenv initializes the opentsdb without servenv
  71. func InitWithoutServenv(prefix string) {
  72. if *openTsdbURI == "" {
  73. return
  74. }
  75. backend := &openTSDBBackend{
  76. prefix: prefix,
  77. commonTags: stats.ParseCommonTags(*stats.CommonTags),
  78. }
  79. stats.RegisterPushBackend("opentsdb", backend)
  80. http.HandleFunc("/debug/opentsdb", func(w http.ResponseWriter, r *http.Request) {
  81. w.Header().Set("Content-Type", "application/json; charset=utf-8")
  82. dataPoints := (*backend).getDataPoints()
  83. sort.Sort(byMetric(dataPoints))
  84. if b, err := json.MarshalIndent(dataPoints, "", " "); err != nil {
  85. w.Write([]byte(err.Error()))
  86. } else {
  87. w.Write(b)
  88. }
  89. })
  90. }
  91. // PushAll pushes all stats to OpenTSDB
  92. func (backend *openTSDBBackend) PushAll() error {
  93. return sendDataPoints(backend.getDataPoints())
  94. }
  95. // getDataPoints fetches all stats in an opentsdb-compatible format.
  96. // This is separated from PushAll() so it can be reused for the /debug/opentsdb handler.
  97. func (backend *openTSDBBackend) getDataPoints() []dataPoint {
  98. dataCollector := &dataCollector{
  99. settings: backend,
  100. timestamp: time.Now().Unix(),
  101. }
  102. expvar.Do(func(kv expvar.KeyValue) {
  103. dataCollector.addExpVar(kv)
  104. })
  105. return dataCollector.dataPoints
  106. }
  // combineMetricName builds a hierarchical metric name by joining its parts
  // with a "." separator.
  func combineMetricName(parts ...string) string {
  	const separator = "."
  	return strings.Join(parts, separator)
  }
  111. func (dc *dataCollector) addInt(metric string, val int64, tags map[string]string) {
  112. dc.addFloat(metric, float64(val), tags)
  113. }
  114. func (dc *dataCollector) addFloat(metric string, val float64, tags map[string]string) {
  115. var fullMetric string
  116. if len(dc.settings.prefix) > 0 {
  117. fullMetric = combineMetricName(dc.settings.prefix, metric)
  118. } else {
  119. fullMetric = metric
  120. }
  121. // Restrict metric and tag name/values to legal characters:
  122. // http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
  123. //
  124. // Also make everything lowercase, since opentsdb is case sensitive and lowercase
  125. // simplifies the convention.
  126. sanitize := func(text string) string {
  127. var b bytes.Buffer
  128. for _, r := range text {
  129. if unicode.IsDigit(r) || unicode.IsLetter(r) || r == '-' || r == '_' || r == '/' || r == '.' {
  130. b.WriteRune(r)
  131. } else {
  132. // For characters that would cause errors, write underscore instead
  133. b.WriteRune('_')
  134. }
  135. }
  136. return strings.ToLower(b.String())
  137. }
  138. fullTags := make(map[string]string)
  139. for k, v := range dc.settings.commonTags {
  140. fullTags[sanitize(k)] = sanitize(v)
  141. }
  142. for k, v := range tags {
  143. fullTags[sanitize(k)] = sanitize(v)
  144. }
  145. dp := dataPoint{
  146. Metric: sanitize(fullMetric),
  147. Value: val,
  148. Timestamp: float64(dc.timestamp),
  149. Tags: fullTags,
  150. }
  151. dc.dataPoints = append(dc.dataPoints, dp)
  152. }
  153. // addExpVar adds all the data points associated with a particular expvar to the list of
  154. // opentsdb data points. How an expvar is translated depends on its type.
  155. //
  156. // Well-known metric types like histograms and integers are directly converted (saving labels
  157. // as tags).
  158. //
  159. // Generic unrecognized expvars are serialized to json and their int/float values are exported.
  160. // Strings and lists in expvars are not exported.
  161. func (dc *dataCollector) addExpVar(kv expvar.KeyValue) {
  162. k := kv.Key
  163. switch v := kv.Value.(type) {
  164. case stats.FloatFunc:
  165. dc.addFloat(k, v(), nil)
  166. case *stats.Counter:
  167. dc.addInt(k, v.Get(), nil)
  168. case *stats.CounterFunc:
  169. dc.addInt(k, v.F(), nil)
  170. case *stats.Gauge:
  171. dc.addInt(k, v.Get(), nil)
  172. case *stats.GaugeFunc:
  173. dc.addInt(k, v.F(), nil)
  174. case *stats.CounterDuration:
  175. dc.addInt(k, int64(v.Get()), nil)
  176. case *stats.CounterDurationFunc:
  177. dc.addInt(k, int64(v.F()), nil)
  178. case *stats.MultiTimings:
  179. dc.addTimings(v.Labels(), &v.Timings, k)
  180. case *stats.Timings:
  181. dc.addTimings([]string{v.Label()}, v, k)
  182. case *stats.Histogram:
  183. dc.addHistogram(v, 1, k, make(map[string]string))
  184. case *stats.CountersWithSingleLabel:
  185. for labelVal, val := range v.Counts() {
  186. dc.addInt(k, val, makeLabel(v.Label(), labelVal))
  187. }
  188. case *stats.CountersWithMultiLabels:
  189. for labelVals, val := range v.Counts() {
  190. dc.addInt(k, val, makeLabels(v.Labels(), labelVals))
  191. }
  192. case *stats.CountersFuncWithMultiLabels:
  193. for labelVals, val := range v.Counts() {
  194. dc.addInt(k, val, makeLabels(v.Labels(), labelVals))
  195. }
  196. case *stats.GaugesWithMultiLabels:
  197. for labelVals, val := range v.Counts() {
  198. dc.addInt(k, val, makeLabels(v.Labels(), labelVals))
  199. }
  200. case *stats.GaugesFuncWithMultiLabels:
  201. for labelVals, val := range v.Counts() {
  202. dc.addInt(k, val, makeLabels(v.Labels(), labelVals))
  203. }
  204. case *stats.GaugesWithSingleLabel:
  205. for labelVal, val := range v.Counts() {
  206. dc.addInt(k, val, makeLabel(v.Label(), labelVal))
  207. }
  208. default:
  209. // Deal with generic expvars by converting them to JSON and pulling out
  210. // all the floats. Strings and lists will not be exported to opentsdb.
  211. var obj map[string]interface{}
  212. if err := json.Unmarshal([]byte(v.String()), &obj); err != nil {
  213. return
  214. }
  215. // Recursive helper function.
  216. dc.addUnrecognizedExpvars(combineMetricName("expvar", k), obj)
  217. }
  218. }
  // makeLabel returns a tag map holding exactly one entry: labelName -> labelVal.
  func makeLabel(labelName string, labelVal string) map[string]string {
  	tags := make(map[string]string, 1)
  	tags[labelName] = labelVal
  	return tags
  }
  // makeLabels takes the vitess stat representation of label values ("."-separated list) and breaks it
  // apart into a map of label name -> label value.
  //
  // Values are matched to labelNames by position. The combined string is split
  // into at most len(labelNames) pieces, so when it contains more "."-separated
  // parts than there are names, the surplus stays inside the last label's value
  // instead of indexing past the end of labelNames. When it contains fewer
  // parts, only the leading names receive a value. With no label names the
  // result is an empty map.
  func makeLabels(labelNames []string, labelValsCombined string) map[string]string {
  	tags := make(map[string]string, len(labelNames))
  	// SplitN never yields more pieces than there are names, which keeps the
  	// labelNames[i] lookup below in range.
  	labelVals := strings.SplitN(labelValsCombined, ".", len(labelNames))
  	for i, v := range labelVals {
  		tags[labelNames[i]] = v
  	}
  	return tags
  }
  233. // addUnrecognizedExpvars recurses into a json object to pull out float64 variables to report.
  234. func (dc *dataCollector) addUnrecognizedExpvars(prefix string, obj map[string]interface{}) {
  235. for k, v := range obj {
  236. prefix := combineMetricName(prefix, k)
  237. switch v := v.(type) {
  238. case map[string]interface{}:
  239. dc.addUnrecognizedExpvars(prefix, v)
  240. case float64:
  241. dc.addFloat(prefix, v, nil)
  242. }
  243. }
  244. }
  245. // addTimings converts a vitess Timings stat to something opentsdb can deal with.
  246. func (dc *dataCollector) addTimings(labels []string, timings *stats.Timings, prefix string) {
  247. histograms := timings.Histograms()
  248. for labelValsCombined, histogram := range histograms {
  249. // If you prefer millisecond timings over nanoseconds you can pass 1000000 here instead of 1.
  250. dc.addHistogram(histogram, 1, prefix, makeLabels(labels, labelValsCombined))
  251. }
  252. }
  253. func (dc *dataCollector) addHistogram(histogram *stats.Histogram, divideBy int64, prefix string, tags map[string]string) {
  254. // TODO: OpenTSDB 2.3 doesn't have histogram support, although it's forthcoming.
  255. // For simplicity we report each bucket as a different metric.
  256. //
  257. // An alternative approach if you don't mind changing the code is to add a hook to Histogram creation that
  258. // associates each histogram with a shadow type that can track percentiles (like Timer from rcrowley/go-metrics).
  259. labels := histogram.Labels()
  260. buckets := histogram.Buckets()
  261. for i := range labels {
  262. dc.addInt(
  263. combineMetricName(prefix, labels[i]),
  264. buckets[i],
  265. tags,
  266. )
  267. }
  268. dc.addInt(
  269. combineMetricName(prefix, histogram.CountLabel()),
  270. (*histogram).Count(),
  271. tags,
  272. )
  273. dc.addInt(
  274. combineMetricName(prefix, histogram.TotalLabel()),
  275. (*histogram).Total()/divideBy,
  276. tags,
  277. )
  278. }
  279. // byMetric implements sort.Interface for []dataPoint based on the metric key
  280. // and then tag values (prioritized in tag name order). Having a consistent sort order
  281. // is convenient when refreshing /debug/opentsdb or for encoding and comparing JSON directly
  282. // in the tests.
  283. type byMetric []dataPoint
  284. func (m byMetric) Len() int { return len(m) }
  285. func (m byMetric) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
  286. func (m byMetric) Less(i, j int) bool {
  287. if m[i].Metric < m[j].Metric {
  288. return true
  289. }
  290. if m[i].Metric > m[j].Metric {
  291. return false
  292. }
  293. // Metric names are the same. We can use tag values to figure out the sort order.
  294. // The deciding tag will be the lexicographically earliest tag name where tag values differ.
  295. decidingTagName := ""
  296. result := false
  297. for tagName, iVal := range m[i].Tags {
  298. jVal, ok := m[j].Tags[tagName]
  299. if !ok {
  300. // We'll arbitrarily declare that if i has any tag name that j doesn't then it sorts earlier.
  301. // This shouldn't happen in practice, though, if metric code is correct...
  302. return true
  303. }
  304. if iVal != jVal && (tagName < decidingTagName || decidingTagName == "") {
  305. decidingTagName = tagName
  306. result = iVal < jVal
  307. }
  308. }
  309. return result
  310. }