Package list golang-github-go-kit-kit / 8cce994
mv metrics3 metrics Peter Bourgon 5 years ago
98 changed file(s) with 3233 addition(s) and 7487 deletion(s). Raw diff Collapse all Expand all
00 # package metrics
11
22 `package metrics` provides a set of uniform interfaces for service instrumentation.
3 It has **[counters][]**, **[gauges][]**, and **[histograms][]**,
4 and provides adapters to popular metrics packages, like **[expvar][]**, **[statsd][]**, and **[Prometheus][]**.
5
6 [counters]: http://prometheus.io/docs/concepts/metric_types/#counter
7 [gauges]: http://prometheus.io/docs/concepts/metric_types/#gauge
8 [histograms]: http://prometheus.io/docs/concepts/metric_types/#histogram
9 [expvar]: https://golang.org/pkg/expvar
10 [statsd]: https://github.com/etsy/statsd
11 [Prometheus]: http://prometheus.io
3 It has
4 [counters](http://prometheus.io/docs/concepts/metric_types/#counter),
5 [gauges](http://prometheus.io/docs/concepts/metric_types/#gauge), and
6 [histograms](http://prometheus.io/docs/concepts/metric_types/#histogram),
7 and provides adapters to popular metrics packages, like
8 [expvar](https://golang.org/pkg/expvar),
9 [StatsD](https://github.com/etsy/statsd), and
10 [Prometheus](https://prometheus.io).
1211
1312 ## Rationale
1413
15 Code instrumentation is absolutely essential to achieve [observability][] into a distributed system.
14 Code instrumentation is absolutely essential to achieve
15 [observability](https://speakerdeck.com/mattheath/observability-in-micro-service-architectures)
16 into a distributed system.
1617 Metrics and instrumentation tools have coalesced around a few well-defined idioms.
17 `package metrics` provides a common, minimal interface those idioms for service authors.
18
19 [observability]: https://speakerdeck.com/mattheath/observability-in-micro-service-architectures
18 `package metrics` provides a common, minimal interface those idioms for service authors.
2019
2120 ## Usage
2221
3130 }
3231 ```
3332
34 A histogram for request duration, exported via a Prometheus summary with
35 dynamically-computed quantiles.
33 A histogram for request duration,
34 exported via a Prometheus summary with dynamically-computed quantiles.
3635
3736 ```go
3837 import (
4241 "github.com/go-kit/kit/metrics/prometheus"
4342 )
4443
45 var requestDuration = prometheus.NewSummary(stdprometheus.SummaryOpts{
44 var dur = prometheus.NewSummary(stdprometheus.SummaryOpts{
4645 Namespace: "myservice",
4746 Subsystem: "api",
48 Name: "request_duration_nanoseconds_count",
49 Help: "Total time spent serving requests.",
47 Name: "request_duration_seconds",
48 Help: "Total time spent serving requests.",
5049 }, []string{})
5150
5251 func handleRequest() {
53 defer func(begin time.Time) { requestDuration.Observe(time.Since(begin)) }(time.Now())
52 defer func(begin time.Time) { dur.Observe(time.Since(begin).Seconds()) }(time.Now())
5453 // handle request
5554 }
5655 ```
5756
58 A gauge for the number of goroutines currently running, exported via statsd.
57 A gauge for the number of goroutines currently running, exported via StatsD.
5958
6059 ```go
6160 import (
6564 "time"
6665
6766 "github.com/go-kit/kit/metrics/statsd"
67 "github.com/go-kit/kit/log"
6868 )
6969
7070 func main() {
71 statsdWriter, err := net.Dial("udp", "127.0.0.1:8126")
72 if err != nil {
73 panic(err)
74 }
71 statsd := statsd.New("foo_svc.", log.NewNopLogger())
7572
76 reportInterval := 5 * time.Second
77 goroutines := statsd.NewGauge(statsdWriter, "total_goroutines", reportInterval)
78 for range time.Tick(reportInterval) {
73 report := time.NewTicker(5*time.Second)
74 defer report.Stop()
75 go statsd.SendLoop(report.C, "tcp", "statsd.internal:8125")
76
77 goroutines := statsd.NewGauge("goroutine_count")
78 for range time.Tick(time.Second) {
7979 goroutines.Set(float64(runtime.NumGoroutine()))
8080 }
8181 }
0 // Package circonus provides a Circonus backend for metrics.
1 package circonus
2
3 import (
4 "github.com/circonus-labs/circonus-gometrics"
5
6 "github.com/go-kit/kit/metrics3"
7 )
8
9 // Circonus wraps a CirconusMetrics object and provides constructors for each of
10 // the Go kit metrics. The CirconusMetrics object manages aggregation of
11 // observations and emission to the Circonus server.
12 type Circonus struct {
13 m *circonusgometrics.CirconusMetrics
14 }
15
16 // New creates a new Circonus object wrapping the passed CirconusMetrics, which
17 // the caller should create and set in motion. The Circonus object can be used
18 // to construct individual Go kit metrics.
19 func New(m *circonusgometrics.CirconusMetrics) *Circonus {
20 return &Circonus{
21 m: m,
22 }
23 }
24
25 // NewCounter returns a counter metric with the given name.
26 func (c *Circonus) NewCounter(name string) *Counter {
27 return &Counter{
28 name: name,
29 m: c.m,
30 }
31 }
32
33 // NewGauge returns a gauge metric with the given name.
34 func (c *Circonus) NewGauge(name string) *Gauge {
35 return &Gauge{
36 name: name,
37 m: c.m,
38 }
39 }
40
41 // NewHistogram returns a histogram metric with the given name.
42 func (c *Circonus) NewHistogram(name string) *Histogram {
43 return &Histogram{
44 h: c.m.NewHistogram(name),
45 }
46 }
47
48 // Counter is a Circonus implementation of a counter metric.
49 type Counter struct {
50 name string
51 m *circonusgometrics.CirconusMetrics
52 }
53
54 // With implements Counter, but is a no-op, because Circonus metrics have no
55 // concept of per-observation label values.
56 func (c *Counter) With(labelValues ...string) metrics.Counter { return c }
57
58 // Add implements Counter. Delta is converted to uint64; precision will be lost.
59 func (c *Counter) Add(delta float64) { c.m.Add(c.name, uint64(delta)) }
60
61 // Gauge is a Circonus implementation of a gauge metric.
62 type Gauge struct {
63 name string
64 m *circonusgometrics.CirconusMetrics
65 }
66
67 // With implements Gauge, but is a no-op, because Circonus metrics have no
68 // concept of per-observation label values.
69 func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g }
70
71 // Set implements Gauge.
72 func (g *Gauge) Set(value float64) { g.m.SetGauge(g.name, value) }
73
74 // Histogram is a Circonus implementation of a histogram metric.
75 type Histogram struct {
76 h *circonusgometrics.Histogram
77 }
78
79 // With implements Histogram, but is a no-op, because Circonus metrics have no
80 // concept of per-observation label values.
81 func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h }
82
83 // Observe implements Histogram. No precision is lost.
84 func (h *Histogram) Observe(value float64) { h.h.RecordValue(value) }
0 package circonus
1
2 import (
3 "encoding/json"
4 "net/http"
5 "net/http/httptest"
6 "regexp"
7 "strconv"
8 "testing"
9
10 "github.com/circonus-labs/circonus-gometrics"
11 "github.com/circonus-labs/circonus-gometrics/checkmgr"
12
13 "github.com/go-kit/kit/metrics3/generic"
14 "github.com/go-kit/kit/metrics3/teststat"
15 )
16
17 func TestCounter(t *testing.T) {
18 // The only way to extract values from Circonus is to pose as a Circonus
19 // server and receive real HTTP writes.
20 const name = "abc"
21 var val int64
22 s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23 var res map[string]struct {
24 Value int64 `json:"_value"` // reverse-engineered :\
25 }
26 json.NewDecoder(r.Body).Decode(&res)
27 val = res[name].Value
28 }))
29 defer s.Close()
30
31 // Set up a Circonus object, submitting to our HTTP server.
32 m := newCirconusMetrics(s.URL)
33 counter := New(m).NewCounter(name).With("label values", "not supported")
34 value := func() float64 { m.Flush(); return float64(val) }
35
36 // Engage.
37 if err := teststat.TestCounter(counter, value); err != nil {
38 t.Fatal(err)
39 }
40 }
41
42 func TestGauge(t *testing.T) {
43 const name = "def"
44 var val float64
45 s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
46 var res map[string]struct {
47 Value string `json:"_value"`
48 }
49 json.NewDecoder(r.Body).Decode(&res)
50 val, _ = strconv.ParseFloat(res[name].Value, 64)
51 }))
52 defer s.Close()
53
54 m := newCirconusMetrics(s.URL)
55 gauge := New(m).NewGauge(name).With("label values", "not supported")
56 value := func() float64 { m.Flush(); return val }
57
58 if err := teststat.TestGauge(gauge, value); err != nil {
59 t.Fatal(err)
60 }
61 }
62
63 func TestHistogram(t *testing.T) {
64 const name = "ghi"
65
66 // Circonus just emits bucketed counts. We'll dump them into a generic
67 // histogram (losing some precision) and take statistics from there. Note
68 // this does assume that the generic histogram computes statistics properly,
69 // but we have another test for that :)
70 re := regexp.MustCompile(`^H\[([0-9\.e\+]+)\]=([0-9]+)$`) // H[1.2e+03]=456
71
72 var p50, p90, p95, p99 float64
73 s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
74 var res map[string]struct {
75 Values []string `json:"_value"` // reverse-engineered :\
76 }
77 json.NewDecoder(r.Body).Decode(&res)
78
79 h := generic.NewHistogram("dummy", len(res[name].Values)) // match tbe bucket counts
80 for _, v := range res[name].Values {
81 match := re.FindStringSubmatch(v)
82 f, _ := strconv.ParseFloat(match[1], 64)
83 n, _ := strconv.ParseInt(match[2], 10, 64)
84 for i := int64(0); i < n; i++ {
85 h.Observe(f)
86 }
87 }
88
89 p50 = h.Quantile(0.50)
90 p90 = h.Quantile(0.90)
91 p95 = h.Quantile(0.95)
92 p99 = h.Quantile(0.99)
93 }))
94 defer s.Close()
95
96 m := newCirconusMetrics(s.URL)
97 histogram := New(m).NewHistogram(name).With("label values", "not supported")
98 quantiles := func() (float64, float64, float64, float64) { m.Flush(); return p50, p90, p95, p99 }
99
100 // Circonus metrics, because they do their own bucketing, are less precise
101 // than other systems. So, we bump the tolerance to 5 percent.
102 if err := teststat.TestHistogram(histogram, quantiles, 0.05); err != nil {
103 t.Fatal(err)
104 }
105 }
106
107 func newCirconusMetrics(url string) *circonusgometrics.CirconusMetrics {
108 m, err := circonusgometrics.NewCirconusMetrics(&circonusgometrics.Config{
109 CheckManager: checkmgr.Config{
110 Check: checkmgr.CheckConfig{
111 SubmissionURL: url,
112 },
113 },
114 })
115 if err != nil {
116 panic(err)
117 }
118 return m
119 }
0 // Package discard implements a backend for package metrics that succeeds
1 // without doing anything.
0 // Package discard provides a no-op metrics backend.
21 package discard
32
4 import "github.com/go-kit/kit/metrics"
3 import "github.com/go-kit/kit/metrics3"
54
6 type counter struct {
7 name string
8 }
5 type counter struct{}
96
10 // NewCounter returns a Counter that does nothing.
11 func NewCounter(name string) metrics.Counter { return &counter{name} }
7 // NewCounter returns a new no-op counter.
8 func NewCounter() metrics.Counter { return counter{} }
129
13 func (c *counter) Name() string { return c.name }
14 func (c *counter) With(metrics.Field) metrics.Counter { return c }
15 func (c *counter) Add(delta uint64) {}
10 // With implements Counter.
11 func (c counter) With(labelValues ...string) metrics.Counter { return c }
1612
17 type gauge struct {
18 name string
19 }
13 // Add implements Counter.
14 func (c counter) Add(delta float64) {}
2015
21 // NewGauge returns a Gauge that does nothing.
22 func NewGauge(name string) metrics.Gauge { return &gauge{name} }
16 type gauge struct{}
2317
24 func (g *gauge) Name() string { return g.name }
25 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
26 func (g *gauge) Set(value float64) {}
27 func (g *gauge) Add(delta float64) {}
28 func (g *gauge) Get() float64 { return 0 }
18 // NewGauge returns a new no-op gauge.
19 func NewGauge() metrics.Gauge { return gauge{} }
2920
30 type histogram struct {
31 name string
32 }
21 // With implements Gauge.
22 func (g gauge) With(labelValues ...string) metrics.Gauge { return g }
3323
34 // NewHistogram returns a Histogram that does nothing.
35 func NewHistogram(name string) metrics.Histogram { return &histogram{name} }
24 // Set implements Gauge.
25 func (g gauge) Set(value float64) {}
3626
37 func (h *histogram) Name() string { return h.name }
38 func (h *histogram) With(metrics.Field) metrics.Histogram { return h }
39 func (h *histogram) Observe(value int64) {}
40 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
41 return []metrics.Bucket{}, []metrics.Quantile{}
42 }
27 type histogram struct{}
28
29 // NewHistogram returns a new no-op histogram.
30 func NewHistogram() metrics.Histogram { return histogram{} }
31
32 // With implements Histogram.
33 func (h histogram) With(labelValues ...string) metrics.Histogram { return h }
34
35 // Observe implements histogram.
36 func (h histogram) Observe(value float64) {}
00 // Package metrics provides a framework for application instrumentation. All
11 // metrics are safe for concurrent use. Considerable design influence has been
22 // taken from https://github.com/codahale/metrics and https://prometheus.io.
3 //
4 // This package contains the common interfaces. Your code should take these
5 // interfaces as parameters. Implementations are provided for different
6 // instrumentation systems in the various subdirectories.
7 //
8 // Usage
9 //
10 // Metrics are dependencies and should be passed to the components that need
11 // them in the same way you'd construct and pass a database handle, or reference
12 // to another component. So, create metrics in your func main, using whichever
13 // concrete implementation is appropriate for your organization.
14 //
15 // latency := prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
16 // Namespace: "myteam",
17 // Subsystem: "foosvc",
18 // Name: "request_latency_seconds",
19 // Help: "Incoming request latency in seconds."
20 // }, []string{"method", "status_code"})
21 //
22 // Write your components to take the metrics they will use as parameters to
23 // their constructors. Use the interface types, not the concrete types. That is,
24 //
25 // // NewAPI takes metrics.Histogram, not *prometheus.Summary
26 // func NewAPI(s Store, logger log.Logger, latency metrics.Histogram) *API {
27 // // ...
28 // }
29 //
30 // func (a *API) ServeFoo(w http.ResponseWriter, r *http.Request) {
31 // begin := time.Now()
32 // // ...
33 // a.latency.Observe(time.Since(begin).Seconds())
34 // }
35 //
36 // Finally, pass the metrics as dependencies when building your object graph.
37 // This should happen in func main, not in the global scope.
38 //
39 // api := NewAPI(store, logger, latency)
40 // http.ListenAndServe("/", api)
41 //
42 // Implementation details
43 //
44 // Each telemetry system has different semantics for label values, push vs.
45 // pull, support for histograms, etc. These properties influence the design of
46 // their respective packages. This table attempts to summarize the key points of
47 // distinction.
48 //
49 // SYSTEM DIM COUNTERS GAUGES HISTOGRAMS
50 // dogstatsd n batch, push-aggregate batch, push-aggregate native, batch, push-each
51 // statsd 1 batch, push-aggregate batch, push-aggregate native, batch, push-each
52 // graphite 1 batch, push-aggregate batch, push-aggregate synthetic, batch, push-aggregate
53 // expvar 1 atomic atomic synthetic, batch, in-place expose
54 // influx n custom custom custom
55 // prometheus n native native native
56 // circonus 1 native native native
57 //
358 package metrics
0 // Package dogstatsd implements a DogStatsD backend for package metrics.
0 // Package dogstatsd provides a DogStatsD backend for package metrics. It's very
1 // similar to StatsD, but supports arbitrary tags per-metric, which map to Go
2 // kit's label values. So, while label values are no-ops in StatsD, they are
3 // supported here. For more details, see the documentation at
4 // http://docs.datadoghq.com/guides/dogstatsd/.
15 //
2 // This implementation supports Datadog tags that provide additional metric
3 // filtering capabilities. See the DogStatsD documentation for protocol
4 // specifics:
5 // http://docs.datadoghq.com/guides/dogstatsd/
6 //
6 // This package batches observations and emits them on some schedule to the
7 // remote server. This is useful even if you connect to your DogStatsD server
8 // over UDP. Emitting one network packet per observation can quickly overwhelm
9 // even the fastest internal network.
710 package dogstatsd
811
912 import (
10 "bytes"
1113 "fmt"
1214 "io"
13 "log"
14 "math"
15 "strings"
1516 "time"
1617
17 "sync/atomic"
18
19 "github.com/go-kit/kit/metrics"
18 "github.com/go-kit/kit/log"
19 "github.com/go-kit/kit/metrics3"
20 "github.com/go-kit/kit/metrics3/internal/lv"
21 "github.com/go-kit/kit/metrics3/internal/ratemap"
22 "github.com/go-kit/kit/util/conn"
2023 )
2124
22 // dogstatsd metrics were based on the statsd package in go-kit
23
24 const maxBufferSize = 1400 // bytes
25
26 type counter struct {
27 key string
28 c chan string
29 tags []metrics.Field
30 }
31
32 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
33 // to the passed writer. Observations are buffered for the report interval or
34 // until the buffer exceeds a max packet size, whichever comes first.
25 // Dogstatsd receives metrics observations and forwards them to a DogStatsD
26 // server. Create a Dogstatsd object, use it to create metrics, and pass those
27 // metrics as dependencies to the components that will use them.
3528 //
36 // TODO: support for sampling.
37 func NewCounter(w io.Writer, key string, reportInterval time.Duration, globalTags []metrics.Field) metrics.Counter {
38 return NewCounterTick(w, key, time.Tick(reportInterval), globalTags)
39 }
40
41 // NewCounterTick is the same as NewCounter, but allows the user to pass in a
42 // ticker channel instead of invoking time.Tick.
43 func NewCounterTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Counter {
44 c := &counter{
45 key: key,
46 c: make(chan string),
47 tags: tags,
48 }
49 go fwd(w, key, reportTicker, c.c)
50 return c
51 }
52
53 func (c *counter) Name() string { return c.key }
54
55 func (c *counter) With(f metrics.Field) metrics.Counter {
56 return &counter{
57 key: c.key,
58 c: c.c,
59 tags: append(c.tags, f),
60 }
61 }
62
63 func (c *counter) Add(delta uint64) { c.c <- applyTags(fmt.Sprintf("%d|c", delta), c.tags) }
64
65 type gauge struct {
66 key string
67 lastValue uint64 // math.Float64frombits
68 g chan string
69 tags []metrics.Field
70 }
71
72 // NewGauge returns a Gauge that emits values in the DogStatsD protocol to the
73 // passed writer. Values are buffered for the report interval or until the
74 // buffer exceeds a max packet size, whichever comes first.
29 // All metrics are buffered until WriteTo is called. Counters and gauges are
30 // aggregated into a single observation per timeseries per write. Timings and
31 // histograms are buffered but not aggregated.
7532 //
76 // TODO: support for sampling.
77 func NewGauge(w io.Writer, key string, reportInterval time.Duration, tags []metrics.Field) metrics.Gauge {
78 return NewGaugeTick(w, key, time.Tick(reportInterval), tags)
79 }
80
81 // NewGaugeTick is the same as NewGauge, but allows the user to pass in a ticker
82 // channel instead of invoking time.Tick.
83 func NewGaugeTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Gauge {
84 g := &gauge{
85 key: key,
86 g: make(chan string),
87 tags: tags,
88 }
89 go fwd(w, key, reportTicker, g.g)
90 return g
91 }
92
93 func (g *gauge) Name() string { return g.key }
94
95 func (g *gauge) With(f metrics.Field) metrics.Gauge {
96 return &gauge{
97 key: g.key,
98 lastValue: g.lastValue,
99 g: g.g,
100 tags: append(g.tags, f),
101 }
102 }
103
104 func (g *gauge) Add(delta float64) {
105 // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
106 sign := "+"
107 if delta < 0 {
108 sign, delta = "-", -delta
109 }
110 g.g <- applyTags(fmt.Sprintf("%s%f|g", sign, delta), g.tags)
111 }
112
113 func (g *gauge) Set(value float64) {
114 atomic.StoreUint64(&g.lastValue, math.Float64bits(value))
115 g.g <- applyTags(fmt.Sprintf("%f|g", value), g.tags)
116 }
117
118 func (g *gauge) Get() float64 {
119 return math.Float64frombits(atomic.LoadUint64(&g.lastValue))
120 }
121
122 // NewCallbackGauge emits values in the DogStatsD protocol to the passed writer.
123 // It collects values every scrape interval from the callback. Values are
124 // buffered for the report interval or until the buffer exceeds a max packet
125 // size, whichever comes first. The report and scrape intervals may be the
126 // same. The callback determines the value, and fields are ignored, so
127 // NewCallbackGauge returns nothing.
128 func NewCallbackGauge(w io.Writer, key string, reportInterval, scrapeInterval time.Duration, callback func() float64) {
129 NewCallbackGaugeTick(w, key, time.Tick(reportInterval), time.Tick(scrapeInterval), callback)
130 }
131
132 // NewCallbackGaugeTick is the same as NewCallbackGauge, but allows the user to
133 // pass in ticker channels instead of durations to control report and scrape
134 // intervals.
135 func NewCallbackGaugeTick(w io.Writer, key string, reportTicker, scrapeTicker <-chan time.Time, callback func() float64) {
136 go fwd(w, key, reportTicker, emitEvery(scrapeTicker, callback))
137 }
138
139 func emitEvery(emitTicker <-chan time.Time, callback func() float64) <-chan string {
140 c := make(chan string)
141 go func() {
142 for range emitTicker {
143 c <- fmt.Sprintf("%f|g", callback())
144 }
145 }()
146 return c
147 }
148
149 type histogram struct {
150 key string
151 h chan string
152 tags []metrics.Field
153 }
154
155 // NewHistogram returns a Histogram that emits observations in the DogStatsD
156 // protocol to the passed writer. Observations are buffered for the reporting
157 // interval or until the buffer exceeds a max packet size, whichever comes
158 // first.
159 //
160 // NewHistogram is mapped to a statsd Timing, so observations should represent
161 // milliseconds. If you observe in units of nanoseconds, you can make the
162 // translation with a ScaledHistogram:
163 //
164 // NewScaledHistogram(dogstatsdHistogram, time.Millisecond)
165 //
166 // You can also enforce the constraint in a typesafe way with a millisecond
167 // TimeHistogram:
168 //
169 // NewTimeHistogram(dogstatsdHistogram, time.Millisecond)
170 //
171 // TODO: support for sampling.
172 func NewHistogram(w io.Writer, key string, reportInterval time.Duration, tags []metrics.Field) metrics.Histogram {
173 return NewHistogramTick(w, key, time.Tick(reportInterval), tags)
174 }
175
176 // NewHistogramTick is the same as NewHistogram, but allows the user to pass a
177 // ticker channel instead of invoking time.Tick.
178 func NewHistogramTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Histogram {
179 h := &histogram{
180 key: key,
181 h: make(chan string),
182 tags: tags,
183 }
184 go fwd(w, key, reportTicker, h.h)
185 return h
186 }
187
188 func (h *histogram) Name() string { return h.key }
189
190 func (h *histogram) With(f metrics.Field) metrics.Histogram {
191 return &histogram{
192 key: h.key,
193 h: h.h,
194 tags: append(h.tags, f),
195 }
196 }
197
198 func (h *histogram) Observe(value int64) {
199 h.h <- applyTags(fmt.Sprintf("%d|ms", value), h.tags)
200 }
201
202 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
203 // TODO(pb): no way to do this without introducing e.g. codahale/hdrhistogram
204 return []metrics.Bucket{}, []metrics.Quantile{}
205 }
206
207 func fwd(w io.Writer, key string, reportTicker <-chan time.Time, c <-chan string) {
208 buf := &bytes.Buffer{}
209 for {
210 select {
211 case s := <-c:
212 fmt.Fprintf(buf, "%s:%s\n", key, s)
213 if buf.Len() > maxBufferSize {
214 flush(w, buf)
33 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
34 // To send to a DogStatsD server, use the SendLoop helper method.
35 type Dogstatsd struct {
36 prefix string
37 rates *ratemap.RateMap
38 counters *lv.Space
39 gauges *lv.Space
40 timings *lv.Space
41 histograms *lv.Space
42 logger log.Logger
43 }
44
45 // New returns a Dogstatsd object that may be used to create metrics. Prefix is
46 // applied to all created metrics. Callers must ensure that regular calls to
47 // WriteTo are performed, either manually or with one of the helper methods.
48 func New(prefix string, logger log.Logger) *Dogstatsd {
49 return &Dogstatsd{
50 prefix: prefix,
51 rates: ratemap.New(),
52 counters: lv.NewSpace(),
53 gauges: lv.NewSpace(),
54 timings: lv.NewSpace(),
55 histograms: lv.NewSpace(),
56 logger: logger,
57 }
58 }
59
60 // NewCounter returns a counter, sending observations to this Dogstatsd object.
61 func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
62 d.rates.Set(d.prefix+name, sampleRate)
63 return &Counter{
64 name: d.prefix + name,
65 obs: d.counters.Observe,
66 }
67 }
68
69 // NewGauge returns a gauge, sending observations to this Dogstatsd object.
70 func (d *Dogstatsd) NewGauge(name string) *Gauge {
71 return &Gauge{
72 name: d.prefix + name,
73 obs: d.gauges.Observe,
74 }
75 }
76
77 // NewTiming returns a histogram whose observations are interpreted as
78 // millisecond durations, and are forwarded to this Dogstatsd object.
79 func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
80 d.rates.Set(d.prefix+name, sampleRate)
81 return &Timing{
82 name: d.prefix + name,
83 obs: d.timings.Observe,
84 }
85 }
86
87 // NewHistogram returns a histogram whose observations are of an unspecified
88 // unit, and are forwarded to this Dogstatsd object.
89 func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
90 d.rates.Set(d.prefix+name, sampleRate)
91 return &Histogram{
92 name: d.prefix + name,
93 obs: d.histograms.Observe,
94 }
95 }
96
97 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
98 // time the passed channel fires. This method blocks until the channel is
99 // closed, so clients probably want to run it in its own goroutine. For typical
100 // usage, create a time.Ticker and pass its C channel to this method.
101 func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
102 for range c {
103 if _, err := d.WriteTo(w); err != nil {
104 d.logger.Log("during", "WriteTo", "err", err)
105 }
106 }
107 }
108
109 // SendLoop is a helper method that wraps WriteLoop, passing a managed
110 // connection to the network and address. Like WriteLoop, this method blocks
111 // until the channel is closed, so clients probably want to start it in its own
112 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
113 // this method.
114 func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) {
115 d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
116 }
117
118 // WriteTo flushes the buffered content of the metrics to the writer, in
119 // DogStatsD format. WriteTo abides best-effort semantics, so observations are
120 // lost if there is a problem with the write. Clients should be sure to call
121 // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
122 func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
123 var n int
124
125 d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
126 n, err = fmt.Fprintf(w, "%s:%f|c%s%s\n", name, sum(values), sampling(d.rates.Get(name)), tagValues(lvs))
127 if err != nil {
128 return false
129 }
130 count += int64(n)
131 return true
132 })
133 if err != nil {
134 return count, err
135 }
136
137 d.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
138 n, err = fmt.Fprintf(w, "%s:%f|g%s\n", name, last(values), tagValues(lvs))
139 if err != nil {
140 return false
141 }
142 count += int64(n)
143 return true
144 })
145 if err != nil {
146 return count, err
147 }
148
149 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
150 sampleRate := d.rates.Get(name)
151 for _, value := range values {
152 n, err = fmt.Fprintf(w, "%s:%f|ms%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
153 if err != nil {
154 return false
215155 }
216
217 case <-reportTicker:
218 flush(w, buf)
219 }
220 }
221 }
222
223 func flush(w io.Writer, buf *bytes.Buffer) {
224 if buf.Len() <= 0 {
225 return
226 }
227 if _, err := w.Write(buf.Bytes()); err != nil {
228 log.Printf("error: could not write to dogstatsd: %v", err)
229 }
230 buf.Reset()
231 }
232
233 func applyTags(value string, tags []metrics.Field) string {
234 if len(tags) > 0 {
235 var tagsString string
236 for _, t := range tags {
237 switch tagsString {
238 case "":
239 tagsString = t.Key + ":" + t.Value
240 default:
241 tagsString = tagsString + "," + t.Key + ":" + t.Value
156 count += int64(n)
157 }
158 return true
159 })
160 if err != nil {
161 return count, err
162 }
163
164 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
165 sampleRate := d.rates.Get(name)
166 for _, value := range values {
167 n, err = fmt.Fprintf(w, "%s:%f|h%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
168 if err != nil {
169 return false
242170 }
243 }
244 value = value + "|#" + tagsString
245 }
246 return value
247 }
171 count += int64(n)
172 }
173 return true
174 })
175 if err != nil {
176 return count, err
177 }
178
179 return count, err
180 }
181
182 func sum(a []float64) float64 {
183 var v float64
184 for _, f := range a {
185 v += f
186 }
187 return v
188 }
189
190 func last(a []float64) float64 {
191 return a[len(a)-1]
192 }
193
194 func sampling(r float64) string {
195 var sv string
196 if r < 1.0 {
197 sv = fmt.Sprintf("|@%f", r)
198 }
199 return sv
200 }
201
202 func tagValues(labelValues []string) string {
203 if len(labelValues) == 0 {
204 return ""
205 }
206 if len(labelValues)%2 != 0 {
207 panic("tagValues received a labelValues with an odd number of strings")
208 }
209 pairs := make([]string, 0, len(labelValues)/2)
210 for i := 0; i < len(labelValues); i += 2 {
211 pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
212 }
213 return "|#" + strings.Join(pairs, ",")
214 }
215
216 type observeFunc func(name string, lvs lv.LabelValues, value float64)
217
218 // Counter is a DogStatsD counter. Observations are forwarded to a Dogstatsd
219 // object, and aggregated (summed) per timeseries.
220 type Counter struct {
221 name string
222 lvs lv.LabelValues
223 obs observeFunc
224 }
225
226 // With implements metrics.Counter.
227 func (c *Counter) With(labelValues ...string) metrics.Counter {
228 return &Counter{
229 name: c.name,
230 lvs: c.lvs.With(labelValues...),
231 obs: c.obs,
232 }
233 }
234
235 // Add implements metrics.Counter.
236 func (c *Counter) Add(delta float64) {
237 c.obs(c.name, c.lvs, delta)
238 }
239
240 // Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd
241 // object, and aggregated (the last observation selected) per timeseries.
242 type Gauge struct {
243 name string
244 lvs lv.LabelValues
245 obs observeFunc
246 }
247
248 // With implements metrics.Gauge.
249 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
250 return &Gauge{
251 name: g.name,
252 lvs: g.lvs.With(labelValues...),
253 obs: g.obs,
254 }
255 }
256
257 // Set implements metrics.Gauge.
258 func (g *Gauge) Set(value float64) {
259 g.obs(g.name, g.lvs, value)
260 }
261
262 // Timing is a DogStatsD timing, or metrics.Histogram. Observations are
263 // forwarded to a Dogstatsd object, and collected (but not aggregated) per
264 // timeseries.
265 type Timing struct {
266 name string
267 lvs lv.LabelValues
268 obs observeFunc
269 }
270
271 // With implements metrics.Timing.
272 func (t *Timing) With(labelValues ...string) metrics.Histogram {
273 return &Timing{
274 name: t.name,
275 lvs: t.lvs.With(labelValues...),
276 obs: t.obs,
277 }
278 }
279
280 // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
281 func (t *Timing) Observe(value float64) {
282 t.obs(t.name, t.lvs, value)
283 }
284
285 // Histogram is a DogStatsD histrogram. Observations are forwarded to a
286 // Dogstatsd object, and collected (but not aggregated) per timeseries.
287 type Histogram struct {
288 name string
289 lvs lv.LabelValues
290 obs observeFunc
291 }
292
293 // With implements metrics.Histogram.
294 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
295 return &Histogram{
296 name: h.name,
297 lvs: h.lvs.With(labelValues...),
298 obs: h.obs,
299 }
300 }
301
302 // Observe implements metrics.Histogram.
303 func (h *Histogram) Observe(value float64) {
304 h.obs(h.name, h.lvs, value)
305 }
00 package dogstatsd
11
22 import (
3 "bytes"
4 "fmt"
5 "net"
6 "strings"
7 "sync"
83 "testing"
9 "time"
104
115 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/util/conn"
6 "github.com/go-kit/kit/metrics3/teststat"
147 )
158
16 func TestEmitterCounter(t *testing.T) {
17 e, buf := testEmitter()
18
19 c := e.NewCounter("test_statsd_counter")
20 c.Add(1)
21 c.Add(2)
22
23 // give time for things to emit
24 time.Sleep(time.Millisecond * 250)
25 // force a flush and stop
26 e.Stop()
27
28 want := "prefix.test_statsd_counter:1|c\nprefix.test_statsd_counter:2|c\n"
29 have := buf.String()
30 if want != have {
31 t.Errorf("want %q, have %q", want, have)
9 func TestCounter(t *testing.T) {
10 prefix, name := "abc.", "def"
11 label, value := "label", "value"
12 regex := `^` + prefix + name + `:([0-9\.]+)\|c\|#` + label + `:` + value + `$`
13 d := New(prefix, log.NewNopLogger())
14 counter := d.NewCounter(name, 1.0).With(label, value)
15 valuef := teststat.SumLines(d, regex)
16 if err := teststat.TestCounter(counter, valuef); err != nil {
17 t.Fatal(err)
3218 }
3319 }
3420
35 func TestEmitterGauge(t *testing.T) {
36 e, buf := testEmitter()
21 func TestCounterSampled(t *testing.T) {
22 // This will involve multiplying the observed sum by the inverse of the
23 // sample rate and checking against the expected value within some
24 // tolerance.
25 t.Skip("TODO")
26 }
3727
38 g := e.NewGauge("test_statsd_gauge")
39
40 delta := 1.0
41 g.Add(delta)
42
43 // give time for things to emit
44 time.Sleep(time.Millisecond * 250)
45 // force a flush and stop
46 e.Stop()
47
48 want := fmt.Sprintf("prefix.test_statsd_gauge:+%f|g\n", delta)
49 have := buf.String()
50 if want != have {
51 t.Errorf("want %q, have %q", want, have)
28 func TestGauge(t *testing.T) {
29 prefix, name := "ghi.", "jkl"
30 label, value := "xyz", "abc"
31 regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#` + label + `:` + value + `$`
32 d := New(prefix, log.NewNopLogger())
33 gauge := d.NewGauge(name).With(label, value)
34 valuef := teststat.LastLine(d, regex)
35 if err := teststat.TestGauge(gauge, valuef); err != nil {
36 t.Fatal(err)
5237 }
5338 }
5439
55 func TestEmitterHistogram(t *testing.T) {
56 e, buf := testEmitter()
57 h := e.NewHistogram("test_statsd_histogram")
40 // DogStatsD histograms just emit all observations. So, we collect them into
41 // a generic histogram, and run the statistics test on that.
5842
59 h.Observe(123)
60
61 // give time for things to emit
62 time.Sleep(time.Millisecond * 250)
63 // force a flush and stop
64 e.Stop()
65
66 want := "prefix.test_statsd_histogram:123|ms\n"
67 have := buf.String()
68 if want != have {
69 t.Errorf("want %q, have %q", want, have)
43 func TestHistogram(t *testing.T) {
44 prefix, name := "dogstatsd.", "histogram_test"
45 label, value := "abc", "def"
46 regex := `^` + prefix + name + `:([0-9\.]+)\|h\|#` + label + `:` + value + `$`
47 d := New(prefix, log.NewNopLogger())
48 histogram := d.NewHistogram(name, 1.0).With(label, value)
49 quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X
50 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
51 t.Fatal(err)
7052 }
7153 }
7254
73 func TestCounter(t *testing.T) {
74 buf := &syncbuf{buf: &bytes.Buffer{}}
75 reportc := make(chan time.Time)
76 tags := []metrics.Field{}
77 c := NewCounterTick(buf, "test_statsd_counter", reportc, tags)
78
79 c.Add(1)
80 c.With(metrics.Field{"foo", "bar"}).Add(2)
81 c.With(metrics.Field{"foo", "bar"}).With(metrics.Field{"abc", "123"}).Add(2)
82 c.Add(3)
83
84 want, have := "test_statsd_counter:1|c\ntest_statsd_counter:2|c|#foo:bar\ntest_statsd_counter:2|c|#foo:bar,abc:123\ntest_statsd_counter:3|c\n", ""
85 by(t, 100*time.Millisecond, func() bool {
86 have = buf.String()
87 return want == have
88 }, func() {
89 reportc <- time.Now()
90 }, fmt.Sprintf("want %q, have %q", want, have))
91 }
92
93 func TestGauge(t *testing.T) {
94 buf := &syncbuf{buf: &bytes.Buffer{}}
95 reportc := make(chan time.Time)
96 tags := []metrics.Field{}
97 g := NewGaugeTick(buf, "test_statsd_gauge", reportc, tags)
98
99 delta := 1.0
100 g.Add(delta)
101
102 want, have := fmt.Sprintf("test_statsd_gauge:+%f|g\n", delta), ""
103 by(t, 100*time.Millisecond, func() bool {
104 have = buf.String()
105 return want == have
106 }, func() {
107 reportc <- time.Now()
108 }, fmt.Sprintf("want %q, have %q", want, have))
109
110 buf.Reset()
111 delta = -2.0
112 g.With(metrics.Field{"foo", "bar"}).Add(delta)
113
114 want, have = fmt.Sprintf("test_statsd_gauge:%f|g|#foo:bar\n", delta), ""
115 by(t, 100*time.Millisecond, func() bool {
116 have = buf.String()
117 return want == have
118 }, func() {
119 reportc <- time.Now()
120 }, fmt.Sprintf("want %q, have %q", want, have))
121
122 buf.Reset()
123 value := 3.0
124 g.With(metrics.Field{"foo", "bar"}).With(metrics.Field{"abc", "123"}).Set(value)
125
126 want, have = fmt.Sprintf("test_statsd_gauge:%f|g|#foo:bar,abc:123\n", value), ""
127 by(t, 100*time.Millisecond, func() bool {
128 have = buf.String()
129 return want == have
130 }, func() {
131 reportc <- time.Now()
132 }, fmt.Sprintf("want %q, have %q", want, have))
133 }
134
135 func TestCallbackGauge(t *testing.T) {
136 buf := &syncbuf{buf: &bytes.Buffer{}}
137 reportc, scrapec := make(chan time.Time), make(chan time.Time)
138 value := 55.55
139 cb := func() float64 { return value }
140 NewCallbackGaugeTick(buf, "test_statsd_callback_gauge", reportc, scrapec, cb)
141
142 scrapec <- time.Now()
143 reportc <- time.Now()
144
145 // Travis is annoying
146 by(t, time.Second, func() bool {
147 return buf.String() != ""
148 }, func() {
149 reportc <- time.Now()
150 }, "buffer never got write+flush")
151
152 want, have := fmt.Sprintf("test_statsd_callback_gauge:%f|g\n", value), ""
153 by(t, 100*time.Millisecond, func() bool {
154 have = buf.String()
155 return strings.HasPrefix(have, want) // HasPrefix because we might get multiple writes
156 }, func() {
157 reportc <- time.Now()
158 }, fmt.Sprintf("want %q, have %q", want, have))
159 }
160
161 func TestHistogram(t *testing.T) {
162 buf := &syncbuf{buf: &bytes.Buffer{}}
163 reportc := make(chan time.Time)
164 tags := []metrics.Field{}
165 h := NewHistogramTick(buf, "test_statsd_histogram", reportc, tags)
166
167 h.Observe(123)
168 h.With(metrics.Field{"foo", "bar"}).Observe(456)
169
170 want, have := "test_statsd_histogram:123|ms\ntest_statsd_histogram:456|ms|#foo:bar\n", ""
171 by(t, 100*time.Millisecond, func() bool {
172 have = buf.String()
173 return want == have
174 }, func() {
175 reportc <- time.Now()
176 }, fmt.Sprintf("want %q, have %q", want, have))
177 }
178
179 func by(t *testing.T, d time.Duration, check func() bool, execute func(), msg string) {
180 deadline := time.Now().Add(d)
181 for !check() {
182 if time.Now().After(deadline) {
183 t.Fatal(msg)
184 }
185 execute()
55 func TestHistogramSampled(t *testing.T) {
56 prefix, name := "dogstatsd.", "sampled_histogram_test"
57 label, value := "foo", "bar"
58 regex := `^` + prefix + name + `:([0-9\.]+)\|h\|@0\.01[0]*\|#` + label + `:` + value + `$`
59 d := New(prefix, log.NewNopLogger())
60 histogram := d.NewHistogram(name, 0.01).With(label, value)
61 quantiles := teststat.Quantiles(d, regex, 50)
62 if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil {
63 t.Fatal(err)
18664 }
18765 }
18866
189 type syncbuf struct {
190 mtx sync.Mutex
191 buf *bytes.Buffer
192 }
193
194 func (s *syncbuf) Write(p []byte) (int, error) {
195 s.mtx.Lock()
196 defer s.mtx.Unlock()
197 return s.buf.Write(p)
198 }
199
200 func (s *syncbuf) String() string {
201 s.mtx.Lock()
202 defer s.mtx.Unlock()
203 return s.buf.String()
204 }
205
206 func (s *syncbuf) Reset() {
207 s.mtx.Lock()
208 defer s.mtx.Unlock()
209 s.buf.Reset()
210 }
211
212 func testEmitter() (*Emitter, *syncbuf) {
213 buf := &syncbuf{buf: &bytes.Buffer{}}
214 e := &Emitter{
215 prefix: "prefix.",
216 mgr: conn.NewManager(mockDialer(buf), "", "", time.After, log.NewNopLogger()),
217 logger: log.NewNopLogger(),
218 keyVals: make(chan keyVal),
219 quitc: make(chan chan struct{}),
220 }
221 go e.loop(time.Millisecond * 20)
222 return e, buf
223 }
224
225 func mockDialer(buf *syncbuf) conn.Dialer {
226 return func(net, addr string) (net.Conn, error) {
227 return &mockConn{buf}, nil
67 func TestTiming(t *testing.T) {
68 prefix, name := "dogstatsd.", "timing_test"
69 label, value := "wiggle", "bottom"
70 regex := `^` + prefix + name + `:([0-9\.]+)\|ms\|#` + label + `:` + value + `$`
71 d := New(prefix, log.NewNopLogger())
72 histogram := d.NewTiming(name, 1.0).With(label, value)
73 quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X
74 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
75 t.Fatal(err)
22876 }
22977 }
23078
231 type mockConn struct {
232 buf *syncbuf
79 func TestTimingSampled(t *testing.T) {
80 prefix, name := "dogstatsd.", "sampled_timing_test"
81 label, value := "internal", "external"
82 regex := `^` + prefix + name + `:([0-9\.]+)\|ms\|@0.03[0]*\|#` + label + `:` + value + `$`
83 d := New(prefix, log.NewNopLogger())
84 histogram := d.NewTiming(name, 0.03).With(label, value)
85 quantiles := teststat.Quantiles(d, regex, 50)
86 if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil {
87 t.Fatal(err)
88 }
23389 }
234
235 func (c *mockConn) Read(b []byte) (n int, err error) {
236 panic("not implemented")
237 }
238
239 func (c *mockConn) Write(b []byte) (n int, err error) {
240 return c.buf.Write(b)
241 }
242
243 func (c *mockConn) Close() error {
244 panic("not implemented")
245 }
246
247 func (c *mockConn) LocalAddr() net.Addr {
248 panic("not implemented")
249 }
250
251 func (c *mockConn) RemoteAddr() net.Addr {
252 panic("not implemented")
253 }
254
255 func (c *mockConn) SetDeadline(t time.Time) error {
256 panic("not implemented")
257 }
258
259 func (c *mockConn) SetReadDeadline(t time.Time) error {
260 panic("not implemented")
261 }
262
263 func (c *mockConn) SetWriteDeadline(t time.Time) error {
264 panic("not implemented")
265 }
+0
-159
metrics/dogstatsd/emitter.go less more
0 package dogstatsd
1
2 import (
3 "bytes"
4 "fmt"
5 "net"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/util/conn"
11 )
12
13 // Emitter is a struct to manage connections and orchestrate the emission of
14 // metrics to a DogStatsd process.
15 type Emitter struct {
16 prefix string
17 keyVals chan keyVal
18 mgr *conn.Manager
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23 type keyVal struct {
24 key string
25 val string
26 }
27
28 func stringToKeyVal(key string, keyVals chan keyVal) chan string {
29 vals := make(chan string)
30 go func() {
31 for val := range vals {
32 keyVals <- keyVal{key: key, val: val}
33 }
34 }()
35 return vals
36 }
37
38 // NewEmitter will return an Emitter that will prefix all metrics names with the
39 // given prefix. Once started, it will attempt to create a connection with the
40 // given network and address via `net.Dial` and periodically post metrics to the
41 // connection in the DogStatsD protocol.
42 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
43 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
44 }
45
46 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
47 // Dialer function. This is primarily useful for tests.
48 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
49 e := &Emitter{
50 prefix: metricsPrefix,
51 mgr: conn.NewManager(dialer, network, address, time.After, logger),
52 logger: logger,
53 keyVals: make(chan keyVal),
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
58 }
59
60 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
61 // via the Emitter's connection manager. Observations are buffered for the
62 // report interval or until the buffer exceeds a max packet size, whichever
63 // comes first. Fields are ignored.
64 func (e *Emitter) NewCounter(key string) metrics.Counter {
65 key = e.prefix + key
66 return &counter{
67 key: key,
68 c: stringToKeyVal(key, e.keyVals),
69 }
70 }
71
72 // NewHistogram returns a Histogram that emits observations in the DogStatsD
73 // protocol via the Emitter's connection manager. Observations are buffered for
74 // the reporting interval or until the buffer exceeds a max packet size,
75 // whichever comes first. Fields are ignored.
76 //
77 // NewHistogram is mapped to a statsd Timing, so observations should represent
78 // milliseconds. If you observe in units of nanoseconds, you can make the
79 // translation with a ScaledHistogram:
80 //
81 // NewScaledHistogram(histogram, time.Millisecond)
82 //
83 // You can also enforce the constraint in a typesafe way with a millisecond
84 // TimeHistogram:
85 //
86 // NewTimeHistogram(histogram, time.Millisecond)
87 //
88 // TODO: support for sampling.
89 func (e *Emitter) NewHistogram(key string) metrics.Histogram {
90 key = e.prefix + key
91 return &histogram{
92 key: key,
93 h: stringToKeyVal(key, e.keyVals),
94 }
95 }
96
97 // NewGauge returns a Gauge that emits values in the DogStatsD protocol via the
98 // the Emitter's connection manager. Values are buffered for the report
99 // interval or until the buffer exceeds a max packet size, whichever comes
100 // first. Fields are ignored.
101 //
102 // TODO: support for sampling
103 func (e *Emitter) NewGauge(key string) metrics.Gauge {
104 key = e.prefix + key
105 return &gauge{
106 key: key,
107 g: stringToKeyVal(key, e.keyVals),
108 }
109 }
110
111 func (e *Emitter) loop(d time.Duration) {
112 ticker := time.NewTicker(d)
113 defer ticker.Stop()
114 buf := &bytes.Buffer{}
115 for {
116 select {
117 case kv := <-e.keyVals:
118 fmt.Fprintf(buf, "%s:%s\n", kv.key, kv.val)
119 if buf.Len() > maxBufferSize {
120 e.Flush(buf)
121 }
122
123 case <-ticker.C:
124 e.Flush(buf)
125
126 case q := <-e.quitc:
127 e.Flush(buf)
128 close(q)
129 return
130 }
131 }
132 }
133
134 // Stop will flush the current metrics and close the active connection. Calling
135 // stop more than once is a programmer error.
136 func (e *Emitter) Stop() {
137 q := make(chan struct{})
138 e.quitc <- q
139 <-q
140 }
141
142 // Flush will write the given buffer to a connection provided by the Emitter's
143 // connection manager.
144 func (e *Emitter) Flush(buf *bytes.Buffer) {
145 conn := e.mgr.Take()
146 if conn == nil {
147 e.logger.Log("during", "flush", "err", "connection unavailable")
148 return
149 }
150
151 _, err := conn.Write(buf.Bytes())
152 if err != nil {
153 e.logger.Log("during", "flush", "err", err)
154 }
155 buf.Reset()
156
157 e.mgr.Put(err)
158 }
0 // Package expvar implements an expvar backend for package metrics.
1 //
2 // The current implementation ignores fields. In the future, it would be good
3 // to have an implementation that accepted a set of predeclared field names at
4 // construction time, and used field values to produce delimiter-separated
5 // bucket (key) names. That is,
6 //
7 // c := NewFieldedCounter(..., "path", "status")
8 // c.Add(1) // "myprefix_unknown_unknown" += 1
9 // c2 := c.With("path", "foo").With("status": "200")
10 // c2.Add(1) // "myprefix_foo_200" += 1
11 //
12 // It would also be possible to have an implementation that generated more
13 // sophisticated expvar.Values. For example, a Counter could be implemented as
14 // a map, representing a tree of key/value pairs whose leaves were the actual
15 // expvar.Ints.
0 // Package expvar provides expvar backends for metrics.
1 // Label values are not supported.
162 package expvar
173
184 import (
195 "expvar"
20 "fmt"
21 "sort"
22 "strconv"
236 "sync"
24 "time"
257
26 "github.com/codahale/hdrhistogram"
27
28 "github.com/go-kit/kit/metrics"
8 "github.com/go-kit/kit/metrics3"
9 "github.com/go-kit/kit/metrics3/generic"
2910 )
3011
31 type counter struct {
32 name string
33 v *expvar.Int
12 // Counter implements the counter metric with an expvar float.
13 // Label values are not supported.
14 type Counter struct {
15 f *expvar.Float
3416 }
3517
36 // NewCounter returns a new Counter backed by an expvar with the given name.
37 // Fields are ignored.
38 func NewCounter(name string) metrics.Counter {
39 return &counter{
40 name: name,
41 v: expvar.NewInt(name),
18 // NewCounter creates an expvar Float with the given name, and returns an object
19 // that implements the Counter interface.
20 func NewCounter(name string) *Counter {
21 return &Counter{
22 f: expvar.NewFloat(name),
4223 }
4324 }
4425
45 func (c *counter) Name() string { return c.name }
46 func (c *counter) With(metrics.Field) metrics.Counter { return c }
47 func (c *counter) Add(delta uint64) { c.v.Add(int64(delta)) }
26 // With is a no-op.
27 func (c *Counter) With(labelValues ...string) metrics.Counter { return c }
4828
49 type gauge struct {
50 name string
51 v *expvar.Float
29 // Add implements Counter.
30 func (c *Counter) Add(delta float64) { c.f.Add(delta) }
31
32 // Gauge implements the gauge metric wtih an expvar float.
33 // Label values are not supported.
34 type Gauge struct {
35 f *expvar.Float
5236 }
5337
54 // NewGauge returns a new Gauge backed by an expvar with the given name. It
55 // should be updated manually; for a callback-based approach, see
56 // PublishCallbackGauge. Fields are ignored.
57 func NewGauge(name string) metrics.Gauge {
58 return &gauge{
59 name: name,
60 v: expvar.NewFloat(name),
38 // NewGauge creates an expvar Float with the given name, and returns an object
39 // that implements the Gauge interface.
40 func NewGauge(name string) *Gauge {
41 return &Gauge{
42 f: expvar.NewFloat(name),
6143 }
6244 }
6345
64 func (g *gauge) Name() string { return g.name }
65 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
66 func (g *gauge) Add(delta float64) { g.v.Add(delta) }
67 func (g *gauge) Set(value float64) { g.v.Set(value) }
68 func (g *gauge) Get() float64 { return mustParseFloat64(g.v.String()) }
46 // With is a no-op.
47 func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g }
6948
70 // PublishCallbackGauge publishes a Gauge as an expvar with the given name,
71 // whose value is determined at collect time by the passed callback function.
72 // The callback determines the value, and fields are ignored, so
73 // PublishCallbackGauge returns nothing.
74 func PublishCallbackGauge(name string, callback func() float64) {
75 expvar.Publish(name, callbackGauge(callback))
49 // Set implements Gauge.
50 func (g *Gauge) Set(value float64) { g.f.Set(value) }
51
52 // Histogram implements the histogram metric with a combination of the generic
53 // Histogram object and several expvar Floats, one for each of the 50th, 90th,
54 // 95th, and 99th quantiles of observed values, with the quantile attached to
55 // the name as a suffix. Label values are not supported.
56 type Histogram struct {
57 mtx sync.Mutex
58 h *generic.Histogram
59 p50 *expvar.Float
60 p90 *expvar.Float
61 p95 *expvar.Float
62 p99 *expvar.Float
7663 }
7764
78 type callbackGauge func() float64
79
80 func (g callbackGauge) String() string { return strconv.FormatFloat(g(), 'g', -1, 64) }
81
82 type histogram struct {
83 mu sync.Mutex
84 hist *hdrhistogram.WindowedHistogram
85
86 name string
87 gauges map[int]metrics.Gauge
88 }
89
90 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
91 // windowed HDR histogram which drops data older than five minutes.
92 //
93 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
94 // should be integers in the range 1..99. The gauge names are assigned by
95 // using the passed name as a prefix and appending "_pNN" e.g. "_p50".
96 func NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
97 gauges := map[int]metrics.Gauge{}
98 for _, quantile := range quantiles {
99 if quantile <= 0 || quantile >= 100 {
100 panic(fmt.Sprintf("invalid quantile %d", quantile))
101 }
102 gauges[quantile] = NewGauge(fmt.Sprintf("%s_p%02d", name, quantile))
103 }
104 h := &histogram{
105 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
106 name: name,
107 gauges: gauges,
108 }
109 go h.rotateLoop(1 * time.Minute)
110 return h
111 }
112
113 func (h *histogram) Name() string { return h.name }
114 func (h *histogram) With(metrics.Field) metrics.Histogram { return h }
115
116 func (h *histogram) Observe(value int64) {
117 h.mu.Lock()
118 err := h.hist.Current.RecordValue(value)
119 h.mu.Unlock()
120
121 if err != nil {
122 panic(err.Error())
123 }
124
125 for q, gauge := range h.gauges {
126 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
65 // NewHistogram returns a Histogram object with the given name and number of
66 // buckets in the underlying histogram object. 50 is a good default number of
67 // buckets.
68 func NewHistogram(name string, buckets int) *Histogram {
69 return &Histogram{
70 h: generic.NewHistogram(name, buckets),
71 p50: expvar.NewFloat(name + ".p50"),
72 p90: expvar.NewFloat(name + ".p90"),
73 p95: expvar.NewFloat(name + ".p95"),
74 p99: expvar.NewFloat(name + ".p99"),
12775 }
12876 }
12977
130 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
131 bars := h.hist.Merge().Distribution()
132 buckets := make([]metrics.Bucket, len(bars))
133 for i, bar := range bars {
134 buckets[i] = metrics.Bucket{
135 From: bar.From,
136 To: bar.To,
137 Count: bar.Count,
138 }
139 }
140 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
141 for quantile, gauge := range h.gauges {
142 quantiles = append(quantiles, metrics.Quantile{
143 Quantile: quantile,
144 Value: int64(gauge.Get()),
145 })
146 }
147 sort.Sort(quantileSlice(quantiles))
148 return buckets, quantiles
78 // With is a no-op.
79 func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h }
80
81 // Observe impleemts Histogram.
82 func (h *Histogram) Observe(value float64) {
83 h.mtx.Lock()
84 defer h.mtx.Unlock()
85 h.h.Observe(value)
86 h.p50.Set(h.h.Quantile(0.50))
87 h.p90.Set(h.h.Quantile(0.90))
88 h.p95.Set(h.h.Quantile(0.95))
89 h.p99.Set(h.h.Quantile(0.99))
14990 }
150
151 func (h *histogram) rotateLoop(d time.Duration) {
152 for range time.Tick(d) {
153 h.mu.Lock()
154 h.hist.Rotate()
155 h.mu.Unlock()
156 }
157 }
158
159 func mustParseFloat64(s string) float64 {
160 f, err := strconv.ParseFloat(s, 64)
161 if err != nil {
162 panic(err)
163 }
164 return f
165 }
166
167 type quantileSlice []metrics.Quantile
168
169 func (a quantileSlice) Len() int { return len(a) }
170 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
171 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
0 package expvar_test
0 package expvar
11
22 import (
3 stdexpvar "expvar"
4 "fmt"
3 "strconv"
54 "testing"
65
7 "github.com/go-kit/kit/metrics"
8 "github.com/go-kit/kit/metrics/expvar"
9 "github.com/go-kit/kit/metrics/teststat"
6 "github.com/go-kit/kit/metrics3/teststat"
107 )
118
12 func TestHistogramQuantiles(t *testing.T) {
13 var (
14 name = "test_histogram_quantiles"
15 quantiles = []int{50, 90, 95, 99}
16 h = expvar.NewHistogram(name, 0, 100, 3, quantiles...).With(metrics.Field{Key: "ignored", Value: "field"})
17 )
18 const seed, mean, stdev int64 = 424242, 50, 10
19 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
20 teststat.AssertExpvarNormalHistogram(t, name, mean, stdev, quantiles)
21 }
22
23 func TestCallbackGauge(t *testing.T) {
24 var (
25 name = "foo"
26 value = 42.43
27 )
28 expvar.PublishCallbackGauge(name, func() float64 { return value })
29 if want, have := fmt.Sprint(value), stdexpvar.Get(name).String(); want != have {
30 t.Errorf("want %q, have %q", want, have)
31 }
32 }
33
349 func TestCounter(t *testing.T) {
35 var (
36 name = "m"
37 value = 123
38 )
39 expvar.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
40 if want, have := fmt.Sprint(value), stdexpvar.Get(name).String(); want != have {
41 t.Errorf("want %q, have %q", want, have)
10 counter := NewCounter("expvar_counter").With("label values", "not supported").(*Counter)
11 value := func() float64 { f, _ := strconv.ParseFloat(counter.f.String(), 64); return f }
12 if err := teststat.TestCounter(counter, value); err != nil {
13 t.Fatal(err)
4214 }
4315 }
4416
4517 func TestGauge(t *testing.T) {
46 var (
47 name = "xyz"
48 value = 54321
49 delta = 12345
50 g = expvar.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
51 )
52 g.Set(float64(value))
53 g.Add(float64(delta))
54 if want, have := fmt.Sprint(value+delta), stdexpvar.Get(name).String(); want != have {
55 t.Errorf("want %q, have %q", want, have)
18 gauge := NewGauge("expvar_gauge").With("label values", "not supported").(*Gauge)
19 value := func() float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return f }
20 if err := teststat.TestGauge(gauge, value); err != nil {
21 t.Fatal(err)
5622 }
5723 }
5824
59 func TestInvalidQuantile(t *testing.T) {
60 defer func() {
61 if err := recover(); err == nil {
62 t.Errorf("expected panic, got none")
63 } else {
64 t.Logf("got expected panic: %v", err)
65 }
66 }()
67 expvar.NewHistogram("foo", 0.0, 100.0, 3, 50, 90, 95, 99, 101)
25 func TestHistogram(t *testing.T) {
26 histogram := NewHistogram("expvar_histogram", 50).With("label values", "not supported").(*Histogram)
27 quantiles := func() (float64, float64, float64, float64) {
28 p50, _ := strconv.ParseFloat(histogram.p50.String(), 64)
29 p90, _ := strconv.ParseFloat(histogram.p90.String(), 64)
30 p95, _ := strconv.ParseFloat(histogram.p95.String(), 64)
31 p99, _ := strconv.ParseFloat(histogram.p99.String(), 64)
32 return p50, p90, p95, p99
33 }
34 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
35 t.Fatal(err)
36 }
6837 }
0 // Package generic implements generic versions of each of the metric types. They
1 // can be embedded by other implementations, and converted to specific formats
2 // as necessary.
3 package generic
4
5 import (
6 "fmt"
7 "io"
8 "math"
9 "sync"
10 "sync/atomic"
11
12 "github.com/VividCortex/gohistogram"
13
14 "github.com/go-kit/kit/metrics3"
15 "github.com/go-kit/kit/metrics3/internal/lv"
16 )
17
18 // Counter is an in-memory implementation of a Counter.
19 type Counter struct {
20 Name string
21 lvs lv.LabelValues
22 bits uint64
23 }
24
25 // NewCounter returns a new, usable Counter.
26 func NewCounter(name string) *Counter {
27 return &Counter{
28 Name: name,
29 }
30 }
31
32 // With implements Counter.
33 func (c *Counter) With(labelValues ...string) metrics.Counter {
34 return &Counter{
35 bits: atomic.LoadUint64(&c.bits),
36 lvs: c.lvs.With(labelValues...),
37 }
38 }
39
40 // Add implements Counter.
41 func (c *Counter) Add(delta float64) {
42 for {
43 var (
44 old = atomic.LoadUint64(&c.bits)
45 newf = math.Float64frombits(old) + delta
46 new = math.Float64bits(newf)
47 )
48 if atomic.CompareAndSwapUint64(&c.bits, old, new) {
49 break
50 }
51 }
52 }
53
54 // Value returns the current value of the counter.
55 func (c *Counter) Value() float64 {
56 return math.Float64frombits(atomic.LoadUint64(&c.bits))
57 }
58
59 // ValueReset returns the current value of the counter, and resets it to zero.
60 // This is useful for metrics backends whose counter aggregations expect deltas,
61 // like Graphite.
62 func (c *Counter) ValueReset() float64 {
63 for {
64 var (
65 old = atomic.LoadUint64(&c.bits)
66 newf = 0.0
67 new = math.Float64bits(newf)
68 )
69 if atomic.CompareAndSwapUint64(&c.bits, old, new) {
70 return math.Float64frombits(old)
71 }
72 }
73 }
74
75 // LabelValues returns the set of label values attached to the counter.
76 func (c *Counter) LabelValues() []string {
77 return c.lvs
78 }
79
80 // Gauge is an in-memory implementation of a Gauge.
81 type Gauge struct {
82 Name string
83 lvs lv.LabelValues
84 bits uint64
85 }
86
87 // NewGauge returns a new, usable Gauge.
88 func NewGauge(name string) *Gauge {
89 return &Gauge{
90 Name: name,
91 }
92 }
93
94 // With implements Gauge.
95 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
96 return &Gauge{
97 bits: atomic.LoadUint64(&g.bits),
98 lvs: g.lvs.With(labelValues...),
99 }
100 }
101
102 // Set implements Gauge.
103 func (g *Gauge) Set(value float64) {
104 atomic.StoreUint64(&g.bits, math.Float64bits(value))
105 }
106
107 // Value returns the current value of the gauge.
108 func (g *Gauge) Value() float64 {
109 return math.Float64frombits(atomic.LoadUint64(&g.bits))
110 }
111
112 // LabelValues returns the set of label values attached to the gauge.
113 func (g *Gauge) LabelValues() []string {
114 return g.lvs
115 }
116
117 // Histogram is an in-memory implementation of a streaming histogram, based on
118 // VividCortex/gohistogram. It dynamically computes quantiles, so it's not
119 // suitable for aggregation.
120 type Histogram struct {
121 Name string
122 lvs lv.LabelValues
123 h gohistogram.Histogram
124 }
125
126 // NewHistogram returns a numeric histogram based on VividCortex/gohistogram. A
127 // good default value for buckets is 50.
128 func NewHistogram(name string, buckets int) *Histogram {
129 return &Histogram{
130 Name: name,
131 h: gohistogram.NewHistogram(buckets),
132 }
133 }
134
135 // With implements Histogram.
136 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
137 return &Histogram{
138 lvs: h.lvs.With(labelValues...),
139 h: h.h,
140 }
141 }
142
143 // Observe implements Histogram.
144 func (h *Histogram) Observe(value float64) {
145 h.h.Add(value)
146 }
147
148 // Quantile returns the value of the quantile q, 0.0 < q < 1.0.
149 func (h *Histogram) Quantile(q float64) float64 {
150 return h.h.Quantile(q)
151 }
152
153 // LabelValues returns the set of label values attached to the histogram.
154 func (h *Histogram) LabelValues() []string {
155 return h.lvs
156 }
157
158 // Print writes a string representation of the histogram to the passed writer.
159 // Useful for printing to a terminal.
160 func (h *Histogram) Print(w io.Writer) {
161 fmt.Fprintf(w, h.h.String())
162 }
163
164 // Bucket is a range in a histogram which aggregates observations.
165 type Bucket struct {
166 From, To, Count int64
167 }
168
169 // Quantile is a pair of a quantile (0..100) and its observed maximum value.
170 type Quantile struct {
171 Quantile int // 0..100
172 Value int64
173 }
174
175 // SimpleHistogram is an in-memory implementation of a Histogram. It only tracks
176 // an approximate moving average, so is likely too naïve for many use cases.
177 type SimpleHistogram struct {
178 mtx sync.RWMutex
179 lvs lv.LabelValues
180 avg float64
181 n uint64
182 }
183
184 // NewSimpleHistogram returns a SimpleHistogram, ready for observations.
185 func NewSimpleHistogram() *SimpleHistogram {
186 return &SimpleHistogram{}
187 }
188
189 // With implements Histogram.
190 func (h *SimpleHistogram) With(labelValues ...string) metrics.Histogram {
191 return &SimpleHistogram{
192 lvs: h.lvs.With(labelValues...),
193 avg: h.avg,
194 n: h.n,
195 }
196 }
197
198 // Observe implements Histogram.
199 func (h *SimpleHistogram) Observe(value float64) {
200 h.mtx.Lock()
201 defer h.mtx.Unlock()
202 h.n++
203 h.avg -= h.avg / float64(h.n)
204 h.avg += value / float64(h.n)
205 }
206
207 // ApproximateMovingAverage returns the approximate moving average of observations.
208 func (h *SimpleHistogram) ApproximateMovingAverage() float64 {
209 h.mtx.RLock()
210 h.mtx.RUnlock()
211 return h.avg
212 }
213
214 // LabelValues returns the set of label values attached to the histogram.
215 func (h *SimpleHistogram) LabelValues() []string {
216 return h.lvs
217 }
0 package generic_test
1
2 // This is package generic_test in order to get around an import cycle: this
3 // package imports teststat to do its testing, but package teststat imports
4 // generic to use its Histogram in the Quantiles helper function.
5
6 import (
7 "math"
8 "math/rand"
9 "testing"
10
11 "github.com/go-kit/kit/metrics3/generic"
12 "github.com/go-kit/kit/metrics3/teststat"
13 )
14
15 func TestCounter(t *testing.T) {
16 counter := generic.NewCounter("my_counter").With("label", "counter").(*generic.Counter)
17 value := func() float64 { return counter.Value() }
18 if err := teststat.TestCounter(counter, value); err != nil {
19 t.Fatal(err)
20 }
21 }
22
23 func TestValueReset(t *testing.T) {
24 counter := generic.NewCounter("test_value_reset")
25 counter.Add(123)
26 counter.Add(456)
27 counter.Add(789)
28 if want, have := float64(123+456+789), counter.ValueReset(); want != have {
29 t.Errorf("want %f, have %f", want, have)
30 }
31 if want, have := float64(0), counter.Value(); want != have {
32 t.Errorf("want %f, have %f", want, have)
33 }
34 }
35
36 func TestGauge(t *testing.T) {
37 gauge := generic.NewGauge("my_gauge").With("label", "gauge").(*generic.Gauge)
38 value := func() float64 { return gauge.Value() }
39 if err := teststat.TestGauge(gauge, value); err != nil {
40 t.Fatal(err)
41 }
42 }
43
44 func TestHistogram(t *testing.T) {
45 histogram := generic.NewHistogram("my_histogram", 50).With("label", "histogram").(*generic.Histogram)
46 quantiles := func() (float64, float64, float64, float64) {
47 return histogram.Quantile(0.50), histogram.Quantile(0.90), histogram.Quantile(0.95), histogram.Quantile(0.99)
48 }
49 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
50 t.Fatal(err)
51 }
52 }
53
54 func TestSimpleHistogram(t *testing.T) {
55 histogram := generic.NewSimpleHistogram().With("label", "simple_histogram").(*generic.SimpleHistogram)
56 var (
57 sum int
58 count = 1234 // not too big
59 )
60 for i := 0; i < count; i++ {
61 value := rand.Intn(1000)
62 sum += value
63 histogram.Observe(float64(value))
64 }
65
66 var (
67 want = float64(sum) / float64(count)
68 have = histogram.ApproximateMovingAverage()
69 tolerance = 0.001 // real real slim
70 )
71 if math.Abs(want-have)/want > tolerance {
72 t.Errorf("want %f, have %f", want, have)
73 }
74 }
+0
-159
metrics/graphite/emitter.go less more
0 package graphite
1
2 import (
3 "bufio"
4 "fmt"
5 "io"
6 "net"
7 "sync"
8 "time"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
12 "github.com/go-kit/kit/util/conn"
13 )
14
15 // Emitter is a struct to manage connections and orchestrate the emission of
16 // metrics to a Graphite system.
17 type Emitter struct {
18 mtx sync.Mutex
19 prefix string
20 mgr *conn.Manager
21 counters []*counter
22 histograms []*windowedHistogram
23 gauges []*gauge
24 logger log.Logger
25 quitc chan chan struct{}
26 }
27
28 // NewEmitter will return an Emitter that will prefix all metrics names with the
29 // given prefix. Once started, it will attempt to create a connection with the
30 // given network and address via `net.Dial` and periodically post metrics to the
31 // connection in the Graphite plaintext protocol.
32 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
33 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
34 }
35
36 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
37 // Dialer function. This is primarily useful for tests.
38 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
39 e := &Emitter{
40 prefix: metricsPrefix,
41 mgr: conn.NewManager(dialer, network, address, time.After, logger),
42 logger: logger,
43 quitc: make(chan chan struct{}),
44 }
45 go e.loop(flushInterval)
46 return e
47 }
48
49 // NewCounter returns a Counter whose value will be periodically emitted in
50 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
51 func (e *Emitter) NewCounter(name string) metrics.Counter {
52 e.mtx.Lock()
53 defer e.mtx.Unlock()
54 c := newCounter(name)
55 e.counters = append(e.counters, c)
56 return c
57 }
58
59 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
60 // windowed HDR histogram which drops data older than five minutes.
61 //
62 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
63 // should be integers in the range 1..99. The gauge names are assigned by using
64 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
65 //
66 // The values of this histogram will be periodically emitted in a
67 // Graphite-compatible format once the Emitter is started. Fields are ignored.
68 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
69 gauges := map[int]metrics.Gauge{}
70 for _, quantile := range quantiles {
71 if quantile <= 0 || quantile >= 100 {
72 return nil, fmt.Errorf("invalid quantile %d", quantile)
73 }
74 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
75 }
76 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
77
78 e.mtx.Lock()
79 defer e.mtx.Unlock()
80 e.histograms = append(e.histograms, h)
81 return h, nil
82 }
83
84 // NewGauge returns a Gauge whose value will be periodically emitted in a
85 // Graphite-compatible format once the Emitter is started. Fields are ignored.
86 func (e *Emitter) NewGauge(name string) metrics.Gauge {
87 e.mtx.Lock()
88 defer e.mtx.Unlock()
89 return e.gauge(name)
90 }
91
92 func (e *Emitter) gauge(name string) metrics.Gauge {
93 g := &gauge{name, 0}
94 e.gauges = append(e.gauges, g)
95 return g
96 }
97
98 func (e *Emitter) loop(d time.Duration) {
99 ticker := time.NewTicker(d)
100 defer ticker.Stop()
101
102 for {
103 select {
104 case <-ticker.C:
105 e.Flush()
106
107 case q := <-e.quitc:
108 e.Flush()
109 close(q)
110 return
111 }
112 }
113 }
114
115 // Stop will flush the current metrics and close the active connection. Calling
116 // stop more than once is a programmer error.
117 func (e *Emitter) Stop() {
118 q := make(chan struct{})
119 e.quitc <- q
120 <-q
121 }
122
123 // Flush will write the current metrics to the Emitter's connection in the
124 // Graphite plaintext protocol.
125 func (e *Emitter) Flush() {
126 e.mtx.Lock() // one flush at a time
127 defer e.mtx.Unlock()
128
129 conn := e.mgr.Take()
130 if conn == nil {
131 e.logger.Log("during", "flush", "err", "connection unavailable")
132 return
133 }
134
135 err := e.flush(conn)
136 if err != nil {
137 e.logger.Log("during", "flush", "err", err)
138 }
139 e.mgr.Put(err)
140 }
141
142 func (e *Emitter) flush(w io.Writer) error {
143 bw := bufio.NewWriter(w)
144
145 for _, c := range e.counters {
146 c.flush(bw, e.prefix)
147 }
148
149 for _, h := range e.histograms {
150 h.flush(bw, e.prefix)
151 }
152
153 for _, g := range e.gauges {
154 g.flush(bw, e.prefix)
155 }
156
157 return bw.Flush()
158 }
0 // Package graphite implements a Graphite backend for package metrics. Metrics
1 // will be emitted to a Graphite server in the plaintext protocol which looks
2 // like:
0 // Package graphite provides a Graphite backend for metrics. Metrics are batched
1 // and emitted in the plaintext protocol. For more information, see
2 // http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
33 //
4 // "<metric path> <metric value> <metric timestamp>"
5 //
6 // See http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol.
7 // The current implementation ignores fields.
4 // Graphite does not have a native understanding of metric parameterization, so
5 // label values not supported. Use distinct metrics for each unique combination
6 // of label values.
87 package graphite
98
109 import (
1110 "fmt"
1211 "io"
13 "math"
14 "sort"
1512 "sync"
16 "sync/atomic"
1713 "time"
1814
19 "github.com/codahale/hdrhistogram"
20
2115 "github.com/go-kit/kit/log"
22 "github.com/go-kit/kit/metrics"
16 "github.com/go-kit/kit/metrics3"
17 "github.com/go-kit/kit/metrics3/generic"
18 "github.com/go-kit/kit/util/conn"
2319 )
2420
25 func newCounter(name string) *counter {
26 return &counter{name, 0}
27 }
28
29 func newGauge(name string) *gauge {
30 return &gauge{name, 0}
31 }
32
33 // counter implements the metrics.counter interface but also provides a
34 // Flush method to emit the current counter values in the Graphite plaintext
35 // protocol.
36 type counter struct {
37 key string
38 count uint64
39 }
40
41 func (c *counter) Name() string { return c.key }
42
43 // With currently ignores fields.
44 func (c *counter) With(metrics.Field) metrics.Counter { return c }
45
46 func (c *counter) Add(delta uint64) { atomic.AddUint64(&c.count, delta) }
47
48 func (c *counter) get() uint64 { return atomic.LoadUint64(&c.count) }
49
50 // flush will emit the current counter value in the Graphite plaintext
51 // protocol to the given io.Writer.
52 func (c *counter) flush(w io.Writer, prefix string) {
53 fmt.Fprintf(w, "%s.count %d %d\n", prefix+c.Name(), c.get(), time.Now().Unix())
54 }
55
56 // gauge implements the metrics.gauge interface but also provides a
57 // Flush method to emit the current counter values in the Graphite plaintext
58 // protocol.
59 type gauge struct {
60 key string
61 value uint64 // math.Float64bits
62 }
63
64 func (g *gauge) Name() string { return g.key }
65
66 // With currently ignores fields.
67 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
68
69 func (g *gauge) Add(delta float64) {
70 for {
71 old := atomic.LoadUint64(&g.value)
72 new := math.Float64bits(math.Float64frombits(old) + delta)
73 if atomic.CompareAndSwapUint64(&g.value, old, new) {
74 return
75 }
76 }
77 }
78
79 func (g *gauge) Set(value float64) {
80 atomic.StoreUint64(&g.value, math.Float64bits(value))
81 }
82
83 func (g *gauge) Get() float64 {
84 return math.Float64frombits(atomic.LoadUint64(&g.value))
85 }
86
87 // Flush will emit the current gauge value in the Graphite plaintext
88 // protocol to the given io.Writer.
89 func (g *gauge) flush(w io.Writer, prefix string) {
90 fmt.Fprintf(w, "%s %.2f %d\n", prefix+g.Name(), g.Get(), time.Now().Unix())
91 }
92
93 // windowedHistogram is taken from http://github.com/codahale/metrics. It
94 // is a windowed HDR histogram which drops data older than five minutes.
21 // Graphite receives metrics observations and forwards them to a Graphite server.
22 // Create a Graphite object, use it to create metrics, and pass those metrics as
23 // dependencies to the components that will use them.
9524 //
96 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
97 // should be integers in the range 1..99. The gauge names are assigned by using
98 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
25 // All metrics are buffered until WriteTo is called. Counters and gauges are
26 // aggregated into a single observation per timeseries per write. Histograms are
27 // exploded into per-quantile gauges and reported once per write.
9928 //
100 // The values of this histogram will be periodically emitted in a
101 // Graphite-compatible format once the GraphiteProvider is started. Fields are ignored.
102 type windowedHistogram struct {
103 mtx sync.Mutex
104 hist *hdrhistogram.WindowedHistogram
105
106 name string
107 gauges map[int]metrics.Gauge
108 logger log.Logger
109 }
110
111 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram {
112 h := &windowedHistogram{
113 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
114 name: name,
115 gauges: quantiles,
116 logger: logger,
117 }
118 go h.rotateLoop(1 * time.Minute)
29 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
30 // To send to a Graphite server, use the SendLoop helper method.
31 type Graphite struct {
32 mtx sync.RWMutex
33 prefix string
34 counters map[string]*Counter
35 gauges map[string]*Gauge
36 histograms map[string]*Histogram
37 logger log.Logger
38 }
39
40 // New returns a Statsd object that may be used to create metrics. Prefix is
41 // applied to all created metrics. Callers must ensure that regular calls to
42 // WriteTo are performed, either manually or with one of the helper methods.
43 func New(prefix string, logger log.Logger) *Graphite {
44 return &Graphite{
45 prefix: prefix,
46 counters: map[string]*Counter{},
47 gauges: map[string]*Gauge{},
48 histograms: map[string]*Histogram{},
49 logger: logger,
50 }
51 }
52
53 // NewCounter returns a counter. Observations are aggregated and emitted once
54 // per write invocation.
55 func (g *Graphite) NewCounter(name string) *Counter {
56 c := NewCounter(g.prefix + name)
57 g.mtx.Lock()
58 g.counters[g.prefix+name] = c
59 g.mtx.Unlock()
60 return c
61 }
62
63 // NewGauge returns a gauge. Observations are aggregated and emitted once per
64 // write invocation.
65 func (g *Graphite) NewGauge(name string) *Gauge {
66 ga := NewGauge(g.prefix + name)
67 g.mtx.Lock()
68 g.gauges[g.prefix+name] = ga
69 g.mtx.Unlock()
70 return ga
71 }
72
73 // NewHistogram returns a histogram. Observations are aggregated and emitted as
74 // per-quantile gauges, once per write invocation. 50 is a good default value
75 // for buckets.
76 func (g *Graphite) NewHistogram(name string, buckets int) *Histogram {
77 h := NewHistogram(g.prefix+name, buckets)
78 g.mtx.Lock()
79 g.histograms[g.prefix+name] = h
80 g.mtx.Unlock()
11981 return h
12082 }
12183
122 func (h *windowedHistogram) Name() string { return h.name }
123
124 func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }
125
126 func (h *windowedHistogram) Observe(value int64) {
127 h.mtx.Lock()
128 err := h.hist.Current.RecordValue(value)
129 h.mtx.Unlock()
130
131 if err != nil {
132 h.logger.Log("err", err, "msg", "unable to record histogram value")
133 return
134 }
135
136 for q, gauge := range h.gauges {
137 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
138 }
139 }
140
141 func (h *windowedHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
142 bars := h.hist.Merge().Distribution()
143 buckets := make([]metrics.Bucket, len(bars))
144 for i, bar := range bars {
145 buckets[i] = metrics.Bucket{
146 From: bar.From,
147 To: bar.To,
148 Count: bar.Count,
149 }
150 }
151 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
152 for quantile, gauge := range h.gauges {
153 quantiles = append(quantiles, metrics.Quantile{
154 Quantile: quantile,
155 Value: int64(gauge.Get()),
156 })
157 }
158 sort.Sort(quantileSlice(quantiles))
159 return buckets, quantiles
160 }
161
162 func (h *windowedHistogram) flush(w io.Writer, prefix string) {
163 name := prefix + h.Name()
164 hist := h.hist.Merge()
84 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
85 // time the passed channel fires. This method blocks until the channel is
86 // closed, so clients probably want to run it in its own goroutine. For typical
87 // usage, create a time.Ticker and pass its C channel to this method.
88 func (g *Graphite) WriteLoop(c <-chan time.Time, w io.Writer) {
89 for range c {
90 if _, err := g.WriteTo(w); err != nil {
91 g.logger.Log("during", "WriteTo", "err", err)
92 }
93 }
94 }
95
96 // SendLoop is a helper method that wraps WriteLoop, passing a managed
97 // connection to the network and address. Like WriteLoop, this method blocks
98 // until the channel is closed, so clients probably want to start it in its own
99 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
100 // this method.
101 func (g *Graphite) SendLoop(c <-chan time.Time, network, address string) {
102 g.WriteLoop(c, conn.NewDefaultManager(network, address, g.logger))
103 }
104
105 // WriteTo flushes the buffered content of the metrics to the writer, in
106 // Graphite plaintext format. WriteTo abides best-effort semantics, so
107 // observations are lost if there is a problem with the write. Clients should be
108 // sure to call WriteTo regularly, ideally through the WriteLoop or SendLoop
109 // helper methods.
110 func (g *Graphite) WriteTo(w io.Writer) (count int64, err error) {
111 g.mtx.RLock()
112 defer g.mtx.RUnlock()
165113 now := time.Now().Unix()
166 fmt.Fprintf(w, "%s.count %d %d\n", name, hist.TotalCount(), now)
167 fmt.Fprintf(w, "%s.min %d %d\n", name, hist.Min(), now)
168 fmt.Fprintf(w, "%s.max %d %d\n", name, hist.Max(), now)
169 fmt.Fprintf(w, "%s.mean %.2f %d\n", name, hist.Mean(), now)
170 fmt.Fprintf(w, "%s.std-dev %.2f %d\n", name, hist.StdDev(), now)
171 }
172
173 func (h *windowedHistogram) rotateLoop(d time.Duration) {
174 for range time.Tick(d) {
175 h.mtx.Lock()
176 h.hist.Rotate()
177 h.mtx.Unlock()
178 }
179 }
180
181 type quantileSlice []metrics.Quantile
182
183 func (a quantileSlice) Len() int { return len(a) }
184 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
185 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
114
115 for name, c := range g.counters {
116 n, err := fmt.Fprintf(w, "%s %f %d\n", name, c.c.ValueReset(), now)
117 if err != nil {
118 return count, err
119 }
120 count += int64(n)
121 }
122
123 for name, ga := range g.gauges {
124 n, err := fmt.Fprintf(w, "%s %f %d\n", name, ga.g.Value(), now)
125 if err != nil {
126 return count, err
127 }
128 count += int64(n)
129 }
130
131 for name, h := range g.histograms {
132 for _, p := range []struct {
133 s string
134 f float64
135 }{
136 {"50", 0.50},
137 {"90", 0.90},
138 {"95", 0.95},
139 {"99", 0.99},
140 } {
141 n, err := fmt.Fprintf(w, "%s.p%s %f %d\n", name, p.s, h.h.Quantile(p.f), now)
142 if err != nil {
143 return count, err
144 }
145 count += int64(n)
146 }
147 }
148
149 return count, err
150 }
151
152 // Counter is a Graphite counter metric.
153 type Counter struct {
154 c *generic.Counter
155 }
156
157 // NewCounter returns a new usable counter metric.
158 func NewCounter(name string) *Counter {
159 return &Counter{generic.NewCounter(name)}
160 }
161
162 // With is a no-op.
163 func (c *Counter) With(...string) metrics.Counter { return c }
164
165 // Add implements counter.
166 func (c *Counter) Add(delta float64) { c.c.Add(delta) }
167
168 // Gauge is a Graphite gauge metric.
169 type Gauge struct {
170 g *generic.Gauge
171 }
172
173 // NewGauge returns a new usable Gauge metric.
174 func NewGauge(name string) *Gauge {
175 return &Gauge{generic.NewGauge(name)}
176 }
177
178 // With is a no-op.
179 func (g *Gauge) With(...string) metrics.Gauge { return g }
180
181 // Set implements gauge.
182 func (g *Gauge) Set(value float64) { g.g.Set(value) }
183
184 // Histogram is a Graphite histogram metric. Observations are bucketed into
185 // per-quantile gauges.
186 type Histogram struct {
187 h *generic.Histogram
188 }
189
190 // NewHistogram returns a new usable Histogram metric.
191 func NewHistogram(name string, buckets int) *Histogram {
192 return &Histogram{generic.NewHistogram(name, buckets)}
193 }
194
195 // With is a no-op.
196 func (h *Histogram) With(...string) metrics.Histogram { return h }
197
198 // Observe implements histogram.
199 func (h *Histogram) Observe(value float64) { h.h.Observe(value) }
11
22 import (
33 "bytes"
4 "fmt"
5 "strings"
4 "regexp"
5 "strconv"
66 "testing"
7 "time"
87
98 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/metrics"
11 "github.com/go-kit/kit/metrics/teststat"
9 "github.com/go-kit/kit/metrics3/teststat"
1210 )
1311
14 func TestHistogramQuantiles(t *testing.T) {
15 prefix := "prefix."
16 e := NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
17 var (
18 name = "test_histogram_quantiles"
19 quantiles = []int{50, 90, 95, 99}
20 )
21 h, err := e.NewHistogram(name, 0, 100, 3, quantiles...)
22 if err != nil {
23 t.Fatalf("unable to create test histogram: %v", err)
24 }
25 h = h.With(metrics.Field{Key: "ignored", Value: "field"})
26 const seed, mean, stdev int64 = 424242, 50, 10
27 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
28
29 // flush the current metrics into a buffer to examine
30 var b bytes.Buffer
31 e.flush(&b)
32 teststat.AssertGraphiteNormalHistogram(t, prefix, name, mean, stdev, quantiles, b.String())
33 }
34
3512 func TestCounter(t *testing.T) {
36 var (
37 prefix = "prefix."
38 name = "m"
39 value = 123
40 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
41 b bytes.Buffer
42 )
43 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
44 e.flush(&b)
45 want := fmt.Sprintf("%s%s.count %d", prefix, name, value)
46 payload := b.String()
47 if !strings.HasPrefix(payload, want) {
48 t.Errorf("counter %s want\n%s, have\n%s", name, want, payload)
13 prefix, name := "abc.", "def"
14 label, value := "label", "value" // ignored for Graphite
15 regex := `^` + prefix + name + ` ([0-9\.]+) [0-9]+$`
16 g := New(prefix, log.NewNopLogger())
17 counter := g.NewCounter(name).With(label, value)
18 valuef := teststat.SumLines(g, regex)
19 if err := teststat.TestCounter(counter, valuef); err != nil {
20 t.Fatal(err)
4921 }
5022 }
5123
5224 func TestGauge(t *testing.T) {
53 var (
54 prefix = "prefix."
55 name = "xyz"
56 value = 54321
57 delta = 12345
58 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
59 b bytes.Buffer
60 g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
61 )
62
63 g.Set(float64(value))
64 g.Add(float64(delta))
65
66 e.flush(&b)
67 payload := b.String()
68
69 want := fmt.Sprintf("%s%s %d", prefix, name, value+delta)
70 if !strings.HasPrefix(payload, want) {
71 t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload)
25 prefix, name := "ghi.", "jkl"
26 label, value := "xyz", "abc" // ignored for Graphite
27 regex := `^` + prefix + name + ` ([0-9\.]+) [0-9]+$`
28 g := New(prefix, log.NewNopLogger())
29 gauge := g.NewGauge(name).With(label, value)
30 valuef := teststat.LastLine(g, regex)
31 if err := teststat.TestGauge(gauge, valuef); err != nil {
32 t.Fatal(err)
7233 }
7334 }
7435
75 func TestEmitterStops(t *testing.T) {
76 e := NewEmitter("foo", "bar", "baz", time.Second, log.NewNopLogger())
77 time.Sleep(100 * time.Millisecond)
78 e.Stop()
36 func TestHistogram(t *testing.T) {
37 // The histogram test is actually like 4 gauge tests.
38 prefix, name := "statsd.", "histogram_test"
39 label, value := "abc", "def" // ignored for Graphite
40 re50 := regexp.MustCompile(prefix + name + `.p50 ([0-9\.]+) [0-9]+`)
41 re90 := regexp.MustCompile(prefix + name + `.p90 ([0-9\.]+) [0-9]+`)
42 re95 := regexp.MustCompile(prefix + name + `.p95 ([0-9\.]+) [0-9]+`)
43 re99 := regexp.MustCompile(prefix + name + `.p99 ([0-9\.]+) [0-9]+`)
44 g := New(prefix, log.NewNopLogger())
45 histogram := g.NewHistogram(name, 50).With(label, value)
46 quantiles := func() (float64, float64, float64, float64) {
47 var buf bytes.Buffer
48 g.WriteTo(&buf)
49 match50 := re50.FindStringSubmatch(buf.String())
50 p50, _ := strconv.ParseFloat(match50[1], 64)
51 match90 := re90.FindStringSubmatch(buf.String())
52 p90, _ := strconv.ParseFloat(match90[1], 64)
53 match95 := re95.FindStringSubmatch(buf.String())
54 p95, _ := strconv.ParseFloat(match95[1], 64)
55 match99 := re99.FindStringSubmatch(buf.String())
56 p99, _ := strconv.ParseFloat(match99[1], 64)
57 return p50, p90, p95, p99
58 }
59 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
60 t.Fatal(err)
61 }
7962 }
0 // Package influx provides an InfluxDB implementation for metrics. The model is
1 // similar to other push-based instrumentation systems. Observations are
2 // aggregated locally and emitted to the Influx server on regular intervals.
3 package influx
4
5 import (
6 "time"
7
8 influxdb "github.com/influxdata/influxdb/client/v2"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics3"
12 "github.com/go-kit/kit/metrics3/internal/lv"
13 )
14
15 // Influx is a store for metrics that will be emitted to an Influx database.
16 //
17 // Influx is a general purpose time-series database, and has no native concepts
18 // of counters, gauges, or histograms. Counters are modeled as a timeseries with
19 // one data point per flush, with a "count" field that reflects all adds since
20 // the last flush. Gauges are modeled as a timeseries with one data point per
21 // flush, with a "value" field that reflects the current state of the gauge.
22 // Histograms are modeled as a timeseries with one data point per observation,
23 // with a "value" field that reflects each observation; use e.g. the HISTOGRAM
24 // aggregate function to compute histograms.
25 //
26 // Influx tags are immutable, attached to the Influx object, and given to each
27 // metric at construction. Influx fields are mapped to Go kit label values, and
28 // may be mutated via With functions. Actual metric values are provided as
29 // fields with specific names depending on the metric.
30 //
31 // All observations are collected in memory locally, and flushed on demand.
32 type Influx struct {
33 counters *lv.Space
34 gauges *lv.Space
35 histograms *lv.Space
36 tags map[string]string
37 conf influxdb.BatchPointsConfig
38 logger log.Logger
39 }
40
41 // New returns an Influx, ready to create metrics and collect observations. Tags
42 // are applied to all metrics created from this object. The BatchPointsConfig is
43 // used during flushing.
44 func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
45 return &Influx{
46 counters: lv.NewSpace(),
47 gauges: lv.NewSpace(),
48 histograms: lv.NewSpace(),
49 tags: tags,
50 conf: conf,
51 logger: logger,
52 }
53 }
54
55 // NewCounter returns an Influx counter.
56 func (in *Influx) NewCounter(name string) *Counter {
57 return &Counter{
58 name: name,
59 obs: in.counters.Observe,
60 }
61 }
62
63 // NewGauge returns an Influx gauge.
64 func (in *Influx) NewGauge(name string) *Gauge {
65 return &Gauge{
66 name: name,
67 obs: in.gauges.Observe,
68 }
69 }
70
71 // NewHistogram returns an Influx histogram.
72 func (in *Influx) NewHistogram(name string) *Histogram {
73 return &Histogram{
74 name: name,
75 obs: in.histograms.Observe,
76 }
77 }
78
79 // BatchPointsWriter captures a subset of the influxdb.Client methods necessary
80 // for emitting metrics observations.
81 type BatchPointsWriter interface {
82 Write(influxdb.BatchPoints) error
83 }
84
85 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
86 // time the passed channel fires. This method blocks until the channel is
87 // closed, so clients probably want to run it in its own goroutine. For typical
88 // usage, create a time.Ticker and pass its C channel to this method.
89 func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
90 for range c {
91 if err := in.WriteTo(w); err != nil {
92 in.logger.Log("during", "WriteTo", "err", err)
93 }
94 }
95 }
96
97 // WriteTo flushes the buffered content of the metrics to the writer, in an
98 // Influx BatchPoints format. WriteTo abides best-effort semantics, so
99 // observations are lost if there is a problem with the write. Clients should be
100 // sure to call WriteTo regularly, ideally through the WriteLoop helper method.
101 func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
102 bp, err := influxdb.NewBatchPoints(in.conf)
103 if err != nil {
104 return err
105 }
106
107 now := time.Now()
108
109 in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
110 fields := fieldsFrom(lvs)
111 fields["count"] = sum(values)
112 var p *influxdb.Point
113 p, err = influxdb.NewPoint(name, in.tags, fields, now)
114 if err != nil {
115 return false
116 }
117 bp.AddPoint(p)
118 return true
119 })
120 if err != nil {
121 return err
122 }
123
124 in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
125 fields := fieldsFrom(lvs)
126 fields["value"] = last(values)
127 var p *influxdb.Point
128 p, err = influxdb.NewPoint(name, in.tags, fields, now)
129 if err != nil {
130 return false
131 }
132 bp.AddPoint(p)
133 return true
134 })
135 if err != nil {
136 return err
137 }
138
139 in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
140 fields := fieldsFrom(lvs)
141 ps := make([]*influxdb.Point, len(values))
142 for i, v := range values {
143 fields["value"] = v // overwrite each time
144 ps[i], err = influxdb.NewPoint(name, in.tags, fields, now)
145 if err != nil {
146 return false
147 }
148 }
149 bp.AddPoints(ps)
150 return true
151 })
152 if err != nil {
153 return err
154 }
155
156 return w.Write(bp)
157 }
158
159 func fieldsFrom(labelValues []string) map[string]interface{} {
160 if len(labelValues)%2 != 0 {
161 panic("fieldsFrom received a labelValues with an odd number of strings")
162 }
163 fields := make(map[string]interface{}, len(labelValues)/2)
164 for i := 0; i < len(labelValues); i += 2 {
165 fields[labelValues[i]] = labelValues[i+1]
166 }
167 return fields
168 }
169
170 func sum(a []float64) float64 {
171 var v float64
172 for _, f := range a {
173 v += f
174 }
175 return v
176 }
177
178 func last(a []float64) float64 {
179 return a[len(a)-1]
180 }
181
182 type observeFunc func(name string, lvs lv.LabelValues, value float64)
183
184 // Counter is an Influx counter. Observations are forwarded to an Influx
185 // object, and aggregated (summed) per timeseries.
186 type Counter struct {
187 name string
188 lvs lv.LabelValues
189 obs observeFunc
190 }
191
192 // With implements metrics.Counter.
193 func (c *Counter) With(labelValues ...string) metrics.Counter {
194 return &Counter{
195 name: c.name,
196 lvs: c.lvs.With(labelValues...),
197 obs: c.obs,
198 }
199 }
200
201 // Add implements metrics.Counter.
202 func (c *Counter) Add(delta float64) {
203 c.obs(c.name, c.lvs, delta)
204 }
205
206 // Gauge is an Influx gauge. Observations are forwarded to a Dogstatsd
207 // object, and aggregated (the last observation selected) per timeseries.
208 type Gauge struct {
209 name string
210 lvs lv.LabelValues
211 obs observeFunc
212 }
213
214 // With implements metrics.Gauge.
215 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
216 return &Gauge{
217 name: g.name,
218 lvs: g.lvs.With(labelValues...),
219 obs: g.obs,
220 }
221 }
222
223 // Set implements metrics.Gauge.
224 func (g *Gauge) Set(value float64) {
225 g.obs(g.name, g.lvs, value)
226 }
227
228 // Histogram is an Influx histrogram. Observations are aggregated into a
229 // generic.Histogram and emitted as per-quantile gauges to the Influx server.
230 type Histogram struct {
231 name string
232 lvs lv.LabelValues
233 obs observeFunc
234 }
235
236 // With implements metrics.Histogram.
237 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
238 return &Histogram{
239 name: h.name,
240 lvs: h.lvs.With(labelValues...),
241 obs: h.obs,
242 }
243 }
244
245 // Observe implements metrics.Histogram.
246 func (h *Histogram) Observe(value float64) {
247 h.obs(h.name, h.lvs, value)
248 }
0 package influx
1
2 import (
3 "bytes"
4 "fmt"
5 "regexp"
6 "strconv"
7 "strings"
8 "testing"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics3/generic"
12 "github.com/go-kit/kit/metrics3/teststat"
13 influxdb "github.com/influxdata/influxdb/client/v2"
14 )
15
16 func TestCounter(t *testing.T) {
17 in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
18 re := regexp.MustCompile(`influx_counter,a=b count=([0-9\.]+) [0-9]+`) // reverse-engineered :\
19 counter := in.NewCounter("influx_counter")
20 value := func() float64 {
21 client := &bufWriter{}
22 in.WriteTo(client)
23 match := re.FindStringSubmatch(client.buf.String())
24 f, _ := strconv.ParseFloat(match[1], 64)
25 return f
26 }
27 if err := teststat.TestCounter(counter, value); err != nil {
28 t.Fatal(err)
29 }
30 }
31
32 func TestGauge(t *testing.T) {
33 in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
34 re := regexp.MustCompile(`influx_gauge,foo=alpha value=([0-9\.]+) [0-9]+`)
35 gauge := in.NewGauge("influx_gauge")
36 value := func() float64 {
37 client := &bufWriter{}
38 in.WriteTo(client)
39 match := re.FindStringSubmatch(client.buf.String())
40 f, _ := strconv.ParseFloat(match[1], 64)
41 return f
42 }
43 if err := teststat.TestGauge(gauge, value); err != nil {
44 t.Fatal(err)
45 }
46 }
47
48 func TestHistogram(t *testing.T) {
49 in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
50 re := regexp.MustCompile(`influx_histogram,foo=alpha bar="beta",value=([0-9\.]+) [0-9]+`)
51 histogram := in.NewHistogram("influx_histogram").With("bar", "beta")
52 quantiles := func() (float64, float64, float64, float64) {
53 w := &bufWriter{}
54 in.WriteTo(w)
55 h := generic.NewHistogram("h", 50)
56 matches := re.FindAllStringSubmatch(w.buf.String(), -1)
57 for _, match := range matches {
58 f, _ := strconv.ParseFloat(match[1], 64)
59 h.Observe(f)
60 }
61 return h.Quantile(0.50), h.Quantile(0.90), h.Quantile(0.95), h.Quantile(0.99)
62 }
63 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
64 t.Fatal(err)
65 }
66 }
67
68 func TestHistogramLabels(t *testing.T) {
69 in := New(map[string]string{}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
70 h := in.NewHistogram("foo")
71 h.Observe(123)
72 h.With("abc", "xyz").Observe(456)
73 w := &bufWriter{}
74 if err := in.WriteTo(w); err != nil {
75 t.Fatal(err)
76 }
77 if want, have := 2, len(strings.Split(strings.TrimSpace(w.buf.String()), "\n")); want != have {
78 t.Errorf("want %d, have %d", want, have)
79 }
80 }
81
82 type bufWriter struct {
83 buf bytes.Buffer
84 }
85
86 func (w *bufWriter) Write(bp influxdb.BatchPoints) error {
87 for _, p := range bp.Points() {
88 fmt.Fprintf(&w.buf, p.String()+"\n")
89 }
90 return nil
91 }
+0
-254
metrics/influxdb/influxdb.go less more
0 // Package influxdb implements a InfluxDB backend for package metrics.
1 package influxdb
2
3 import (
4 "fmt"
5 "sort"
6 "sync"
7 "time"
8
9 "github.com/codahale/hdrhistogram"
10 stdinflux "github.com/influxdata/influxdb/client/v2"
11
12 "github.com/go-kit/kit/metrics"
13 )
14
15 type counter struct {
16 key string
17 tags []metrics.Field
18 fields []metrics.Field
19 value uint64
20 bp stdinflux.BatchPoints
21 }
22
23 // NewCounter returns a Counter that writes values in the reportInterval
24 // to the given InfluxDB client, utilizing batching.
25 func NewCounter(client stdinflux.Client, bp stdinflux.BatchPoints, key string, tags []metrics.Field, reportInterval time.Duration) metrics.Counter {
26 return NewCounterTick(client, bp, key, tags, time.Tick(reportInterval))
27 }
28
29 // NewCounterTick is the same as NewCounter, but allows the user to pass a own
30 // channel to trigger the write process to the client.
31 func NewCounterTick(client stdinflux.Client, bp stdinflux.BatchPoints, key string, tags []metrics.Field, reportTicker <-chan time.Time) metrics.Counter {
32 c := &counter{
33 key: key,
34 tags: tags,
35 value: 0,
36 bp: bp,
37 }
38 go watch(client, bp, reportTicker)
39 return c
40 }
41
42 func (c *counter) Name() string {
43 return c.key
44 }
45
46 func (c *counter) With(field metrics.Field) metrics.Counter {
47 return &counter{
48 key: c.key,
49 tags: c.tags,
50 value: c.value,
51 bp: c.bp,
52 fields: append(c.fields, field),
53 }
54 }
55
56 func (c *counter) Add(delta uint64) {
57 c.value = c.value + delta
58
59 tags := map[string]string{}
60
61 for _, tag := range c.tags {
62 tags[tag.Key] = tag.Value
63 }
64
65 fields := map[string]interface{}{}
66
67 for _, field := range c.fields {
68 fields[field.Key] = field.Value
69 }
70 fields["value"] = c.value
71 pt, _ := stdinflux.NewPoint(c.key, tags, fields, time.Now())
72 c.bp.AddPoint(pt)
73 }
74
75 type gauge struct {
76 key string
77 tags []metrics.Field
78 fields []metrics.Field
79 value float64
80 bp stdinflux.BatchPoints
81 }
82
83 // NewGauge creates a new gauge instance, reporting points in the defined reportInterval.
84 func NewGauge(client stdinflux.Client, bp stdinflux.BatchPoints, key string, tags []metrics.Field, reportInterval time.Duration) metrics.Gauge {
85 return NewGaugeTick(client, bp, key, tags, time.Tick(reportInterval))
86 }
87
88 // NewGaugeTick is the same as NewGauge with a ticker channel instead of a interval.
89 func NewGaugeTick(client stdinflux.Client, bp stdinflux.BatchPoints, key string, tags []metrics.Field, reportTicker <-chan time.Time) metrics.Gauge {
90 g := &gauge{
91 key: key,
92 tags: tags,
93 value: 0,
94 bp: bp,
95 }
96 go watch(client, bp, reportTicker)
97 return g
98 }
99
100 func (g *gauge) Name() string {
101 return g.key
102 }
103
104 func (g *gauge) With(field metrics.Field) metrics.Gauge {
105 return &gauge{
106 key: g.key,
107 tags: g.tags,
108 value: g.value,
109 bp: g.bp,
110 fields: append(g.fields, field),
111 }
112 }
113
114 func (g *gauge) Add(delta float64) {
115 g.value = g.value + delta
116 g.createPoint()
117 }
118
119 func (g *gauge) Set(value float64) {
120 g.value = value
121 g.createPoint()
122 }
123
124 func (g *gauge) Get() float64 {
125 return g.value
126 }
127
128 func (g *gauge) createPoint() {
129 tags := map[string]string{}
130
131 for _, tag := range g.tags {
132 tags[tag.Key] = tag.Value
133 }
134
135 fields := map[string]interface{}{}
136
137 for _, field := range g.fields {
138 fields[field.Key] = field.Value
139 }
140 fields["value"] = g.value
141 pt, _ := stdinflux.NewPoint(g.key, tags, fields, time.Now())
142 g.bp.AddPoint(pt)
143 }
144
145 // The implementation from histogram is taken from metrics/expvar
146
147 type histogram struct {
148 mu sync.Mutex
149 hist *hdrhistogram.WindowedHistogram
150
151 key string
152 gauges map[int]metrics.Gauge
153 }
154
155 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
156 // windowed HDR histogram which drops data older than five minutes.
157 //
158 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
159 // should be integers in the range 1..99. The gauge names are assigned by
160 // using the passed name as a prefix and appending "_pNN" e.g. "_p50".
161 func NewHistogram(client stdinflux.Client, bp stdinflux.BatchPoints, key string, tags []metrics.Field,
162 reportInterval time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
163 return NewHistogramTick(client, bp, key, tags, time.Tick(reportInterval), minValue, maxValue, sigfigs, quantiles...)
164 }
165
166 // NewHistogramTick is the same as NewHistoGram, but allows to pass a custom reportTicker.
167 func NewHistogramTick(client stdinflux.Client, bp stdinflux.BatchPoints, key string, tags []metrics.Field,
168 reportTicker <-chan time.Time, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
169 gauges := map[int]metrics.Gauge{}
170
171 for _, quantile := range quantiles {
172 if quantile <= 0 || quantile >= 100 {
173 panic(fmt.Sprintf("invalid quantile %d", quantile))
174 }
175 gauges[quantile] = NewGaugeTick(client, bp, fmt.Sprintf("%s_p%02d", key, quantile), tags, reportTicker)
176 }
177
178 h := &histogram{
179 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
180 key: key,
181 gauges: gauges,
182 }
183
184 go h.rotateLoop(1 * time.Minute)
185 return h
186 }
187
188 func (h *histogram) Name() string {
189 return h.key
190 }
191
192 func (h *histogram) With(field metrics.Field) metrics.Histogram {
193 for q, gauge := range h.gauges {
194 h.gauges[q] = gauge.With(field)
195 }
196
197 return h
198 }
199
200 func (h *histogram) Observe(value int64) {
201 h.mu.Lock()
202 err := h.hist.Current.RecordValue(value)
203 h.mu.Unlock()
204
205 if err != nil {
206 panic(err.Error())
207 }
208
209 for q, gauge := range h.gauges {
210 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
211 }
212 }
213
214 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
215 bars := h.hist.Merge().Distribution()
216 buckets := make([]metrics.Bucket, len(bars))
217 for i, bar := range bars {
218 buckets[i] = metrics.Bucket{
219 From: bar.From,
220 To: bar.To,
221 Count: bar.Count,
222 }
223 }
224 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
225 for quantile, gauge := range h.gauges {
226 quantiles = append(quantiles, metrics.Quantile{
227 Quantile: quantile,
228 Value: int64(gauge.Get()),
229 })
230 }
231 sort.Sort(quantileSlice(quantiles))
232 return buckets, quantiles
233 }
234
235 func (h *histogram) rotateLoop(d time.Duration) {
236 for range time.Tick(d) {
237 h.mu.Lock()
238 h.hist.Rotate()
239 h.mu.Unlock()
240 }
241 }
242
243 type quantileSlice []metrics.Quantile
244
245 func (a quantileSlice) Len() int { return len(a) }
246 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
247 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
248
249 func watch(client stdinflux.Client, bp stdinflux.BatchPoints, reportTicker <-chan time.Time) {
250 for range reportTicker {
251 client.Write(bp)
252 }
253 }
+0
-348
metrics/influxdb/influxdb_test.go less more
0 package influxdb_test
1
2 import (
3 "reflect"
4 "sync"
5 "testing"
6 "time"
7
8 stdinflux "github.com/influxdata/influxdb/client/v2"
9
10 "github.com/go-kit/kit/metrics"
11 "github.com/go-kit/kit/metrics/influxdb"
12 )
13
14 func TestCounter(t *testing.T) {
15 expectedName := "test_counter"
16 expectedTags := map[string]string{}
17 expectedFields := []map[string]interface{}{
18 {"value": "2"},
19 {"value": "7"},
20 {"value": "10"},
21 }
22
23 cl := &mockClient{}
24 cl.Add(3)
25 bp, _ := stdinflux.NewBatchPoints(stdinflux.BatchPointsConfig{
26 Database: "testing",
27 Precision: "s",
28 })
29
30 tags := []metrics.Field{}
31 for key, value := range expectedTags {
32 tags = append(tags, metrics.Field{Key: key, Value: value})
33 }
34
35 triggerChan := make(chan time.Time)
36 counter := influxdb.NewCounterTick(cl, bp, expectedName, tags, triggerChan)
37 counter.Add(2)
38 counter.Add(5)
39 counter.Add(3)
40
41 triggerChan <- time.Now()
42 cl.Wait()
43
44 for i := 0; i <= 2; i++ {
45 givenPoint := mockPoint{
46 Name: expectedName,
47 Tags: expectedTags,
48 Fields: expectedFields[i],
49 }
50 comparePoint(t, i, givenPoint, cl.Points[i])
51 }
52 }
53
54 func TestCounterWithTags(t *testing.T) {
55 expectedName := "test_counter"
56 expectedTags := map[string]string{
57 "key1": "value1",
58 "key2": "value2",
59 }
60 expectedFields := []map[string]interface{}{
61 {"value": "2"},
62 {"Test": "Test", "value": "7"},
63 {"Test": "Test", "value": "10"},
64 }
65
66 cl := &mockClient{}
67 cl.Add(3)
68 bp, _ := stdinflux.NewBatchPoints(stdinflux.BatchPointsConfig{
69 Database: "testing",
70 Precision: "s",
71 })
72
73 tags := []metrics.Field{}
74 for key, value := range expectedTags {
75 tags = append(tags, metrics.Field{Key: key, Value: value})
76 }
77
78 triggerChan := make(chan time.Time)
79 counter := influxdb.NewCounterTick(cl, bp, expectedName, tags, triggerChan)
80 counter.Add(2)
81 counter = counter.With(metrics.Field{Key: "Test", Value: "Test"})
82