Merge pull request #369 from kpacha/fix_influxdb_metrics
metrics/influx: fix the package implementation
Peter Bourgon authored 7 years ago
GitHub committed 7 years ago
0 | package influx | |
1 | ||
2 | import ( | |
3 | "fmt" | |
4 | "regexp" | |
5 | ||
6 | influxdb "github.com/influxdata/influxdb/client/v2" | |
7 | ||
8 | "github.com/go-kit/kit/log" | |
9 | ) | |
10 | ||
11 | func ExampleCounter() { | |
12 | in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger()) | |
13 | counter := in.NewCounter("influx_counter") | |
14 | counter.Add(10) | |
15 | counter.With("error", "true").Add(1) | |
16 | counter.With("error", "false").Add(2) | |
17 | counter.Add(50) | |
18 | ||
19 | client := &bufWriter{} | |
20 | in.WriteTo(client) | |
21 | ||
22 | expectedLines := []string{ | |
23 | `(influx_counter,a=b count=60) [0-9]{19}`, | |
24 | `(influx_counter,a=b,error=true count=1) [0-9]{19}`, | |
25 | `(influx_counter,a=b,error=false count=2) [0-9]{19}`, | |
26 | } | |
27 | ||
28 | if err := extractAndPrintMessage(expectedLines, client.buf.String()); err != nil { | |
29 | fmt.Println(err.Error()) | |
30 | } | |
31 | ||
32 | // Output: | |
33 | // influx_counter,a=b count=60 | |
34 | // influx_counter,a=b,error=true count=1 | |
35 | // influx_counter,a=b,error=false count=2 | |
36 | } | |
37 | ||
38 | func ExampleGauge() { | |
39 | in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger()) | |
40 | gauge := in.NewGauge("influx_gauge") | |
41 | gauge.Set(10) | |
42 | gauge.With("error", "true").Set(2) | |
43 | gauge.With("error", "true").Set(1) | |
44 | gauge.With("error", "false").Set(2) | |
45 | gauge.Set(50) | |
46 | ||
47 | client := &bufWriter{} | |
48 | in.WriteTo(client) | |
49 | ||
50 | expectedLines := []string{ | |
51 | `(influx_gauge,a=b value=50) [0-9]{19}`, | |
52 | `(influx_gauge,a=b,error=true value=1) [0-9]{19}`, | |
53 | `(influx_gauge,a=b,error=false value=2) [0-9]{19}`, | |
54 | } | |
55 | ||
56 | if err := extractAndPrintMessage(expectedLines, client.buf.String()); err != nil { | |
57 | fmt.Println(err.Error()) | |
58 | } | |
59 | ||
60 | // Output: | |
61 | // influx_gauge,a=b value=50 | |
62 | // influx_gauge,a=b,error=true value=1 | |
63 | // influx_gauge,a=b,error=false value=2 | |
64 | } | |
65 | ||
66 | func ExampleHistogram() { | |
67 | in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger()) | |
68 | histogram := in.NewHistogram("influx_histogram") | |
69 | histogram.Observe(float64(10)) | |
70 | histogram.With("error", "true").Observe(float64(1)) | |
71 | histogram.With("error", "false").Observe(float64(2)) | |
72 | histogram.Observe(float64(50)) | |
73 | ||
74 | client := &bufWriter{} | |
75 | in.WriteTo(client) | |
76 | ||
77 | expectedLines := []string{ | |
78 | `(influx_histogram,foo=alpha p50=10,p90=50,p95=50,p99=50) [0-9]{19}`, | |
79 | `(influx_histogram,error=true,foo=alpha p50=1,p90=1,p95=1,p99=1) [0-9]{19}`, | |
80 | `(influx_histogram,error=false,foo=alpha p50=2,p90=2,p95=2,p99=2) [0-9]{19}`, | |
81 | } | |
82 | ||
83 | if err := extractAndPrintMessage(expectedLines, client.buf.String()); err != nil { | |
84 | fmt.Println(err.Error()) | |
85 | } | |
86 | ||
87 | // Output: | |
88 | // influx_histogram,foo=alpha p50=10,p90=50,p95=50,p99=50 | |
89 | // influx_histogram,error=true,foo=alpha p50=1,p90=1,p95=1,p99=1 | |
90 | // influx_histogram,error=false,foo=alpha p50=2,p90=2,p95=2,p99=2 | |
91 | } | |
92 | ||
93 | func extractAndPrintMessage(expected []string, msg string) error { | |
94 | for _, pattern := range expected { | |
95 | re := regexp.MustCompile(pattern) | |
96 | match := re.FindStringSubmatch(msg) | |
97 | if len(match) != 2 { | |
98 | return fmt.Errorf("Pattern not found! {%s} [%s]: %v\n", pattern, msg, match) | |
99 | } | |
100 | fmt.Println(match[1]) | |
101 | } | |
102 | return nil | |
103 | } |
9 | 9 | |
10 | 10 | "github.com/go-kit/kit/log" |
11 | 11 | "github.com/go-kit/kit/metrics" |
12 | "github.com/go-kit/kit/metrics/generic" | |
12 | 13 | "github.com/go-kit/kit/metrics/internal/lv" |
13 | 14 | ) |
14 | 15 | |
19 | 20 | // one data point per flush, with a "count" field that reflects all adds since |
20 | 21 | // the last flush. Gauges are modeled as a timeseries with one data point per |
21 | 22 | // flush, with a "value" field that reflects the current state of the gauge. |
22 | // Histograms are modeled as a timeseries with one data point per observation, | |
23 | // with a "value" field that reflects each observation; use e.g. the HISTOGRAM | |
24 | // aggregate function to compute histograms. | |
23 | // Histograms are modeled as a timeseries with one data point per combination of tags, | |
24 | // with a set of quantile fields that reflects the p50, p90, p95 & p99. | |
25 | 25 | // |
26 | // Influx tags are immutable, attached to the Influx object, and given to each | |
27 | // metric at construction. Influx fields are mapped to Go kit label values, and | |
28 | // may be mutated via With functions. Actual metric values are provided as | |
29 | // fields with specific names depending on the metric. | |
26 | // Influx tags are attached to the Influx object, can be given to each | |
27 | // metric at construction and can be updated anytime via With function. Influx fields | |
28 | // are mapped to Go kit label values directly by this collector. Actual metric | |
29 | // values are provided as fields with specific names depending on the metric. | |
30 | 30 | // |
31 | 31 | // All observations are collected in memory locally, and flushed on demand. |
32 | 32 | type Influx struct { |
107 | 107 | now := time.Now() |
108 | 108 | |
109 | 109 | in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { |
110 | fields := fieldsFrom(lvs) | |
111 | fields["count"] = sum(values) | |
110 | tags := mergeTags(in.tags, lvs) | |
112 | 111 | var p *influxdb.Point |
113 | p, err = influxdb.NewPoint(name, in.tags, fields, now) | |
112 | fields := map[string]interface{}{"count": sum(values)} | |
113 | p, err = influxdb.NewPoint(name, tags, fields, now) | |
114 | 114 | if err != nil { |
115 | 115 | return false |
116 | 116 | } |
122 | 122 | } |
123 | 123 | |
124 | 124 | in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { |
125 | fields := fieldsFrom(lvs) | |
126 | fields["value"] = last(values) | |
125 | tags := mergeTags(in.tags, lvs) | |
127 | 126 | var p *influxdb.Point |
128 | p, err = influxdb.NewPoint(name, in.tags, fields, now) | |
127 | fields := map[string]interface{}{"value": last(values)} | |
128 | p, err = influxdb.NewPoint(name, tags, fields, now) | |
129 | 129 | if err != nil { |
130 | 130 | return false |
131 | 131 | } |
137 | 137 | } |
138 | 138 | |
139 | 139 | in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { |
140 | fields := fieldsFrom(lvs) | |
141 | ps := make([]*influxdb.Point, len(values)) | |
142 | for i, v := range values { | |
143 | fields["value"] = v // overwrite each time | |
144 | ps[i], err = influxdb.NewPoint(name, in.tags, fields, now) | |
145 | if err != nil { | |
146 | return false | |
147 | } | |
148 | } | |
149 | bp.AddPoints(ps) | |
140 | histogram := generic.NewHistogram(name, 50) | |
141 | tags := mergeTags(in.tags, lvs) | |
142 | var p *influxdb.Point | |
143 | for _, v := range values { | |
144 | histogram.Observe(v) | |
145 | } | |
146 | fields := map[string]interface{}{ | |
147 | "p50": histogram.Quantile(0.50), | |
148 | "p90": histogram.Quantile(0.90), | |
149 | "p95": histogram.Quantile(0.95), | |
150 | "p99": histogram.Quantile(0.99), | |
151 | } | |
152 | p, err = influxdb.NewPoint(name, tags, fields, now) | |
153 | if err != nil { | |
154 | return false | |
155 | } | |
156 | bp.AddPoint(p) | |
150 | 157 | return true |
151 | 158 | }) |
152 | 159 | if err != nil { |
156 | 163 | return w.Write(bp) |
157 | 164 | } |
158 | 165 | |
159 | func fieldsFrom(labelValues []string) map[string]interface{} { | |
166 | func mergeTags(tags map[string]string, labelValues []string) map[string]string { | |
160 | 167 | if len(labelValues)%2 != 0 { |
161 | panic("fieldsFrom received a labelValues with an odd number of strings") | |
162 | } | |
163 | fields := make(map[string]interface{}, len(labelValues)/2) | |
168 | panic("mergeTags received a labelValues with an odd number of strings") | |
169 | } | |
164 | 170 | for i := 0; i < len(labelValues); i += 2 { |
165 | fields[labelValues[i]] = labelValues[i+1] | |
166 | } | |
167 | return fields | |
171 | tags[labelValues[i]] = labelValues[i+1] | |
172 | } | |
173 | return tags | |
168 | 174 | } |
169 | 175 | |
170 | 176 | func sum(a []float64) float64 { |
10 | 10 | influxdb "github.com/influxdata/influxdb/client/v2" |
11 | 11 | |
12 | 12 | "github.com/go-kit/kit/log" |
13 | "github.com/go-kit/kit/metrics/generic" | |
14 | 13 | "github.com/go-kit/kit/metrics/teststat" |
15 | 14 | ) |
16 | 15 | |
48 | 47 | |
49 | 48 | func TestHistogram(t *testing.T) { |
50 | 49 | in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger()) |
51 | re := regexp.MustCompile(`influx_histogram,foo=alpha bar="beta",value=([0-9\.]+) [0-9]+`) | |
50 | re := regexp.MustCompile(`influx_histogram,bar=beta,foo=alpha p50=([0-9\.]+),p90=([0-9\.]+),p95=([0-9\.]+),p99=([0-9\.]+) [0-9]+`) | |
52 | 51 | histogram := in.NewHistogram("influx_histogram").With("bar", "beta") |
53 | 52 | quantiles := func() (float64, float64, float64, float64) { |
54 | 53 | w := &bufWriter{} |
55 | 54 | in.WriteTo(w) |
56 | h := generic.NewHistogram("h", 50) | |
57 | matches := re.FindAllStringSubmatch(w.buf.String(), -1) | |
58 | for _, match := range matches { | |
59 | f, _ := strconv.ParseFloat(match[1], 64) | |
60 | h.Observe(f) | |
55 | match := re.FindStringSubmatch(w.buf.String()) | |
56 | if len(match) != 5 { | |
57 | t.Errorf("These are not the quantiles you're looking for: %v\n", match) | |
61 | 58 | } |
62 | return h.Quantile(0.50), h.Quantile(0.90), h.Quantile(0.95), h.Quantile(0.99) | |
59 | var result [4]float64 | |
60 | for i, q := range match[1:] { | |
61 | result[i], _ = strconv.ParseFloat(q, 64) | |
62 | } | |
63 | return result[0], result[1], result[2], result[3] | |
63 | 64 | } |
64 | 65 | if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { |
65 | 66 | t.Fatal(err) |