Codebase list golang-github-go-kit-kit / 95db089
Metrics - Influx StatsD (#688) James Smith authored 6 years ago Peter Bourgon committed 6 years ago
2 changed file(s) with 479 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 // Package influxstatsd provides support for InfluxData's StatsD Telegraf plugin. It's very
1 // similar to StatsD, but supports arbitrary tags per-metric, which map to Go
2 // kit's label values. So, while label values are no-ops in StatsD, they are
3 // supported here. For more details, see the article at
4 // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/
5 //
6 // This package batches observations and emits them on some schedule to the
7 // remote server. This is useful even if you connect to your service
8 // over UDP. Emitting one network packet per observation can quickly overwhelm
9 // even the fastest internal network.
10 package influxstatsd
11
12 import (
13 "fmt"
14 "io"
15 "strings"
16 "sync"
17 "sync/atomic"
18 "time"
19
20 "github.com/go-kit/kit/log"
21 "github.com/go-kit/kit/metrics"
22 "github.com/go-kit/kit/metrics/generic"
23 "github.com/go-kit/kit/metrics/internal/lv"
24 "github.com/go-kit/kit/metrics/internal/ratemap"
25 "github.com/go-kit/kit/util/conn"
26 )
27
28 // Influxstatsd receives metrics observations and forwards them to a server.
29 // Create a Influxstatsd object, use it to create metrics, and pass those
30 // metrics as dependencies to the components that will use them.
31 //
32 // All metrics are buffered until WriteTo is called. Counters and gauges are
33 // aggregated into a single observation per timeseries per write. Timings and
34 // histograms are buffered but not aggregated.
35 //
36 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
37 // To send to a InfluxStatsD server, use the SendLoop helper method.
38 type Influxstatsd struct {
39 mtx sync.RWMutex
40 prefix string
41 rates *ratemap.RateMap
42 counters *lv.Space
43 gauges map[string]*gaugeNode
44 timings *lv.Space
45 histograms *lv.Space
46 logger log.Logger
47 lvs lv.LabelValues
48 }
49
50 // New returns a Influxstatsd object that may be used to create metrics. Prefix is
51 // applied to all created metrics. Callers must ensure that regular calls to
52 // WriteTo are performed, either manually or with one of the helper methods.
53 func New(prefix string, logger log.Logger, lvs ...string) *Influxstatsd {
54 if len(lvs)%2 != 0 {
55 panic("odd number of LabelValues; programmer error!")
56 }
57 return &Influxstatsd{
58 prefix: prefix,
59 rates: ratemap.New(),
60 counters: lv.NewSpace(),
61 gauges: map[string]*gaugeNode{}, // https://github.com/go-kit/kit/pull/588
62 timings: lv.NewSpace(),
63 histograms: lv.NewSpace(),
64 logger: logger,
65 lvs: lvs,
66 }
67 }
68
69 // NewCounter returns a counter, sending observations to this Influxstatsd object.
70 func (d *Influxstatsd) NewCounter(name string, sampleRate float64) *Counter {
71 d.rates.Set(name, sampleRate)
72 return &Counter{
73 name: name,
74 obs: d.counters.Observe,
75 }
76 }
77
78 // NewGauge returns a gauge, sending observations to this Influxstatsd object.
79 func (d *Influxstatsd) NewGauge(name string) *Gauge {
80 d.mtx.Lock()
81 n, ok := d.gauges[name]
82 if !ok {
83 n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), influx: d}}
84 d.gauges[name] = n
85 }
86 d.mtx.Unlock()
87 return n.gauge
88 }
89
90 // NewTiming returns a histogram whose observations are interpreted as
91 // millisecond durations, and are forwarded to this Influxstatsd object.
92 func (d *Influxstatsd) NewTiming(name string, sampleRate float64) *Timing {
93 d.rates.Set(name, sampleRate)
94 return &Timing{
95 name: name,
96 obs: d.timings.Observe,
97 }
98 }
99
100 // NewHistogram returns a histogram whose observations are of an unspecified
101 // unit, and are forwarded to this Influxstatsd object.
102 func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
103 d.rates.Set(name, sampleRate)
104 return &Histogram{
105 name: name,
106 obs: d.histograms.Observe,
107 }
108 }
109
110 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
111 // time the passed channel fires. This method blocks until the channel is
112 // closed, so clients probably want to run it in its own goroutine. For typical
113 // usage, create a time.Ticker and pass its C channel to this method.
114 func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
115 for range c {
116 if _, err := d.WriteTo(w); err != nil {
117 d.logger.Log("during", "WriteTo", "err", err)
118 }
119 }
120 }
121
122 // SendLoop is a helper method that wraps WriteLoop, passing a managed
123 // connection to the network and address. Like WriteLoop, this method blocks
124 // until the channel is closed, so clients probably want to start it in its own
125 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
126 // this method.
127 func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) {
128 d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
129 }
130
131 // WriteTo flushes the buffered content of the metrics to the writer, in
132 // InfluxStatsD format. WriteTo abides best-effort semantics, so observations are
133 // lost if there is a problem with the write. Clients should be sure to call
134 // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
135 func (d *Influxstatsd) WriteTo(w io.Writer) (count int64, err error) {
136 var n int
137
138 d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
139 n, err = fmt.Fprintf(w, "%s%s%s:%f|c%s\n", d.prefix, name, d.tagValues(lvs), sum(values), sampling(d.rates.Get(name)))
140 if err != nil {
141 return false
142 }
143 count += int64(n)
144 return true
145 })
146 if err != nil {
147 return count, err
148 }
149
150 d.mtx.RLock()
151 for _, root := range d.gauges {
152 root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
153 n, err = fmt.Fprintf(w, "%s%s%s:%f|g\n", d.prefix, name, d.tagValues(lvs), value)
154 if err != nil {
155 return false
156 }
157 count += int64(n)
158 return true
159 })
160 }
161 d.mtx.RUnlock()
162
163 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
164 sampleRate := d.rates.Get(name)
165 for _, value := range values {
166 n, err = fmt.Fprintf(w, "%s%s%s:%f|ms%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
167 if err != nil {
168 return false
169 }
170 count += int64(n)
171 }
172 return true
173 })
174 if err != nil {
175 return count, err
176 }
177
178 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
179 sampleRate := d.rates.Get(name)
180 for _, value := range values {
181 n, err = fmt.Fprintf(w, "%s%s%s:%f|h%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
182 if err != nil {
183 return false
184 }
185 count += int64(n)
186 }
187 return true
188 })
189 if err != nil {
190 return count, err
191 }
192
193 return count, err
194 }
195
196 func sum(a []float64) float64 {
197 var v float64
198 for _, f := range a {
199 v += f
200 }
201 return v
202 }
203
204 func last(a []float64) float64 {
205 return a[len(a)-1]
206 }
207
208 func sampling(r float64) string {
209 var sv string
210 if r < 1.0 {
211 sv = fmt.Sprintf("|@%f", r)
212 }
213 return sv
214 }
215
216 func (d *Influxstatsd) tagValues(labelValues []string) string {
217 if len(labelValues) == 0 && len(d.lvs) == 0 {
218 return ""
219 }
220 if len(labelValues)%2 != 0 {
221 panic("tagValues received a labelValues with an odd number of strings")
222 }
223 pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
224 for i := 0; i < len(d.lvs); i += 2 {
225 pairs = append(pairs, d.lvs[i]+"="+d.lvs[i+1])
226 }
227 for i := 0; i < len(labelValues); i += 2 {
228 pairs = append(pairs, labelValues[i]+"="+labelValues[i+1])
229 }
230 return "," + strings.Join(pairs, ",")
231 }
232
233 type observeFunc func(name string, lvs lv.LabelValues, value float64)
234
235 // Counter is a InfluxStatsD counter. Observations are forwarded to a Influxstatsd
236 // object, and aggregated (summed) per timeseries.
237 type Counter struct {
238 name string
239 lvs lv.LabelValues
240 obs observeFunc
241 }
242
243 // With implements metrics.Counter.
244 func (c *Counter) With(labelValues ...string) metrics.Counter {
245 return &Counter{
246 name: c.name,
247 lvs: c.lvs.With(labelValues...),
248 obs: c.obs,
249 }
250 }
251
252 // Add implements metrics.Counter.
253 func (c *Counter) Add(delta float64) {
254 c.obs(c.name, c.lvs, delta)
255 }
256
257 // Gauge is a InfluxStatsD gauge. Observations are forwarded to a Influxstatsd
258 // object, and aggregated (the last observation selected) per timeseries.
259 type Gauge struct {
260 g *generic.Gauge
261 influx *Influxstatsd
262 set int32
263 }
264
265 // With implements metrics.Gauge.
266 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
267 g.influx.mtx.RLock()
268 node := g.influx.gauges[g.g.Name]
269 g.influx.mtx.RUnlock()
270
271 ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), influx: g.influx}
272 return node.addGauge(ga, ga.g.LabelValues())
273 }
274
275 // Set implements metrics.Gauge.
276 func (g *Gauge) Set(value float64) {
277 g.g.Set(value)
278 g.touch()
279 }
280
281 // Add implements metrics.Gauge.
282 func (g *Gauge) Add(delta float64) {
283 g.g.Add(delta)
284 g.touch()
285 }
286
287 // Timing is a InfluxStatsD timing, or metrics.Histogram. Observations are
288 // forwarded to a Influxstatsd object, and collected (but not aggregated) per
289 // timeseries.
290 type Timing struct {
291 name string
292 lvs lv.LabelValues
293 obs observeFunc
294 }
295
296 // With implements metrics.Timing.
297 func (t *Timing) With(labelValues ...string) metrics.Histogram {
298 return &Timing{
299 name: t.name,
300 lvs: t.lvs.With(labelValues...),
301 obs: t.obs,
302 }
303 }
304
305 // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
306 func (t *Timing) Observe(value float64) {
307 t.obs(t.name, t.lvs, value)
308 }
309
310 // Histogram is a InfluxStatsD histrogram. Observations are forwarded to a
311 // Influxstatsd object, and collected (but not aggregated) per timeseries.
312 type Histogram struct {
313 name string
314 lvs lv.LabelValues
315 obs observeFunc
316 }
317
318 // With implements metrics.Histogram.
319 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
320 return &Histogram{
321 name: h.name,
322 lvs: h.lvs.With(labelValues...),
323 obs: h.obs,
324 }
325 }
326
327 // Observe implements metrics.Histogram.
328 func (h *Histogram) Observe(value float64) {
329 h.obs(h.name, h.lvs, value)
330 }
331
332 type pair struct{ label, value string }
333
334 type gaugeNode struct {
335 mtx sync.RWMutex
336 gauge *Gauge
337 children map[pair]*gaugeNode
338 }
339
340 func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
341 n.mtx.Lock()
342 defer n.mtx.Unlock()
343 if len(lvs) == 0 {
344 if n.gauge == nil {
345 n.gauge = g
346 }
347 return n.gauge
348 }
349 if len(lvs) < 2 {
350 panic("too few LabelValues; programmer error!")
351 }
352 head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
353 if n.children == nil {
354 n.children = map[pair]*gaugeNode{}
355 }
356 child, ok := n.children[head]
357 if !ok {
358 child = &gaugeNode{}
359 n.children[head] = child
360 }
361 return child.addGauge(g, tail)
362 }
363
364 func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
365 n.mtx.RLock()
366 defer n.mtx.RUnlock()
367 if n.gauge != nil {
368 value, ok := n.gauge.read()
369 if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
370 return false
371 }
372 }
373 for _, child := range n.children {
374 if !child.walk(fn) {
375 return false
376 }
377 }
378 return true
379 }
380
381 func (g *Gauge) touch() {
382 atomic.StoreInt32(&(g.set), 1)
383 }
384
385 func (g *Gauge) read() (float64, bool) {
386 set := atomic.SwapInt32(&(g.set), 0)
387 return g.g.Value(), set != 0
388 }
0 package influxstatsd
1
2 import (
3 "testing"
4
5 "github.com/go-kit/kit/log"
6 "github.com/go-kit/kit/metrics/teststat"
7 )
8
9 func TestCounter(t *testing.T) {
10 prefix, name := "abc.", "def"
11 label, value := "label", "value"
12 regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|c$`
13 d := New(prefix, log.NewNopLogger())
14 counter := d.NewCounter(name, 1.0).With(label, value)
15 valuef := teststat.SumLines(d, regex)
16 if err := teststat.TestCounter(counter, valuef); err != nil {
17 t.Fatal(err)
18 }
19 }
20
21 func TestCounterSampled(t *testing.T) {
22 // This will involve multiplying the observed sum by the inverse of the
23 // sample rate and checking against the expected value within some
24 // tolerance.
25 t.Skip("TODO")
26 }
27
28 func TestGauge(t *testing.T) {
29 prefix, name := "ghi.", "jkl"
30 label, value := "xyz", "abc"
31 regex := `^` + prefix + name + `,hostname=foohost,` + label + `=` + value + `:([0-9\.]+)\|g$`
32 d := New(prefix, log.NewNopLogger(), "hostname", "foohost")
33 gauge := d.NewGauge(name).With(label, value)
34 valuef := teststat.LastLine(d, regex)
35 if err := teststat.TestGauge(gauge, valuef); err != nil {
36 t.Fatal(err)
37 }
38 }
39
40 // InfluxStatsD histograms just emit all observations. So, we collect them into
41 // a generic histogram, and run the statistics test on that.
42
43 func TestHistogram(t *testing.T) {
44 prefix, name := "influxstatsd.", "histogram_test"
45 label, value := "abc", "def"
46 regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|h$`
47 d := New(prefix, log.NewNopLogger())
48 histogram := d.NewHistogram(name, 1.0).With(label, value)
49 quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X
50 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
51 t.Fatal(err)
52 }
53 }
54
55 func TestHistogramSampled(t *testing.T) {
56 prefix, name := "influxstatsd.", "sampled_histogram_test"
57 label, value := "foo", "bar"
58 regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|h\|@0\.01[0]*$`
59 d := New(prefix, log.NewNopLogger())
60 histogram := d.NewHistogram(name, 0.01).With(label, value)
61 quantiles := teststat.Quantiles(d, regex, 50)
62 if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil {
63 t.Fatal(err)
64 }
65 }
66
67 func TestTiming(t *testing.T) {
68 prefix, name := "influxstatsd.", "timing_test"
69 label, value := "wiggle", "bottom"
70 regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|ms$`
71 d := New(prefix, log.NewNopLogger())
72 histogram := d.NewTiming(name, 1.0).With(label, value)
73 quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X
74 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
75 t.Fatal(err)
76 }
77 }
78
79 func TestTimingSampled(t *testing.T) {
80 prefix, name := "influxstatsd.", "sampled_timing_test"
81 label, value := "internal", "external"
82 regex := `^` + prefix + name + "," + label + `=` + value + `:([0-9\.]+)\|ms\|@0.03[0]*$`
83 d := New(prefix, log.NewNopLogger())
84 histogram := d.NewTiming(name, 0.03).With(label, value)
85 quantiles := teststat.Quantiles(d, regex, 50)
86 if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil {
87 t.Fatal(err)
88 }
89 }