Reimplement package metrics
Peter Bourgon
7 years ago
0 | // Package circonus provides a Circonus backend for metrics. | |
1 | package circonus | |
2 | ||
3 | import ( | |
4 | "github.com/circonus-labs/circonus-gometrics" | |
5 | ||
6 | "github.com/go-kit/kit/metrics2" | |
7 | ) | |
8 | ||
9 | // Circonus wraps a CirconusMetrics object and provides constructors for each of | |
10 | // the Go kit metrics. The CirconusMetrics object manages aggregation of | |
11 | // observations and emission to the Circonus server. | |
12 | type Circonus struct { | |
13 | m *circonusgometrics.CirconusMetrics | |
14 | } | |
15 | ||
16 | // New creates a new Circonus object wrapping the passed CirconusMetrics, which | |
17 | // the caller should create and set in motion. The Circonus object can be used | |
18 | // to construct individual Go kit metrics. | |
19 | func New(m *circonusgometrics.CirconusMetrics) *Circonus { | |
20 | return &Circonus{ | |
21 | m: m, | |
22 | } | |
23 | } | |
24 | ||
25 | // NewCounter returns a counter metric with the given name. | |
26 | func (c *Circonus) NewCounter(name string) *Counter { | |
27 | return &Counter{ | |
28 | name: name, | |
29 | m: c.m, | |
30 | } | |
31 | } | |
32 | ||
33 | // NewGauge returns a gauge metric with the given name. | |
34 | func (c *Circonus) NewGauge(name string) *Gauge { | |
35 | return &Gauge{ | |
36 | name: name, | |
37 | m: c.m, | |
38 | } | |
39 | } | |
40 | ||
41 | // NewHistogram returns a histogram metric with the given name. | |
42 | func (c *Circonus) NewHistogram(name string) *Histogram { | |
43 | return &Histogram{ | |
44 | h: c.m.NewHistogram(name), | |
45 | } | |
46 | } | |
47 | ||
48 | // Counter is a Circonus implementation of a counter metric. | |
49 | type Counter struct { | |
50 | name string | |
51 | m *circonusgometrics.CirconusMetrics | |
52 | } | |
53 | ||
54 | // With implements Counter, but is a no-op, because Circonus metrics have no | |
55 | // concept of per-observation label values. | |
56 | func (c *Counter) With(labelValues ...string) metrics.Counter { return c } | |
57 | ||
58 | // Add implements Counter. Delta is converted to uint64; precision will be lost. | |
59 | func (c *Counter) Add(delta float64) { c.m.Add(c.name, uint64(delta)) } | |
60 | ||
61 | // Gauge is a Circonus implementation of a gauge metric. | |
62 | type Gauge struct { | |
63 | name string | |
64 | m *circonusgometrics.CirconusMetrics | |
65 | } | |
66 | ||
67 | // With implements Gauge, but is a no-op, because Circonus metrics have no | |
68 | // concept of per-observation label values. | |
69 | func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g } | |
70 | ||
71 | // Set implements Gauge. Value is converted to int64; precision will be lost. | |
72 | func (g *Gauge) Set(value float64) { g.m.SetGauge(g.name, int64(value)) } | |
73 | ||
74 | // Histogram is a Circonus implementation of a histogram metric. | |
75 | type Histogram struct { | |
76 | h *circonusgometrics.Histogram | |
77 | } | |
78 | ||
79 | // With implements Histogram, but is a no-op, because Circonus metrics have no | |
80 | // concept of per-observation label values. | |
81 | func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h } | |
82 | ||
83 | // Observe implements Histogram. No precision is lost. | |
84 | func (h *Histogram) Observe(value float64) { h.h.RecordValue(value) } |
0 | // Package discard provides a no-op metrics backend. | |
1 | package discard | |
2 | ||
3 | import "github.com/go-kit/kit/metrics2" | |
4 | ||
5 | type counter struct{} | |
6 | ||
7 | // NewCounter returns a new no-op counter. | |
8 | func NewCounter() metrics.Counter { return counter{} } | |
9 | ||
10 | // With implements Counter. | |
11 | func (c counter) With(labelValues ...string) metrics.Counter { return c } | |
12 | ||
13 | // Add implements Counter. | |
14 | func (c counter) Add(delta float64) {} | |
15 | ||
16 | type gauge struct{} | |
17 | ||
18 | // NewGauge returns a new no-op gauge. | |
19 | func NewGauge() metrics.Gauge { return gauge{} } | |
20 | ||
21 | // With implements Gauge. | |
22 | func (g gauge) With(labelValues ...string) metrics.Gauge { return g } | |
23 | ||
24 | // Set implements Gauge. | |
25 | func (g gauge) Set(value float64) {} | |
26 | ||
27 | type histogram struct{} | |
28 | ||
29 | // NewHistogram returns a new no-op histogram. | |
30 | func NewHistogram() metrics.Histogram { return histogram{} } | |
31 | ||
32 | // With implements Histogram. | |
33 | func (h histogram) With(labelValues ...string) metrics.Histogram { return h } | |
34 | ||
35 | // Observe implements histogram. | |
36 | func (h histogram) Observe(value float64) {} |
0 | // Package metrics provides a framework for application instrumentation. All | |
1 | // metrics are safe for concurrent use. Considerable design influence has been | |
2 | // taken from https://github.com/codahale/metrics and https://prometheus.io. | |
3 | // | |
4 | // This package contains the common interfaces. Your code should take these | |
5 | // interfaces as parameters. Implementations are provided for different | |
6 | // instrumentation systems in the various subdirectories. | |
7 | // | |
8 | // Usage | |
9 | // | |
10 | // Metrics are dependencies and should be passed to the components that need | |
11 | // them in the same way you'd construct and pass a database handle, or reference | |
12 | // to another component. So, create metrics in your func main, using whichever | |
13 | // concrete implementation is appropriate for your organization. | |
14 | // | |
15 | // latency := prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ | |
16 | // Namespace: "myteam", | |
17 | // Subsystem: "foosvc", | |
18 | // Name: "request_latency_seconds", | |
19 | // Help: "Incoming request latency in seconds." | |
20 | // }, []string{"method", "status_code"}) | |
21 | // | |
22 | // Write your components to take the metrics they will use as parameters to | |
23 | // their constructors. Use the interface types, not the concrete types. That is, | |
24 | // | |
25 | // // NewAPI takes metrics.Histogram, not *prometheus.Summary | |
26 | // func NewAPI(s Store, logger log.Logger, latency metrics.Histogram) *API { | |
27 | // // ... | |
28 | // } | |
29 | // | |
30 | // func (a *API) ServeFoo(w http.ResponseWriter, r *http.Request) { | |
31 | // begin := time.Now() | |
32 | // // ... | |
33 | // a.latency.Observe(time.Since(begin).Seconds()) | |
34 | // } | |
35 | // | |
36 | // Finally, pass the metrics as dependencies when building your object graph. | |
37 | // This should happen in func main, not in the global scope. | |
38 | // | |
39 | // api := NewAPI(store, logger, latency) | |
40 | // http.ListenAndServe("/", api) | |
41 | // | |
42 | package metrics |
0 | // Package dogstatsd provides a DogStatsD backend for package metrics. It's very | |
1 | // similar to statsd, but supports arbitrary tags per-metric, which map to Go | |
2 | // kit's label values. So, while label values are no-ops in statsd, they are | |
3 | // supported here. For more details, see the documentation at | |
4 | // http://docs.datadoghq.com/guides/dogstatsd/. | |
5 | // | |
6 | // This package batches observations and emits them on some schedule to the | |
7 | // remote server. This is useful even if you connect to your DogStatsD server | |
8 | // over UDP. Emitting one network packet per observation can quickly overwhelm | |
9 | // even the fastest internal network. Batching allows you to more linearly scale | |
10 | // with growth. | |
11 | // | |
12 | // Typically you'll create a Dogstatsd object in your main function. | |
13 | // | |
14 | // d, stop := New("myprefix.", "udp", "dogstatsd:8125", time.Second, log.NewNopLogger()) | |
15 | // defer stop() | |
16 | // | |
17 | // Then, create the metrics that your application will track from that object. | |
18 | // Pass them as dependencies to the component that needs them; don't place them | |
19 | // in the global scope. | |
20 | // | |
21 | // requests := d.NewCounter("requests") | |
22 | // foo := NewFoo(store, logger, requests) | |
23 | // | |
24 | // Invoke them in your components when you have something to instrument. | |
25 | // | |
26 | // f.requests.Add(1) | |
27 | // | |
28 | package dogstatsd | |
29 | ||
30 | import ( | |
31 | "fmt" | |
32 | "io" | |
33 | "math/rand" | |
34 | "strings" | |
35 | "sync" | |
36 | "time" | |
37 | ||
38 | "github.com/go-kit/kit/log" | |
39 | "github.com/go-kit/kit/metrics2" | |
40 | "github.com/go-kit/kit/metrics2/generic" | |
41 | "github.com/go-kit/kit/metrics2/statsd" | |
42 | "github.com/go-kit/kit/util/conn" | |
43 | ) | |
44 | ||
45 | // Dogstatsd is a store for metrics that will be reported to a DogStatsD server. | |
46 | // Create a Dogstatsd object, use it to create metrics objects, and pass those | |
47 | // objects as dependencies to the components that will use them. | |
48 | type Dogstatsd struct { | |
49 | mtx sync.RWMutex | |
50 | prefix string | |
51 | counters map[string]*generic.Counter | |
52 | gauges map[string]*generic.Gauge | |
53 | histograms map[string]*Histogram | |
54 | timings map[string]*statsd.Timing | |
55 | sets map[string]*Set | |
56 | logger log.Logger | |
57 | } | |
58 | ||
59 | // NewRaw creates a Dogstatsd object. By default the metrics will not be emitted | |
60 | // anywhere. Use WriteTo to flush the metrics once, or FlushTo (in a separate | |
61 | // goroutine) to flush them on a regular schedule, or use the New constructor to | |
62 | // set up the object and flushing at the same time. | |
63 | func NewRaw(prefix string, logger log.Logger) *Dogstatsd { | |
64 | return &Dogstatsd{ | |
65 | prefix: prefix, | |
66 | counters: map[string]*generic.Counter{}, | |
67 | gauges: map[string]*generic.Gauge{}, | |
68 | histograms: map[string]*Histogram{}, | |
69 | timings: map[string]*statsd.Timing{}, | |
70 | sets: map[string]*Set{}, | |
71 | logger: logger, | |
72 | } | |
73 | } | |
74 | ||
75 | // New creates a Statsd object that flushes all metrics in the DogStatsD format | |
76 | // every flushInterval to the network and address. Use the returned stop | |
77 | // function to terminate the flushing goroutine. | |
78 | func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Dogstatsd, stop func()) { | |
79 | d := NewRaw(prefix, logger) | |
80 | manager := conn.NewDefaultManager(network, address, logger) | |
81 | ticker := time.NewTicker(flushInterval) | |
82 | go d.FlushTo(manager, ticker) | |
83 | return d, ticker.Stop | |
84 | } | |
85 | ||
86 | // NewCounter returns a counter metric with the given name. Adds are buffered | |
87 | // until the underlying Statsd object is flushed. | |
88 | func (d *Dogstatsd) NewCounter(name string) *generic.Counter { | |
89 | d.mtx.Lock() | |
90 | defer d.mtx.Unlock() | |
91 | c := generic.NewCounter() | |
92 | d.counters[d.prefix+name] = c | |
93 | return c | |
94 | } | |
95 | ||
96 | // NewGauge returns a gauge metric with the given name. | |
97 | func (d *Dogstatsd) NewGauge(name string) *generic.Gauge { | |
98 | d.mtx.Lock() | |
99 | defer d.mtx.Unlock() | |
100 | g := generic.NewGauge() | |
101 | d.gauges[d.prefix+name] = g | |
102 | return g | |
103 | } | |
104 | ||
105 | // NewHistogram returns a histogram metric with the given name and sample rate. | |
106 | func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram { | |
107 | d.mtx.Lock() | |
108 | defer d.mtx.Unlock() | |
109 | h := newHistogram(sampleRate) | |
110 | d.histograms[d.prefix+name] = h | |
111 | return h | |
112 | } | |
113 | ||
114 | // NewTiming returns a StatsD timing metric (DogStatsD documentation calls them | |
115 | // Timers) with the given name, unit (e.g. "ms") and sample rate. Pass a sample | |
116 | // rate of 1.0 or greater to disable sampling. Sampling is done at observation | |
117 | // time. Observations are buffered until the underlying statsd object is | |
118 | // flushed. | |
119 | func (d *Dogstatsd) NewTiming(name, unit string, sampleRate float64) *statsd.Timing { | |
120 | d.mtx.Lock() | |
121 | defer d.mtx.Unlock() | |
122 | t := statsd.NewTiming(unit, sampleRate) | |
123 | d.timings[d.prefix+name] = t | |
124 | return t | |
125 | } | |
126 | ||
127 | // NewSet returns a DogStatsD set with the given name. | |
128 | func (d *Dogstatsd) NewSet(name string) *Set { | |
129 | d.mtx.Lock() | |
130 | defer d.mtx.Unlock() | |
131 | s := newSet() | |
132 | d.sets[d.prefix+name] = s | |
133 | return s | |
134 | } | |
135 | ||
136 | // FlushTo invokes WriteTo to the writer every time the ticker fires. FlushTo | |
137 | // blocks until the ticker is stopped. Most users won't need to call this method | |
138 | // directly, and should prefer to use the New constructor. | |
139 | func (d *Dogstatsd) FlushTo(w io.Writer, ticker *time.Ticker) { | |
140 | for range ticker.C { | |
141 | if _, err := d.WriteTo(w); err != nil { | |
142 | d.logger.Log("during", "flush", "err", err) | |
143 | } | |
144 | } | |
145 | } | |
146 | ||
147 | // WriteTo dumps the current state of all of the metrics to the given writer in | |
148 | // the DogStatsD format. Each metric has its current value(s) written in | |
149 | // sequential calls to Write. Counters and gauges are dumped with their current | |
150 | // values; counters are reset. Histograms and timers have all of their | |
151 | // (potentially sampled) observations dumped, and are reset. Sets have all of | |
152 | // their observations dumped and are reset. Clients probably shouldn't invoke | |
153 | // this method directly, and should prefer using FlushTo, or the New | |
154 | // constructor. | |
155 | func (d *Dogstatsd) WriteTo(w io.Writer) (int64, error) { | |
156 | d.mtx.RLock() | |
157 | defer d.mtx.RUnlock() | |
158 | var ( | |
159 | n int | |
160 | err error | |
161 | count int64 | |
162 | ) | |
163 | for name, c := range d.counters { | |
164 | value := c.ValueReset() | |
165 | tv := tagValues(c.LabelValues()) | |
166 | n, err = fmt.Fprintf(w, "%s:%f|c%s\n", name, value, tv) | |
167 | count += int64(n) | |
168 | if err != nil { | |
169 | return count, err | |
170 | } | |
171 | } | |
172 | for name, g := range d.gauges { | |
173 | value := g.Value() | |
174 | tv := tagValues(g.LabelValues()) | |
175 | n, err := fmt.Fprintf(w, "%s:%f|g%s\n", name, value, tv) | |
176 | count += int64(n) | |
177 | if err != nil { | |
178 | return count, err | |
179 | } | |
180 | } | |
181 | for name, h := range d.histograms { | |
182 | sv := sampling(h.sampleRate) | |
183 | tv := tagValues(h.lvs) | |
184 | for _, value := range h.values { | |
185 | n, err = fmt.Fprintf(w, "%s:%f%s%s\n", name, value, sv, tv) | |
186 | count += int64(n) | |
187 | if err != nil { | |
188 | return count, err | |
189 | } | |
190 | } | |
191 | } | |
192 | for name, t := range d.timings { | |
193 | un := t.Unit() | |
194 | sv := sampling(t.SampleRate()) | |
195 | tv := tagValues(t.LabelValues()) | |
196 | for _, value := range t.Values() { | |
197 | n, err = fmt.Fprintf(w, "%s:%d|%s%s%s\n", name, value, un, sv, tv) | |
198 | count += int64(n) | |
199 | if err != nil { | |
200 | return count, err | |
201 | } | |
202 | } | |
203 | } | |
204 | for name, s := range d.sets { | |
205 | for _, value := range s.Values() { | |
206 | n, err = fmt.Fprintf(w, "%s:%s|s\n", name, value) | |
207 | count += int64(n) | |
208 | if err != nil { | |
209 | return count, err | |
210 | } | |
211 | } | |
212 | } | |
213 | return count, nil | |
214 | } | |
215 | ||
216 | // Histogram is a denormalized collection of observed values. A general form of | |
217 | // a StatsD Timing. Create histograms through the Dogstatsd object. | |
218 | type Histogram struct { | |
219 | mtx sync.Mutex | |
220 | sampleRate float64 | |
221 | values []float64 | |
222 | lvs []string | |
223 | } | |
224 | ||
225 | func newHistogram(sampleRate float64) *Histogram { | |
226 | return &Histogram{ | |
227 | sampleRate: sampleRate, | |
228 | } | |
229 | } | |
230 | ||
231 | // With implements metrics.Histogram. | |
232 | func (h *Histogram) With(labelValues ...string) metrics.Histogram { | |
233 | if len(labelValues)%2 != 0 { | |
234 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
235 | } | |
236 | return &Histogram{ | |
237 | sampleRate: h.sampleRate, | |
238 | values: h.values, | |
239 | lvs: append(h.lvs, labelValues...), | |
240 | } | |
241 | } | |
242 | ||
243 | // Observe implements metrics.Histogram. Values are simply aggregated into memory. | |
244 | // If the sample rate is less than 1.0, observations may be dropped. | |
245 | func (h *Histogram) Observe(value float64) { | |
246 | if h.sampleRate < 1.0 && rand.Float64() > h.sampleRate { | |
247 | return | |
248 | } | |
249 | h.mtx.Lock() | |
250 | defer h.mtx.Unlock() | |
251 | h.values = append(h.values, value) | |
252 | } | |
253 | ||
254 | // Values returns the observed values since the last call to Values. This method | |
255 | // clears the internal state of the Histogram; better get those values somewhere | |
256 | // safe! | |
257 | func (h *Histogram) Values() []float64 { | |
258 | h.mtx.Lock() | |
259 | defer h.mtx.Unlock() | |
260 | res := h.values | |
261 | h.values = []float64{} | |
262 | return res | |
263 | } | |
264 | ||
265 | // Set is a DogStatsD-specific metric for tracking unique identifiers. | |
266 | // Create sets through the Dogstatsd object. | |
267 | type Set struct { | |
268 | mtx sync.Mutex | |
269 | values map[string]struct{} | |
270 | } | |
271 | ||
272 | func newSet() *Set { | |
273 | return &Set{ | |
274 | values: map[string]struct{}{}, | |
275 | } | |
276 | } | |
277 | ||
278 | // Observe adds the value to the set. | |
279 | func (s *Set) Observe(value string) { | |
280 | s.mtx.Lock() | |
281 | defer s.mtx.Unlock() | |
282 | s.values[value] = struct{}{} | |
283 | } | |
284 | ||
285 | // Values returns the unique observed values since the last call to Values. This | |
286 | // method clears the internal state of the Set; better get those values | |
287 | // somewhere safe! | |
288 | func (s *Set) Values() []string { | |
289 | res := make([]string, 0, len(s.values)) | |
290 | for value := range s.values { | |
291 | res = append(res, value) | |
292 | } | |
293 | s.values = map[string]struct{}{} // TODO(pb): if GC is a problem, this can be improved | |
294 | return res | |
295 | } | |
296 | ||
297 | func sampling(r float64) string { | |
298 | var sv string | |
299 | if r < 1.0 { | |
300 | sv = fmt.Sprintf("|@%f", r) | |
301 | } | |
302 | return sv | |
303 | } | |
304 | ||
305 | func tagValues(labelValues []string) string { | |
306 | if len(labelValues) == 0 { | |
307 | return "" | |
308 | } | |
309 | if len(labelValues)%2 != 0 { | |
310 | panic("tagValues received a labelValues with an odd number of strings") | |
311 | } | |
312 | pairs := make([]string, 0, len(labelValues)/2) | |
313 | for i := 0; i < len(labelValues); i += 2 { | |
314 | pairs = append(pairs, labelValues[i]+":"+labelValues[i+1]) | |
315 | } | |
316 | return "|#" + strings.Join(pairs, ",") | |
317 | } |
0 | // Package expvar provides expvar backends for metrics. | |
1 | // Label values are not supported. | |
2 | package expvar | |
3 | ||
4 | import ( | |
5 | "expvar" | |
6 | "sync" | |
7 | ||
8 | "github.com/go-kit/kit/metrics2" | |
9 | "github.com/go-kit/kit/metrics2/generic" | |
10 | ) | |
11 | ||
12 | // Counter implements the counter metric with an expvar float. | |
13 | type Counter struct { | |
14 | f *expvar.Float | |
15 | } | |
16 | ||
17 | // NewCounter creates an expvar Float with the given name, and returns an object | |
18 | // that implements the Counter interface. | |
19 | func NewCounter(name string) *Counter { | |
20 | return &Counter{ | |
21 | f: expvar.NewFloat(name), | |
22 | } | |
23 | } | |
24 | ||
25 | // With is a no-op. | |
26 | func (c *Counter) With(labelValues ...string) metrics.Counter { return c } | |
27 | ||
28 | // Add implements Counter. | |
29 | func (c *Counter) Add(delta float64) { c.f.Add(delta) } | |
30 | ||
31 | // Gauge implements the gauge metric wtih an expvar float. | |
32 | type Gauge struct { | |
33 | f *expvar.Float | |
34 | } | |
35 | ||
36 | // NewGauge creates an expvar Float with the given name, and returns an object | |
37 | // that implements the Gauge interface. | |
38 | func NewGauge(name string) *Gauge { | |
39 | return &Gauge{ | |
40 | f: expvar.NewFloat(name), | |
41 | } | |
42 | } | |
43 | ||
44 | // With is a no-op. | |
45 | func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g } | |
46 | ||
47 | // Set implements Gauge. | |
48 | func (g *Gauge) Set(value float64) { g.f.Set(value) } | |
49 | ||
50 | // Histogram implements the histogram metric with a combination of the generic | |
51 | // Histogram object and several expvar Floats, one for each of the 50th, 90th, | |
52 | // 95th, and 99th quantiles of observed values, with the quantile attached to | |
53 | // the name as a suffix. | |
54 | type Histogram struct { | |
55 | mtx sync.Mutex | |
56 | h *generic.Histogram | |
57 | p50 *expvar.Float | |
58 | p90 *expvar.Float | |
59 | p95 *expvar.Float | |
60 | p99 *expvar.Float | |
61 | } | |
62 | ||
63 | // NewHistogram returns a Histogram object with the given name and number of | |
64 | // buckets in the underlying histogram object. 50 is a good default number of | |
65 | // buckets. | |
66 | func NewHistogram(name string, buckets int) *Histogram { | |
67 | return &Histogram{ | |
68 | h: generic.NewHistogram(buckets), | |
69 | p50: expvar.NewFloat(name + ".p50"), | |
70 | p90: expvar.NewFloat(name + ".p90"), | |
71 | p95: expvar.NewFloat(name + ".p95"), | |
72 | p99: expvar.NewFloat(name + ".p99"), | |
73 | } | |
74 | } | |
75 | ||
76 | // With is a no-op. | |
77 | func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h } | |
78 | ||
79 | // Observe impleemts Histogram. | |
80 | func (h *Histogram) Observe(value float64) { | |
81 | h.mtx.Lock() | |
82 | defer h.mtx.Unlock() | |
83 | h.Observe(value) | |
84 | h.p50.Set(h.h.Quantile(0.50)) | |
85 | h.p90.Set(h.h.Quantile(0.90)) | |
86 | h.p95.Set(h.h.Quantile(0.95)) | |
87 | h.p99.Set(h.h.Quantile(0.99)) | |
88 | } |
0 | // Package generic implements generic versions of each of the metric types. They | |
1 | // can be embedded by other implementations, and converted to specific formats | |
2 | // as necessary. | |
3 | package generic | |
4 | ||
5 | import ( | |
6 | "math" | |
7 | "sync" | |
8 | "sync/atomic" | |
9 | ||
10 | "github.com/VividCortex/gohistogram" | |
11 | ||
12 | "github.com/go-kit/kit/metrics2" | |
13 | ) | |
14 | ||
15 | // LabelValueUnknown is used as a label value when one is expected but not | |
16 | // provided, typically due to user error. | |
17 | const LabelValueUnknown = "unknown" | |
18 | ||
19 | // Counter is an in-memory implementation of a Counter. | |
20 | type Counter struct { | |
21 | sampleRate float64 | |
22 | bits uint64 | |
23 | lvs []string // immutable | |
24 | } | |
25 | ||
26 | // NewCounter returns a new, usable Counter. | |
27 | func NewCounter() *Counter { | |
28 | return &Counter{} | |
29 | } | |
30 | ||
31 | // With implements Counter. | |
32 | func (c *Counter) With(labelValues ...string) metrics.Counter { | |
33 | if len(labelValues)%2 != 0 { | |
34 | labelValues = append(labelValues, LabelValueUnknown) | |
35 | } | |
36 | return &Counter{ | |
37 | bits: atomic.LoadUint64(&c.bits), | |
38 | lvs: append(c.lvs, labelValues...), | |
39 | } | |
40 | } | |
41 | ||
42 | // Add implements Counter. | |
43 | func (c *Counter) Add(delta float64) { | |
44 | for { | |
45 | var ( | |
46 | old = atomic.LoadUint64(&c.bits) | |
47 | newf = math.Float64frombits(old) + delta | |
48 | new = math.Float64bits(newf) | |
49 | ) | |
50 | if atomic.CompareAndSwapUint64(&c.bits, old, new) { | |
51 | break | |
52 | } | |
53 | } | |
54 | } | |
55 | ||
56 | // Value returns the current value of the counter. | |
57 | func (c *Counter) Value() float64 { | |
58 | return math.Float64frombits(atomic.LoadUint64(&c.bits)) | |
59 | } | |
60 | ||
61 | // ValueReset returns the current value of the counter, and resets it to zero. | |
62 | // This is useful for metrics backends whose counter aggregations expect deltas, | |
63 | // like statsd. | |
64 | func (c *Counter) ValueReset() float64 { | |
65 | for { | |
66 | var ( | |
67 | old = atomic.LoadUint64(&c.bits) | |
68 | newf = 0.0 | |
69 | new = math.Float64bits(newf) | |
70 | ) | |
71 | if atomic.CompareAndSwapUint64(&c.bits, old, new) { | |
72 | return math.Float64frombits(old) | |
73 | } | |
74 | } | |
75 | } | |
76 | ||
77 | // LabelValues returns the set of label values attached to the counter. | |
78 | func (c *Counter) LabelValues() []string { | |
79 | return c.lvs | |
80 | } | |
81 | ||
82 | // Gauge is an in-memory implementation of a Gauge. | |
83 | type Gauge struct { | |
84 | bits uint64 | |
85 | lvs []string // immutable | |
86 | } | |
87 | ||
88 | // NewGauge returns a new, usable Gauge. | |
89 | func NewGauge() *Gauge { | |
90 | return &Gauge{} | |
91 | } | |
92 | ||
93 | // With implements Gauge. | |
94 | func (c *Gauge) With(labelValues ...string) metrics.Gauge { | |
95 | if len(labelValues)%2 != 0 { | |
96 | labelValues = append(labelValues, LabelValueUnknown) | |
97 | } | |
98 | return &Gauge{ | |
99 | bits: atomic.LoadUint64(&c.bits), | |
100 | lvs: append(c.lvs, labelValues...), | |
101 | } | |
102 | } | |
103 | ||
104 | // Set implements Gauge. | |
105 | func (c *Gauge) Set(value float64) { | |
106 | atomic.StoreUint64(&c.bits, math.Float64bits(value)) | |
107 | } | |
108 | ||
109 | // Value returns the current value of the gauge. | |
110 | func (c *Gauge) Value() float64 { | |
111 | return math.Float64frombits(atomic.LoadUint64(&c.bits)) | |
112 | } | |
113 | ||
114 | // LabelValues returns the set of label values attached to the gauge. | |
115 | func (c *Gauge) LabelValues() []string { | |
116 | return c.lvs | |
117 | } | |
118 | ||
119 | // Histogram is an in-memory implementation of a streaming histogram, based on | |
120 | // VividCortex/gohistogram. It dynamically computes quantiles, so it's not | |
121 | // suitable for aggregation. | |
122 | type Histogram struct { | |
123 | lvs []string // immutable | |
124 | h gohistogram.Histogram | |
125 | } | |
126 | ||
127 | // NewHistogram returns a numeric histogram based on VividCortex/gohistogram. A | |
128 | // good default value for buckets is 50. | |
129 | func NewHistogram(buckets int) *Histogram { | |
130 | return &Histogram{ | |
131 | h: gohistogram.NewHistogram(buckets), | |
132 | } | |
133 | } | |
134 | ||
135 | // With implements Histogram. | |
136 | func (h *Histogram) With(labelValues ...string) metrics.Histogram { | |
137 | if len(labelValues)%2 != 0 { | |
138 | labelValues = append(labelValues, LabelValueUnknown) | |
139 | } | |
140 | return &Histogram{ | |
141 | lvs: append(h.lvs, labelValues...), | |
142 | h: h.h, | |
143 | } | |
144 | } | |
145 | ||
146 | // Observe implements Histogram. | |
147 | func (h *Histogram) Observe(value float64) { | |
148 | h.h.Add(value) | |
149 | } | |
150 | ||
151 | // Quantile returns the value of the quantile q, 0.0 < q < 1.0. | |
152 | func (h *Histogram) Quantile(q float64) float64 { | |
153 | return h.h.Quantile(q) | |
154 | } | |
155 | ||
156 | // LabelValues returns the set of label values attached to the histogram. | |
157 | func (h *Histogram) LabelValues() []string { | |
158 | return h.lvs | |
159 | } | |
160 | ||
161 | // SimpleHistogram is an in-memory implementation of a Histogram. It only tracks | |
162 | // an approximate moving average, so is likely too naïve for many use cases. | |
163 | type SimpleHistogram struct { | |
164 | mtx sync.RWMutex | |
165 | lvs []string | |
166 | avg float64 | |
167 | n uint64 | |
168 | } | |
169 | ||
170 | // NewSimpleHistogram returns a SimpleHistogram, ready for observations. | |
171 | func NewSimpleHistogram() *SimpleHistogram { | |
172 | return &SimpleHistogram{} | |
173 | } | |
174 | ||
175 | // With implements Histogram. | |
176 | func (h *SimpleHistogram) With(labelValues ...string) metrics.Histogram { | |
177 | if len(labelValues)%2 != 0 { | |
178 | labelValues = append(labelValues, LabelValueUnknown) | |
179 | } | |
180 | return &SimpleHistogram{ | |
181 | lvs: append(h.lvs, labelValues...), | |
182 | avg: h.avg, | |
183 | n: h.n, | |
184 | } | |
185 | } | |
186 | ||
187 | // Observe implements Histogram. | |
188 | func (h *SimpleHistogram) Observe(value float64) { | |
189 | h.mtx.Lock() | |
190 | defer h.mtx.Unlock() | |
191 | h.n++ | |
192 | h.avg -= h.avg / float64(h.n) | |
193 | h.avg += value / float64(h.n) | |
194 | } | |
195 | ||
196 | // ApproximateMovingAverage returns the approximate moving average of observations. | |
197 | func (h *SimpleHistogram) ApproximateMovingAverage() float64 { | |
198 | h.mtx.RLock() | |
199 | h.mtx.RUnlock() | |
200 | return h.avg | |
201 | } | |
202 | ||
203 | // LabelValues returns the set of label values attached to the histogram. | |
204 | func (h *SimpleHistogram) LabelValues() []string { | |
205 | return h.lvs | |
206 | } |
0 | // Package graphite provides a Graphite backend for metrics. Metrics are emitted | |
1 | // with each observation in the plaintext protocol. See | |
2 | // http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol | |
3 | // for more information. | |
4 | // | |
5 | // Graphite does not have a native understanding of metric parameterization, so | |
6 | // label values are aggregated but not reported. Use distinct metrics for each | |
7 | // unique combination of label values. | |
8 | package graphite | |
9 | ||
10 | import ( | |
11 | "fmt" | |
12 | "io" | |
13 | "sync" | |
14 | "time" | |
15 | ||
16 | "github.com/go-kit/kit/log" | |
17 | "github.com/go-kit/kit/metrics2/generic" | |
18 | "github.com/go-kit/kit/util/conn" | |
19 | ) | |
20 | ||
21 | // Graphite is a store for metrics that will be reported to a Graphite server. | |
22 | // Create a Graphite object, use it to create metrics objects, and pass those | |
23 | // objects as dependencies to the components that will use them. | |
24 | type Graphite struct { | |
25 | mtx sync.RWMutex | |
26 | prefix string | |
27 | counters map[string]*generic.Counter | |
28 | gauges map[string]*generic.Gauge | |
29 | histograms map[string]*generic.Histogram | |
30 | logger log.Logger | |
31 | } | |
32 | ||
33 | // New creates a Statsd object that flushes all metrics in the Graphite | |
34 | // plaintext format every flushInterval to the network and address. Use the | |
35 | // returned stop function to terminate the flushing goroutine. | |
36 | func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Graphite, stop func()) { | |
37 | s := NewRaw(prefix, logger) | |
38 | manager := conn.NewDefaultManager(network, address, logger) | |
39 | ticker := time.NewTicker(flushInterval) | |
40 | go s.FlushTo(manager, ticker) | |
41 | return s, ticker.Stop | |
42 | } | |
43 | ||
44 | // NewRaw returns a Graphite object capable of allocating individual metrics. | |
45 | // All metrics will share the given prefix in their path. All metrics can be | |
46 | // snapshotted, and their values and statistical summaries written to a writer, | |
47 | // via the WriteTo method. | |
48 | func NewRaw(prefix string, logger log.Logger) *Graphite { | |
49 | return &Graphite{ | |
50 | prefix: prefix, | |
51 | counters: map[string]*generic.Counter{}, | |
52 | gauges: map[string]*generic.Gauge{}, | |
53 | histograms: map[string]*generic.Histogram{}, | |
54 | logger: logger, | |
55 | } | |
56 | } | |
57 | ||
58 | // NewCounter allocates and returns a counter with the given name. | |
59 | func (g *Graphite) NewCounter(name string) *generic.Counter { | |
60 | g.mtx.Lock() | |
61 | defer g.mtx.Unlock() | |
62 | c := generic.NewCounter() | |
63 | g.counters[g.prefix+name] = c | |
64 | return c | |
65 | } | |
66 | ||
67 | // NewGauge allocates and returns a gauge with the given name. | |
68 | func (g *Graphite) NewGauge(name string) *generic.Gauge { | |
69 | g.mtx.Lock() | |
70 | defer g.mtx.Unlock() | |
71 | ga := generic.NewGauge() | |
72 | g.gauges[g.prefix+name] = ga | |
73 | return ga | |
74 | } | |
75 | ||
76 | // NewHistogram allocates and returns a histogram with the given name and bucket | |
77 | // count. 50 is a good default number of buckets. Histograms report their 50th, | |
78 | // 90th, 95th, and 99th quantiles in distinct metrics with the .p50, .p90, .p95, | |
79 | // and .p99 suffixes, respectively. | |
80 | func (g *Graphite) NewHistogram(name string, buckets int) *generic.Histogram { | |
81 | g.mtx.Lock() | |
82 | defer g.mtx.Unlock() | |
83 | h := generic.NewHistogram(buckets) | |
84 | g.histograms[g.prefix+name] = h | |
85 | return h | |
86 | } | |
87 | ||
88 | // FlushTo invokes WriteTo to the writer every time the ticker fires. FlushTo | |
89 | // blocks until the ticker is stopped. Most users won't need to call this method | |
90 | // directly, and should prefer to use the New constructor. | |
91 | func (g *Graphite) FlushTo(w io.Writer, ticker *time.Ticker) { | |
92 | for range ticker.C { | |
93 | if _, err := g.WriteTo(w); err != nil { | |
94 | g.logger.Log("during", "flush", "err", err) | |
95 | } | |
96 | } | |
97 | } | |
98 | ||
99 | // WriteTo writes a snapshot of all of the allocated metrics to the writer in | |
100 | // the Graphite plaintext format. Clients probably shouldn't invoke this method | |
101 | // directly, and should prefer using FlushTo, or the New constructor. | |
102 | func (g *Graphite) WriteTo(w io.Writer) (int64, error) { | |
103 | g.mtx.RLock() | |
104 | defer g.mtx.RUnlock() | |
105 | var ( | |
106 | n int | |
107 | err error | |
108 | count int64 | |
109 | now = time.Now().Unix() | |
110 | ) | |
111 | for path, c := range g.counters { | |
112 | n, err = fmt.Fprintf(w, "%s.count %f %d\n", path, c.Value(), now) | |
113 | if err != nil { | |
114 | return count, err | |
115 | } | |
116 | count += int64(n) | |
117 | } | |
118 | for path, ga := range g.gauges { | |
119 | n, err = fmt.Fprintf(w, "%s %f %d\n", path, ga.Value(), now) | |
120 | if err != nil { | |
121 | return count, err | |
122 | } | |
123 | count += int64(n) | |
124 | } | |
125 | for path, h := range g.histograms { | |
126 | n, err = fmt.Fprintf(w, "%s.p50 %f %d\n", path, h.Quantile(0.50), now) | |
127 | n, err = fmt.Fprintf(w, "%s.p90 %f %d\n", path, h.Quantile(0.90), now) | |
128 | n, err = fmt.Fprintf(w, "%s.p95 %f %d\n", path, h.Quantile(0.95), now) | |
129 | n, err = fmt.Fprintf(w, "%s.p99 %f %d\n", path, h.Quantile(0.99), now) | |
130 | if err != nil { | |
131 | return count, err | |
132 | } | |
133 | count += int64(n) | |
134 | } | |
135 | return count, nil | |
136 | } |
0 | // Package influx provides an InfluxDB implementation for metrics. The model is | |
1 | // similar to other push-based instrumentation systems. Observations are | |
2 | // aggregated locally and emitted to the Influx server on regular intervals. | |
3 | package influx | |
4 | ||
5 | import ( | |
6 | "sync" | |
7 | "time" | |
8 | ||
9 | "github.com/go-kit/kit/log" | |
10 | "github.com/go-kit/kit/metrics2/generic" | |
11 | influxdb "github.com/influxdata/influxdb/client/v2" | |
12 | ) | |
13 | ||
14 | // Influx is a store for metrics that will be emitted to an Influx database. | |
15 | // | |
16 | // Influx is a general purpose time-series database, and has no native concepts | |
17 | // of counters, gauges, or histograms. Counters are modeled as a timeseries with | |
18 | // one data point per flush, with a "count" field that reflects all adds since | |
19 | // the last flush. Gauges are modeled as a timeseries with one data point per | |
20 | // flush, with a "value" field that reflects the current state of the gauge. | |
21 | // Histograms are modeled as 4 gauge timeseries, one each for the 50th, 90th, | |
22 | // 95th, and 99th quantiles. | |
23 | // | |
24 | // Influx tags are assigned to each Go kit metric at construction, and are | |
25 | // immutable for the life of the metric. Influx fields are mapped to Go kit | |
26 | // label values, and may be mutated via With functions. Actual metric values are | |
27 | // provided as fields with specific names depending on the metric. | |
28 | // | |
29 | // All observations are batched in memory locally, and flushed on demand. | |
30 | type Influx struct { | |
31 | mtx sync.RWMutex | |
32 | counters map[string]*Counter | |
33 | gauges map[string]*Gauge | |
34 | histograms map[string]*Histogram | |
35 | tags map[string]string | |
36 | conf influxdb.BatchPointsConfig | |
37 | logger log.Logger | |
38 | } | |
39 | ||
40 | // New returns an Influx object, ready to create metrics and aggregate | |
41 | // observations, and automatically flushing to the passed Influx client every | |
42 | // flushInterval. Use the returned stop function to terminate the flushing | |
43 | // goroutine. | |
44 | func New( | |
45 | tags map[string]string, | |
46 | conf influxdb.BatchPointsConfig, | |
47 | client influxdb.Client, | |
48 | flushInterval time.Duration, | |
49 | logger log.Logger, | |
50 | ) (res *Influx, stop func()) { | |
51 | i := NewRaw(tags, conf, logger) | |
52 | ticker := time.NewTicker(flushInterval) | |
53 | go i.FlushTo(client, ticker) | |
54 | return i, ticker.Stop | |
55 | } | |
56 | ||
57 | // NewRaw returns an Influx object, ready to create metrics and aggregate | |
58 | // observations, but without automatically flushing anywhere. Users should | |
59 | // probably prefer the New constructor. | |
60 | // | |
61 | // Tags are applied to all metrics created from this object. A BatchPoints | |
62 | // structure is created from the provided BatchPointsConfig; any error will | |
63 | // cause a panic. Observations are aggregated into the BatchPoints. | |
64 | func NewRaw(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx { | |
65 | return &Influx{ | |
66 | counters: map[string]*Counter{}, | |
67 | gauges: map[string]*Gauge{}, | |
68 | histograms: map[string]*Histogram{}, | |
69 | tags: tags, | |
70 | conf: conf, | |
71 | logger: logger, | |
72 | } | |
73 | } | |
74 | ||
75 | // NewCounter returns a generic counter with static tags. | |
76 | func (i *Influx) NewCounter(name string, tags map[string]string) *Counter { | |
77 | i.mtx.Lock() | |
78 | defer i.mtx.Unlock() | |
79 | c := newCounter(tags) | |
80 | i.counters[name] = c | |
81 | return c | |
82 | } | |
83 | ||
84 | // NewGauge returns a generic gauge with static tags. | |
85 | func (i *Influx) NewGauge(name string, tags map[string]string) *Gauge { | |
86 | i.mtx.Lock() | |
87 | defer i.mtx.Unlock() | |
88 | g := newGauge(tags) | |
89 | i.gauges[name] = g | |
90 | return g | |
91 | } | |
92 | ||
93 | // NewHistogram returns a generic histogram with static tags. 50 is a good | |
94 | // default number of buckets. | |
95 | func (i *Influx) NewHistogram(name string, tags map[string]string, buckets int) *Histogram { | |
96 | i.mtx.Lock() | |
97 | defer i.mtx.Unlock() | |
98 | h := newHistogram(tags, buckets) | |
99 | i.histograms[name] = h | |
100 | return h | |
101 | } | |
102 | ||
103 | // FlushTo invokes WriteTo to the client every time the ticker fires. FlushTo | |
104 | // blocks until the ticker is stopped. Most users won't need to call this method | |
105 | // directly, and should prefer to use the New constructor. | |
106 | func (i *Influx) FlushTo(client influxdb.Client, ticker *time.Ticker) { | |
107 | for range ticker.C { | |
108 | if err := i.WriteTo(client); err != nil { | |
109 | i.logger.Log("during", "flush", "err", err) | |
110 | } | |
111 | } | |
112 | } | |
113 | ||
114 | // WriteTo converts the current set of metrics to Influx BatchPoints, and writes | |
115 | // the BatchPoints to the client. Clients probably shouldn't invoke this method | |
116 | // directly, and should prefer using FlushTo, or the New constructor. | |
117 | func (i *Influx) WriteTo(client influxdb.Client) error { | |
118 | i.mtx.Lock() | |
119 | defer i.mtx.Unlock() | |
120 | ||
121 | bp, err := influxdb.NewBatchPoints(i.conf) | |
122 | if err != nil { | |
123 | return err | |
124 | } | |
125 | now := time.Now() | |
126 | ||
127 | for name, c := range i.counters { | |
128 | fields := fieldsFrom(c.LabelValues()) | |
129 | fields["count"] = c.ValueReset() | |
130 | p, err := influxdb.NewPoint(name, c.tags, fields, now) | |
131 | if err != nil { | |
132 | return err | |
133 | } | |
134 | bp.AddPoint(p) | |
135 | } | |
136 | ||
137 | for name, g := range i.gauges { | |
138 | fields := fieldsFrom(g.LabelValues()) | |
139 | fields["value"] = g.Value() | |
140 | p, err := influxdb.NewPoint(name, g.tags, fields, now) | |
141 | if err != nil { | |
142 | return err | |
143 | } | |
144 | bp.AddPoint(p) | |
145 | } | |
146 | ||
147 | for name, h := range i.histograms { | |
148 | fields := fieldsFrom(h.LabelValues()) | |
149 | for suffix, quantile := range map[string]float64{ | |
150 | ".p50": 0.50, | |
151 | ".p90": 0.90, | |
152 | ".p95": 0.95, | |
153 | ".p99": 0.99, | |
154 | } { | |
155 | fields["value"] = h.Quantile(quantile) | |
156 | p, err := influxdb.NewPoint(name+suffix, h.tags, fields, now) | |
157 | if err != nil { | |
158 | return err | |
159 | } | |
160 | bp.AddPoint(p) | |
161 | } | |
162 | } | |
163 | ||
164 | return client.Write(bp) | |
165 | } | |
166 | ||
167 | func fieldsFrom(labelValues []string) map[string]interface{} { | |
168 | if len(labelValues)%2 != 0 { | |
169 | panic("fieldsFrom received a labelValues with an odd number of strings") | |
170 | } | |
171 | fields := make(map[string]interface{}, len(labelValues)/2) | |
172 | for i := 0; i < len(labelValues); i += 2 { | |
173 | fields[labelValues[i]] = labelValues[i+1] | |
174 | } | |
175 | return fields | |
176 | } | |
177 | ||
178 | // Counter is a generic counter, with static tags. | |
179 | type Counter struct { | |
180 | *generic.Counter | |
181 | tags map[string]string | |
182 | } | |
183 | ||
184 | func newCounter(tags map[string]string) *Counter { | |
185 | return &Counter{ | |
186 | Counter: generic.NewCounter(), | |
187 | tags: tags, | |
188 | } | |
189 | } | |
190 | ||
191 | // Gauge is a generic gauge, with static tags. | |
192 | type Gauge struct { | |
193 | *generic.Gauge | |
194 | tags map[string]string | |
195 | } | |
196 | ||
197 | func newGauge(tags map[string]string) *Gauge { | |
198 | return &Gauge{ | |
199 | Gauge: generic.NewGauge(), | |
200 | tags: tags, | |
201 | } | |
202 | } | |
203 | ||
204 | // Histogram is a generic histogram, with static tags. | |
205 | type Histogram struct { | |
206 | *generic.Histogram | |
207 | tags map[string]string | |
208 | } | |
209 | ||
210 | func newHistogram(tags map[string]string, buckets int) *Histogram { | |
211 | return &Histogram{ | |
212 | Histogram: generic.NewHistogram(buckets), | |
213 | tags: tags, | |
214 | } | |
215 | } |
0 | package metrics | |
1 | ||
2 | // Counter describes a metric that accumulates values monotonically. | |
3 | // An example of a counter is the number of received HTTP requests. | |
4 | type Counter interface { | |
5 | With(labelValues ...string) Counter | |
6 | Add(delta float64) | |
7 | } | |
8 | ||
9 | // Gauge describes a metric that takes a specific value over time. | |
10 | // An example of a gauge is the current depth of a job queue. | |
11 | type Gauge interface { | |
12 | With(labelValues ...string) Gauge | |
13 | Set(value float64) | |
14 | } | |
15 | ||
16 | // Histogram describes a metric that takes repeated observations of the same | |
17 | // kind of thing, and produces a statistical summary of those observations, | |
18 | // typically expressed as quantile buckets. An example of a histogram is HTTP | |
19 | // request latencies. | |
20 | type Histogram interface { | |
21 | With(labelValues ...string) Histogram | |
22 | Observe(value float64) | |
23 | } |
0 | // Package prometheus provides Prometheus implementations for metrics. | |
1 | // Individual metrics are mapped to their Prometheus counterparts, and | |
2 | // (depending on the constructor used) may be automatically registered in the | |
3 | // global Prometheus metrics registry. | |
4 | package prometheus | |
5 | ||
6 | import ( | |
7 | "github.com/prometheus/client_golang/prometheus" | |
8 | ||
9 | "github.com/go-kit/kit/metrics2" | |
10 | "github.com/go-kit/kit/metrics2/generic" | |
11 | ) | |
12 | ||
13 | // Counter implements Counter, via a Prometheus CounterVec. | |
14 | type Counter struct { | |
15 | cv *prometheus.CounterVec | |
16 | lv []string | |
17 | } | |
18 | ||
19 | // NewCounterFrom constructs and registers a Prometheus CounterVec, | |
20 | // and returns a usable Counter object. | |
21 | func NewCounterFrom(opts prometheus.CounterOpts, labelNames []string) *Counter { | |
22 | cv := prometheus.NewCounterVec(opts, labelNames) | |
23 | prometheus.MustRegister(cv) | |
24 | return NewCounter(cv) | |
25 | } | |
26 | ||
27 | // NewCounter wraps the CounterVec and returns a usable Counter object. | |
28 | func NewCounter(cv *prometheus.CounterVec) *Counter { | |
29 | return &Counter{ | |
30 | cv: cv, | |
31 | lv: []string{}, | |
32 | } | |
33 | } | |
34 | ||
35 | // With implements Counter. | |
36 | func (c *Counter) With(labelValues ...string) metrics.Counter { | |
37 | if len(labelValues)%2 != 0 { | |
38 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
39 | } | |
40 | return &Counter{ | |
41 | cv: c.cv, | |
42 | lv: append(c.lv, labelValues...), | |
43 | } | |
44 | } | |
45 | ||
46 | // Add implements Counter. | |
47 | func (c *Counter) Add(delta float64) { | |
48 | c.cv.WithLabelValues(c.lv...).Add(delta) | |
49 | } | |
50 | ||
51 | // Gauge implements Gauge, via a Prometheus GaugeVec. | |
52 | type Gauge struct { | |
53 | gv *prometheus.GaugeVec | |
54 | lv []string | |
55 | } | |
56 | ||
57 | // NewGaugeFrom construts and registers a Prometheus GaugeVec, | |
58 | // and returns a usable Gauge object. | |
59 | func NewGaugeFrom(opts prometheus.GaugeOpts, labelNames []string) *Gauge { | |
60 | gv := prometheus.NewGaugeVec(opts, labelNames) | |
61 | prometheus.MustRegister(gv) | |
62 | return NewGauge(gv) | |
63 | } | |
64 | ||
65 | // NewGauge wraps the GaugeVec and returns a usable Gauge object. | |
66 | func NewGauge(gv *prometheus.GaugeVec) *Gauge { | |
67 | return &Gauge{ | |
68 | gv: gv, | |
69 | lv: []string{}, | |
70 | } | |
71 | } | |
72 | ||
73 | // With implements Gauge. | |
74 | func (g *Gauge) With(labelValues ...string) metrics.Gauge { | |
75 | if len(labelValues)%2 != 0 { | |
76 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
77 | } | |
78 | return &Gauge{ | |
79 | gv: g.gv, | |
80 | lv: append(g.lv, labelValues...), | |
81 | } | |
82 | } | |
83 | ||
84 | // Set implements Gauge. | |
85 | func (g *Gauge) Set(value float64) { | |
86 | g.gv.WithLabelValues(g.lv...).Set(value) | |
87 | } | |
88 | ||
89 | // Add is supported by Prometheus GaugeVecs. | |
90 | func (g *Gauge) Add(delta float64) { | |
91 | g.gv.WithLabelValues(g.lv...).Add(delta) | |
92 | } | |
93 | ||
94 | // Summary implements Histogram, via a Prometheus SummaryVec. The difference | |
95 | // between a Summary and a Histogram is that Summaries don't require predefined | |
96 | // quantile buckets, but cannot be statistically aggregated. | |
97 | type Summary struct { | |
98 | sv *prometheus.SummaryVec | |
99 | lv []string | |
100 | } | |
101 | ||
102 | // NewSummaryFrom constructs and registers a Prometheus SummaryVec, | |
103 | // and returns a usable Summary object. | |
104 | func NewSummaryFrom(opts prometheus.SummaryOpts, labelNames []string) *Summary { | |
105 | sv := prometheus.NewSummaryVec(opts, labelNames) | |
106 | prometheus.MustRegister(sv) | |
107 | return NewSummary(sv) | |
108 | } | |
109 | ||
110 | // NewSummary wraps the SummaryVec and returns a usable Summary object. | |
111 | func NewSummary(sv *prometheus.SummaryVec) *Summary { | |
112 | return &Summary{ | |
113 | sv: sv, | |
114 | lv: []string{}, | |
115 | } | |
116 | } | |
117 | ||
118 | // With implements Histogram. | |
119 | func (s *Summary) With(labelValues ...string) metrics.Histogram { | |
120 | if len(labelValues)%2 != 0 { | |
121 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
122 | } | |
123 | return &Summary{ | |
124 | sv: s.sv, | |
125 | lv: append(s.lv, labelValues...), | |
126 | } | |
127 | } | |
128 | ||
129 | // Observe implements Histogram. | |
130 | func (s *Summary) Observe(value float64) { | |
131 | s.sv.WithLabelValues(s.lv...).Observe(value) | |
132 | } | |
133 | ||
134 | // Histogram implements Histogram via a Prometheus HistogramVec. The difference | |
135 | // between a Histogram and a Summary is that Histograms require predefined | |
136 | // quantile buckets, and can be statistically aggregated. | |
137 | type Histogram struct { | |
138 | hv *prometheus.HistogramVec | |
139 | lv []string | |
140 | } | |
141 | ||
142 | // NewHistogramFrom constructs and registers a Prometheus HistogramVec, | |
143 | // and returns a usable Histogram object. | |
144 | func NewHistogramFrom(opts prometheus.HistogramOpts, labelNames []string) *Histogram { | |
145 | hv := prometheus.NewHistogramVec(opts, labelNames) | |
146 | prometheus.MustRegister(hv) | |
147 | return NewHistogram(hv) | |
148 | } | |
149 | ||
150 | // NewHistogram wraps the HistogramVec and returns a usable Histogram object. | |
151 | func NewHistogram(hv *prometheus.HistogramVec) *Histogram { | |
152 | return &Histogram{ | |
153 | hv: hv, | |
154 | lv: []string{}, | |
155 | } | |
156 | } | |
157 | ||
158 | // With implements Histogram. | |
159 | func (h *Histogram) With(labelValues ...string) metrics.Histogram { | |
160 | if len(labelValues)%2 != 0 { | |
161 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
162 | } | |
163 | return &Histogram{ | |
164 | hv: h.hv, | |
165 | lv: append(h.lv, labelValues...), | |
166 | } | |
167 | } | |
168 | ||
169 | // Observe implements Histogram. | |
170 | func (h *Histogram) Observe(value float64) { | |
171 | h.hv.WithLabelValues(h.lv...).Observe(value) | |
172 | } |
0 | package provider | |
1 | ||
2 | import ( | |
3 | "github.com/go-kit/kit/metrics2" | |
4 | "github.com/go-kit/kit/metrics2/circonus" | |
5 | ) | |
6 | ||
7 | type circonusProvider struct { | |
8 | c *circonus.Circonus | |
9 | } | |
10 | ||
11 | // NewCirconusProvider takes the given Circonnus object and returns a Provider | |
12 | // that produces Circonus metrics. | |
13 | func NewCirconusProvider(c *circonus.Circonus) Provider { | |
14 | return &circonusProvider{ | |
15 | c: c, | |
16 | } | |
17 | } | |
18 | ||
19 | // NewCounter implements Provider. | |
20 | func (p *circonusProvider) NewCounter(name string) metrics.Counter { | |
21 | return p.c.NewCounter(name) | |
22 | } | |
23 | ||
24 | // NewGauge implements Provider. | |
25 | func (p *circonusProvider) NewGauge(name string) metrics.Gauge { | |
26 | return p.c.NewGauge(name) | |
27 | } | |
28 | ||
29 | // NewHistogram implements Provider. The buckets parameter is ignored. | |
30 | func (p *circonusProvider) NewHistogram(name string, _ int) metrics.Histogram { | |
31 | return p.c.NewHistogram(name) | |
32 | } | |
33 | ||
34 | // Stop implements Provider, but is a no-op. | |
35 | func (p *circonusProvider) Stop() {} |
0 | package provider | |
1 | ||
2 | import ( | |
3 | "github.com/go-kit/kit/metrics2" | |
4 | "github.com/go-kit/kit/metrics2/discard" | |
5 | ) | |
6 | ||
7 | type discardProvider struct{} | |
8 | ||
9 | // NewDiscardProvider returns a provider that produces no-op metrics via the | |
10 | // discarding backend. | |
11 | func NewDiscardProvider() Provider { return discardProvider{} } | |
12 | ||
13 | // NewCounter implements Provider. | |
14 | func (discardProvider) NewCounter(string) metrics.Counter { return discard.NewCounter() } | |
15 | ||
16 | // NewGauge implements Provider. | |
17 | func (discardProvider) NewGauge(string) metrics.Gauge { return discard.NewGauge() } | |
18 | ||
19 | // NewHistogram implements Provider. | |
20 | func (discardProvider) NewHistogram(string, int) metrics.Histogram { return discard.NewHistogram() } | |
21 | ||
22 | // Stop implements Provider. | |
23 | func (discardProvider) Stop() {} |
0 | package provider | |
1 | ||
2 | import ( | |
3 | "github.com/go-kit/kit/metrics2" | |
4 | "github.com/go-kit/kit/metrics2/dogstatsd" | |
5 | ) | |
6 | ||
7 | // Dogstatsd | |
8 | type dogstatsdProvider struct { | |
9 | d *dogstatsd.Dogstatsd | |
10 | stop func() | |
11 | } | |
12 | ||
13 | // NewDogstatsdProvider wraps the given Dogstatsd object and stop func and | |
14 | // returns a Provider that produces Dogstatsd metrics. | |
15 | func NewDogstatsdProvider(d *dogstatsd.Dogstatsd, stop func()) Provider { | |
16 | return &dogstatsdProvider{ | |
17 | d: d, | |
18 | stop: stop, | |
19 | } | |
20 | } | |
21 | ||
22 | // NewCounter implements Provider. | |
23 | func (p *dogstatsdProvider) NewCounter(name string) metrics.Counter { | |
24 | return p.d.NewCounter(name) | |
25 | } | |
26 | ||
27 | // NewGauge implements Provider. | |
28 | func (p *dogstatsdProvider) NewGauge(name string) metrics.Gauge { | |
29 | return p.d.NewGauge(name) | |
30 | } | |
31 | ||
32 | // NewHistogram implements Provider, returning a new Dogstatsd Histogram with a | |
33 | // sample rate of 1.0. Buckets are ignored. | |
34 | func (p *dogstatsdProvider) NewHistogram(name string, _ int) metrics.Histogram { | |
35 | return p.d.NewHistogram(name, 1.0) | |
36 | } | |
37 | ||
38 | // Stop implements Provider, invoking the stop function passed at construction. | |
39 | func (p *dogstatsdProvider) Stop() { | |
40 | p.stop() | |
41 | } |
0 | package provider | |
1 | ||
2 | import "github.com/go-kit/kit/metrics2" | |
3 | import "github.com/go-kit/kit/metrics2/expvar" | |
4 | ||
5 | type expvarProvider struct{} | |
6 | ||
7 | // NewExpvarProvider returns a Provider that produces expvar metrics. | |
8 | func NewExpvarProvider() Provider { | |
9 | return expvarProvider{} | |
10 | } | |
11 | ||
12 | // NewCounter implements Provider. | |
13 | func (p expvarProvider) NewCounter(name string) metrics.Counter { | |
14 | return expvar.NewCounter(name) | |
15 | } | |
16 | ||
17 | // NewGauge implements Provider. | |
18 | func (p expvarProvider) NewGauge(name string) metrics.Gauge { | |
19 | return expvar.NewGauge(name) | |
20 | } | |
21 | ||
22 | // NewHistogram implements Provider. | |
23 | func (p expvarProvider) NewHistogram(name string, buckets int) metrics.Histogram { | |
24 | return expvar.NewHistogram(name, buckets) | |
25 | } | |
26 | ||
27 | // Stop implements Provider, but is a no-op. | |
28 | func (p expvarProvider) Stop() {} |
0 | package provider | |
1 | ||
2 | import ( | |
3 | "github.com/go-kit/kit/metrics2" | |
4 | "github.com/go-kit/kit/metrics2/graphite" | |
5 | ) | |
6 | ||
7 | type graphiteProvider struct { | |
8 | g *graphite.Graphite | |
9 | stop func() | |
10 | } | |
11 | ||
12 | // NewGraphiteProvider wraps the given Graphite object and stop func and | |
13 | // returns a Provider that produces Graphite metrics. | |
14 | func NewGraphiteProvider(g *graphite.Graphite, stop func()) Provider { | |
15 | return &graphiteProvider{ | |
16 | g: g, | |
17 | stop: stop, | |
18 | } | |
19 | } | |
20 | ||
21 | // NewCounter implements Provider. | |
22 | func (p *graphiteProvider) NewCounter(name string) metrics.Counter { | |
23 | return p.g.NewCounter(name) | |
24 | } | |
25 | ||
26 | // NewGauge implements Provider. | |
27 | func (p *graphiteProvider) NewGauge(name string) metrics.Gauge { | |
28 | return p.g.NewGauge(name) | |
29 | } | |
30 | ||
31 | // NewHistogram implements Provider. | |
32 | func (p *graphiteProvider) NewHistogram(name string, buckets int) metrics.Histogram { | |
33 | return p.g.NewHistogram(name, buckets) | |
34 | } | |
35 | ||
36 | // Stop implements Provider, invoking the stop function passed at construction. | |
37 | func (p *graphiteProvider) Stop() { | |
38 | p.stop() | |
39 | } |
0 | package provider | |
1 | ||
2 | import ( | |
3 | "github.com/go-kit/kit/metrics2" | |
4 | "github.com/go-kit/kit/metrics2/influx" | |
5 | ) | |
6 | ||
7 | type influxProvider struct { | |
8 | i *influx.Influx | |
9 | stop func() | |
10 | } | |
11 | ||
12 | // NewInfluxProvider takes the given Influx object and stop func, and returns | |
13 | // a Provider that produces Influx metrics. | |
14 | func NewInfluxProvider(i *influx.Influx, stop func()) Provider { | |
15 | return &influxProvider{ | |
16 | i: i, | |
17 | stop: stop, | |
18 | } | |
19 | } | |
20 | ||
21 | // NewCounter implements Provider. Per-metric tags are not supported. | |
22 | func (p *influxProvider) NewCounter(name string) metrics.Counter { | |
23 | return p.i.NewCounter(name, map[string]string{}) | |
24 | } | |
25 | ||
26 | // NewGauge implements Provider. Per-metric tags are not supported. | |
27 | func (p *influxProvider) NewGauge(name string) metrics.Gauge { | |
28 | return p.i.NewGauge(name, map[string]string{}) | |
29 | } | |
30 | ||
31 | // NewHistogram implements Provider. Per-metric tags are not supported. | |
32 | func (p *influxProvider) NewHistogram(name string, buckets int) metrics.Histogram { | |
33 | return p.i.NewHistogram(name, map[string]string{}, buckets) | |
34 | } | |
35 | ||
36 | // Stop implements Provider, invoking the stop function passed at construction. | |
37 | func (p *influxProvider) Stop() { | |
38 | p.stop() | |
39 | } |
0 | package provider | |
1 | ||
2 | import ( | |
3 | stdprometheus "github.com/prometheus/client_golang/prometheus" | |
4 | ||
5 | "github.com/go-kit/kit/metrics2" | |
6 | "github.com/go-kit/kit/metrics2/prometheus" | |
7 | ) | |
8 | ||
9 | type prometheusProvider struct { | |
10 | namespace string | |
11 | subsystem string | |
12 | } | |
13 | ||
14 | // NewPrometheusProvider returns a Provider that produces Prometheus metrics. | |
15 | // Namespace and subsystem are applied to all produced metrics. | |
16 | func NewPrometheusProvider(namespace, subsystem string) Provider { | |
17 | return &prometheusProvider{ | |
18 | namespace: namespace, | |
19 | subsystem: subsystem, | |
20 | } | |
21 | } | |
22 | ||
23 | // NewCounter implements Provider via prometheus.NewCounterFrom, i.e. the | |
24 | // counter is registered. The metric's namespace and subsystem are taken from | |
25 | // the Provider. Help is not set, and no const label names are set. | |
26 | func (p *prometheusProvider) NewCounter(name string) metrics.Counter { | |
27 | return prometheus.NewCounterFrom(stdprometheus.CounterOpts{ | |
28 | Namespace: p.namespace, | |
29 | Subsystem: p.subsystem, | |
30 | Name: name, | |
31 | }, []string{}) | |
32 | } | |
33 | ||
34 | // NewGauge implements Provider via prometheus.NewGaugeFrom, i.e. the gauge is | |
35 | // registered. The metric's namespace and subsystem are taken from the Provider. | |
36 | // Help is not set, and no const label names are set. | |
37 | func (p *prometheusProvider) NewGauge(name string) metrics.Gauge { | |
38 | return prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ | |
39 | Namespace: p.namespace, | |
40 | Subsystem: p.subsystem, | |
41 | Name: name, | |
42 | }, []string{}) | |
43 | } | |
44 | ||
45 | // NewGauge implements Provider via prometheus.NewSummaryFrom, i.e. the summary | |
46 | // is registered. The metric's namespace and subsystem are taken from the | |
47 | // Provider. Help is not set, and no const label names are set. Buckets are ignored. | |
48 | func (p *prometheusProvider) NewHistogram(name string, _ int) metrics.Histogram { | |
49 | return prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ | |
50 | Namespace: p.namespace, | |
51 | Subsystem: p.subsystem, | |
52 | Name: name, | |
53 | }, []string{}) | |
54 | } | |
55 | ||
56 | // Stop implements Provider, but is a no-op. | |
57 | func (p *prometheusProvider) Stop() {} |
0 | // Package provider provides a factory-like abstraction for metrics backends. | |
1 | // This package is provided specifically for the needs of the NY Times framework | |
2 | // Gizmo. Most normal Go kit users shouldn't need to use it. | |
3 | // | |
4 | // Normally, if your microservice needs to support different metrics backends, | |
5 | // you can simply do different construction based on a flag. For example, | |
6 | // | |
7 | // var latency metrics.Histogram | |
8 | // var requests metrics.Counter | |
9 | // switch *metricsBackend { | |
10 | // case "prometheus": | |
11 | // latency = prometheus.NewSummaryVec(...) | |
12 | // requests = prometheus.NewCounterVec(...) | |
13 | // case "statsd": | |
14 | // statsd, stop := statsd.New(...) | |
15 | // defer stop() | |
16 | // latency = statsd.NewHistogram(...) | |
17 | // requests = statsd.NewCounter(...) | |
18 | // default: | |
19 | // log.Fatal("unsupported metrics backend %q", *metricsBackend) | |
20 | // } | |
21 | // | |
22 | package provider | |
23 | ||
24 | import ( | |
25 | "github.com/go-kit/kit/metrics2" | |
26 | ) | |
27 | ||
28 | // Provider abstracts over constructors and lifecycle management functions for | |
29 | // each supported metrics backend. It should only be used by those who need to | |
30 | // swap out implementations, e.g. dynamically, or at a single point in an | |
31 | // intermediating framework. | |
32 | // | |
33 | // This type is primarily useful for intermediating frameworks, and is likely | |
34 | // unnecessary for most Go kit services. See the package-level doc comment for | |
35 | // more typical usage instructions. | |
36 | type Provider interface { | |
37 | NewCounter(name string) metrics.Counter | |
38 | NewGauge(name string) metrics.Gauge | |
39 | NewHistogram(name string, buckets int) metrics.Histogram | |
40 | Stop() | |
41 | } |
0 | package provider | |
1 | ||
2 | import ( | |
3 | "time" | |
4 | ||
5 | "github.com/go-kit/kit/metrics2" | |
6 | "github.com/go-kit/kit/metrics2/statsd" | |
7 | ) | |
8 | ||
9 | type statsdProvider struct { | |
10 | s *statsd.Statsd | |
11 | stop func() | |
12 | } | |
13 | ||
14 | // NewStatsdProvider wraps the given Statsd object and stop func and returns a | |
15 | // Provider that produces Statsd metrics. | |
16 | func NewStatsdProvider(s *statsd.Statsd, stop func()) Provider { | |
17 | return &statsdProvider{ | |
18 | s: s, | |
19 | stop: stop, | |
20 | } | |
21 | } | |
22 | ||
23 | // NewCounter implements Provider. | |
24 | func (p *statsdProvider) NewCounter(name string) metrics.Counter { | |
25 | return p.s.NewCounter(name) | |
26 | } | |
27 | ||
28 | // NewGauge implements Provider. | |
29 | func (p *statsdProvider) NewGauge(name string) metrics.Gauge { | |
30 | return p.s.NewGauge(name) | |
31 | } | |
32 | ||
33 | // NewHistogram implements Provider, returning a Histogram that accepts | |
34 | // observations in seconds, and reports observations to Statsd in milliseconds. | |
35 | // The sample rate is fixed at 1.0. The bucket parameter is ignored. | |
36 | func (p *statsdProvider) NewHistogram(name string, _ int) metrics.Histogram { | |
37 | return p.s.MustNewHistogram(name, time.Second, time.Millisecond, 1.0) | |
38 | } | |
39 | ||
40 | // Stop implements Provider, invoking the stop function passed at construction. | |
41 | func (p *statsdProvider) Stop() { | |
42 | p.stop() | |
43 | } |
0 | // Package statsd implements a statsd backend for package metrics. Metrics are | |
1 | // aggregated and reported in batches, in the StatsD plaintext format. Sampling | |
2 | // is not supported for counters because we aggregate counter updates and send | |
3 | // in batches. Sampling is, however, supported for Timings. | |
4 | // | |
5 | // Batching observations and emitting every few seconds is useful even if you | |
6 | // connect to your StatsD server over UDP. Emitting one network packet per | |
7 | // observation can quickly overwhelm even the fastest internal network. Batching | |
8 | // allows you to more linearly scale with growth. | |
9 | // | |
10 | // Typically you'll create a Statsd object in your main function. | |
11 | // | |
12 | // s, stop := New("myprefix.", "udp", "statsd:8126", time.Second, log.NewNopLogger()) | |
13 | // defer stop() | |
14 | // | |
15 | // Then, create the metrics that your application will track from that object. | |
16 | // Pass them as dependencies to the component that needs them; don't place them | |
17 | // in the global scope. | |
18 | // | |
19 | // requests := s.NewCounter("requests") | |
20 | // depth := s.NewGauge("queue_depth") | |
21 | // fooLatency := s.NewTiming("foo_duration", "ms", 1.0) | |
22 | // barLatency := s.MustNewHistogram("bar_duration", time.Second, time.Millisecond, 1.0) | |
23 | // | |
24 | // Invoke them in your components when you have something to instrument. | |
25 | // | |
26 | // requests.Add(1) | |
27 | // depth.Set(123) | |
28 | // fooLatency.Observe(16) // 16 ms | |
29 | // barLatency.Observe(0.032) // 0.032 sec = 32 ms | |
30 | // | |
31 | package statsd | |
32 | ||
33 | import ( | |
34 | "errors" | |
35 | "fmt" | |
36 | "io" | |
37 | "math/rand" | |
38 | "sync" | |
39 | "time" | |
40 | ||
41 | "github.com/go-kit/kit/log" | |
42 | "github.com/go-kit/kit/metrics2" | |
43 | "github.com/go-kit/kit/metrics2/generic" | |
44 | "github.com/go-kit/kit/util/conn" | |
45 | ) | |
46 | ||
47 | // Statsd is a store for metrics that will be reported to a StatsD server. | |
48 | // Create a Statsd object, use it to create metrics objects, and pass those | |
49 | // objects as dependencies to the components that will use them. | |
50 | // | |
51 | // StatsD has a concept of Timings rather than Histograms. You can create Timing | |
52 | // objects, or create Histograms that wrap Timings under the hood. | |
53 | type Statsd struct { | |
54 | mtx sync.RWMutex | |
55 | prefix string | |
56 | counters map[string]*generic.Counter | |
57 | gauges map[string]*generic.Gauge | |
58 | timings map[string]*Timing | |
59 | logger log.Logger | |
60 | } | |
61 | ||
62 | // New creates a Statsd object that flushes all metrics in the statsd format | |
63 | // every flushInterval to the network and address. Internally it utilizes a | |
64 | // util/conn.Manager and time.Ticker. Use the returned stop function to stop the | |
65 | // ticker and terminate the flushing goroutine. | |
66 | func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Statsd, stop func()) { | |
67 | s := NewRaw(prefix, logger) | |
68 | manager := conn.NewDefaultManager(network, address, logger) | |
69 | ticker := time.NewTicker(flushInterval) | |
70 | go s.FlushTo(manager, ticker) | |
71 | return s, ticker.Stop | |
72 | } | |
73 | ||
74 | // NewRaw creates a Statsd object. By default the metrics will not be emitted | |
75 | // anywhere. Use WriteTo to flush the metrics once, or FlushTo (in a separate | |
76 | // goroutine) to flush them on a regular schedule, or use the New constructor to | |
77 | // set up the object and flushing at the same time. | |
78 | func NewRaw(prefix string, logger log.Logger) *Statsd { | |
79 | return &Statsd{ | |
80 | prefix: prefix, | |
81 | counters: map[string]*generic.Counter{}, | |
82 | gauges: map[string]*generic.Gauge{}, | |
83 | timings: map[string]*Timing{}, | |
84 | logger: logger, | |
85 | } | |
86 | } | |
87 | ||
88 | // NewCounter returns a counter metric with the given name. Adds are buffered | |
89 | // until the underlying Statsd object is flushed. | |
90 | func (s *Statsd) NewCounter(name string) *generic.Counter { | |
91 | s.mtx.Lock() | |
92 | defer s.mtx.Unlock() | |
93 | c := generic.NewCounter() | |
94 | s.counters[s.prefix+name] = c | |
95 | return c | |
96 | } | |
97 | ||
98 | // NewGauge returns a gauge metric with the given name. | |
99 | func (s *Statsd) NewGauge(name string) *generic.Gauge { | |
100 | s.mtx.Lock() | |
101 | defer s.mtx.Unlock() | |
102 | g := generic.NewGauge() | |
103 | s.gauges[s.prefix+name] = g | |
104 | return g | |
105 | } | |
106 | ||
107 | // NewTiming returns a timing metric with the given name, unit (e.g. "ms") and | |
108 | // sample rate. Pass a sample rate of 1.0 or greater to disable sampling. | |
109 | // Sampling is done at observation time. Observations are buffered until the | |
110 | // underlying statsd object is flushed. | |
111 | func (s *Statsd) NewTiming(name, unit string, sampleRate float64) *Timing { | |
112 | s.mtx.Lock() | |
113 | defer s.mtx.Unlock() | |
114 | t := NewTiming(unit, sampleRate) | |
115 | s.timings[s.prefix+name] = t | |
116 | return t | |
117 | } | |
118 | ||
119 | // NewHistogram returns a histogram metric with the given name. Observations are | |
120 | // assumed to be taken in units of observeIn, e.g. time.Second. The histogram | |
121 | // wraps a timing which reports in units of reportIn, e.g. time.Millisecond. | |
122 | // Only nanoseconds, microseconds, milliseconds, and seconds are supported | |
123 | // reportIn values. The underlying timing is sampled according to sampleRate. | |
124 | func (s *Statsd) NewHistogram(name string, observeIn, reportIn time.Duration, sampleRate float64) (metrics.Histogram, error) { | |
125 | s.mtx.Lock() | |
126 | defer s.mtx.Unlock() | |
127 | ||
128 | var unit string | |
129 | switch reportIn { | |
130 | case time.Nanosecond: | |
131 | unit = "ns" | |
132 | case time.Microsecond: | |
133 | unit = "us" | |
134 | case time.Millisecond: | |
135 | unit = "ms" | |
136 | case time.Second: | |
137 | unit = "s" | |
138 | default: | |
139 | return nil, errors.New("unsupported reporting duration") | |
140 | } | |
141 | ||
142 | t := NewTiming(unit, sampleRate) | |
143 | s.timings[s.prefix+name] = t | |
144 | return newHistogram(observeIn, reportIn, t), nil | |
145 | } | |
146 | ||
147 | // MustNewHistogram is a convenience constructor for NewHistogram, which panics | |
148 | // if there is an error. | |
149 | func (s *Statsd) MustNewHistogram(name string, observeIn, reportIn time.Duration, sampleRate float64) metrics.Histogram { | |
150 | h, err := s.NewHistogram(name, observeIn, reportIn, sampleRate) | |
151 | if err != nil { | |
152 | panic(err) | |
153 | } | |
154 | return h | |
155 | } | |
156 | ||
157 | // FlushTo invokes WriteTo to the writer every time the ticker fires. FlushTo | |
158 | // blocks until the ticker is stopped. Most users won't need to call this method | |
159 | // directly, and should prefer to use the New constructor. | |
160 | func (s *Statsd) FlushTo(w io.Writer, ticker *time.Ticker) { | |
161 | for range ticker.C { | |
162 | if _, err := s.WriteTo(w); err != nil { | |
163 | s.logger.Log("during", "flush", "err", err) | |
164 | } | |
165 | } | |
166 | } | |
167 | ||
168 | // WriteTo dumps the current state of all of the metrics to the given writer in | |
169 | // the statsd format. Each metric has its current value(s) written in sequential | |
170 | // calls to Write. Counters and gauges are dumped with their current values; | |
171 | // counters are reset. Timings write each retained observation in sequence, and | |
172 | // are reset. Clients probably shouldn't invoke this method directly, and should | |
173 | // prefer using FlushTo, or the New constructor. | |
174 | func (s *Statsd) WriteTo(w io.Writer) (int64, error) { | |
175 | s.mtx.RLock() | |
176 | defer s.mtx.RUnlock() | |
177 | var ( | |
178 | n int | |
179 | err error | |
180 | count int64 | |
181 | ) | |
182 | for name, c := range s.counters { | |
183 | n, err = fmt.Fprintf(w, "%s:%f|c\n", name, c.ValueReset()) | |
184 | count += int64(n) | |
185 | if err != nil { | |
186 | return count, err | |
187 | } | |
188 | } | |
189 | for name, g := range s.gauges { | |
190 | n, err = fmt.Fprintf(w, "%s:%f|g\n", name, g.Value()) | |
191 | count += int64(n) | |
192 | if err != nil { | |
193 | return count, err | |
194 | } | |
195 | } | |
196 | for name, t := range s.timings { | |
197 | var sampling string | |
198 | if r := t.SampleRate(); r < 1.0 { | |
199 | sampling = fmt.Sprintf("|@%f", r) | |
200 | } | |
201 | for _, value := range t.Values() { | |
202 | n, err = fmt.Fprintf(w, "%s:%d|%s%s\n", name, value, t.Unit(), sampling) | |
203 | count += int64(n) | |
204 | if err != nil { | |
205 | return count, err | |
206 | } | |
207 | } | |
208 | } | |
209 | return count, nil | |
210 | } | |
211 | ||
212 | // Timing is like a histogram that's always assumed to represent time. It also | |
213 | // has a different implementation to typical histograms in this package. StatsD | |
214 | // expects you to emit each observation to the aggregation server, and they do | |
215 | // statistical processing there. This is easier to understand, but much (much) | |
216 | // less efficient. So, we batch observations and emit the batch every interval. | |
217 | // And we support sampling, at observation time. | |
218 | type Timing struct { | |
219 | mtx sync.Mutex | |
220 | unit string | |
221 | sampleRate float64 | |
222 | values []int64 | |
223 | lvs []string // immutable | |
224 | } | |
225 | ||
226 | // NewTiming returns a new Timing object with the given units (e.g. "ms") and | |
227 | // sample rate. If sample rate >= 1.0, no sampling will be performed. This | |
228 | // function is exported only so that it can be used by package dogstatsd. As a | |
229 | // user, if you want a timing, you probably want to create it through the Statsd | |
230 | // object. | |
231 | func NewTiming(unit string, sampleRate float64) *Timing { | |
232 | return &Timing{ | |
233 | unit: unit, | |
234 | sampleRate: sampleRate, | |
235 | } | |
236 | } | |
237 | ||
238 | // With returns a new timing with the label values applies. Label values aren't | |
239 | // supported in Statsd, but they are supported in DogStatsD, which uses the same | |
240 | // Timing type. | |
241 | func (t *Timing) With(labelValues ...string) *Timing { | |
242 | if len(labelValues)%2 != 0 { | |
243 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
244 | } | |
245 | return &Timing{ | |
246 | unit: t.unit, | |
247 | sampleRate: t.sampleRate, | |
248 | values: t.values, | |
249 | lvs: append(t.lvs, labelValues...), | |
250 | } | |
251 | } | |
252 | ||
253 | // LabelValues returns the current set of label values. Label values aren't | |
254 | // supported in Statsd, but they are supported in DogStatsD, which uses the same | |
255 | // Timing type. | |
256 | func (t *Timing) LabelValues() []string { | |
257 | return t.lvs | |
258 | } | |
259 | ||
260 | // Observe collects the value into the timing. If sample rate is less than 1.0, | |
261 | // sampling is performed, and the value may be dropped. | |
262 | func (t *Timing) Observe(value int64) { | |
263 | // Here we sample at observation time. This burns not-insignificant CPU in | |
264 | // the rand.Float64 call. It may be preferable to aggregate all observations | |
265 | // and sample at emission time. But that is a bit tricker to do correctly. | |
266 | if t.sampleRate < 1.0 && rand.Float64() > t.sampleRate { | |
267 | return | |
268 | } | |
269 | ||
270 | t.mtx.Lock() | |
271 | defer t.mtx.Unlock() | |
272 | t.values = append(t.values, value) | |
273 | } | |
274 | ||
275 | // Values returns the observed values since the last call to Values. This method | |
276 | // clears the internal state of the Timing; better get those values somewhere | |
277 | // safe! | |
278 | func (t *Timing) Values() []int64 { | |
279 | t.mtx.Lock() | |
280 | defer t.mtx.Unlock() | |
281 | res := t.values | |
282 | t.values = []int64{} // TODO(pb): if GC is a problem, this can be improved | |
283 | return res | |
284 | } | |
285 | ||
286 | // Unit returns the unit, e.g. "ms". | |
287 | func (t *Timing) Unit() string { return t.unit } | |
288 | ||
289 | // SampleRate returns the sample rate, e.g. 0.01 or 1.0. | |
290 | func (t *Timing) SampleRate() float64 { return t.sampleRate } | |
291 | ||
292 | // histogram wraps a Timing and implements Histogram. Namely, it takes float64 | |
293 | // observations and converts them to int64 according to a defined ratio, likely | |
294 | // with a loss of precision. | |
295 | type histogram struct { | |
296 | m float64 | |
297 | t *Timing | |
298 | lvs []string | |
299 | } | |
300 | ||
301 | func newHistogram(observeIn, reportIn time.Duration, t *Timing) *histogram { | |
302 | return &histogram{ | |
303 | m: float64(observeIn) / float64(reportIn), | |
304 | t: t, | |
305 | } | |
306 | } | |
307 | ||
308 | func (h *histogram) With(labelValues ...string) metrics.Histogram { | |
309 | if len(labelValues)%2 != 0 { | |
310 | labelValues = append(labelValues, generic.LabelValueUnknown) | |
311 | } | |
312 | return &histogram{ | |
313 | m: h.m, | |
314 | t: h.t, | |
315 | lvs: append(h.lvs, labelValues...), | |
316 | } | |
317 | } | |
318 | ||
319 | func (h *histogram) Observe(value float64) { | |
320 | h.t.Observe(int64(h.m * value)) | |
321 | } |
0 | package statsd | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | "time" | |
5 | ) | |
6 | ||
7 | func TestHistogramAdapter(t *testing.T) { | |
8 | for _, testcase := range []struct { | |
9 | observeIn time.Duration | |
10 | reportIn time.Duration | |
11 | unit string | |
12 | input float64 | |
13 | want int64 | |
14 | }{ | |
15 | {time.Second, time.Second, "s", 0.10, 0}, | |
16 | {time.Second, time.Second, "s", 1.01, 1}, | |
17 | {time.Second, time.Millisecond, "ms", 1.23, 1230}, | |
18 | {time.Millisecond, time.Microsecond, "us", 123, 123000}, | |
19 | } { | |
20 | tm := NewTiming(testcase.unit, 1.0) | |
21 | h := newHistogram(testcase.observeIn, testcase.reportIn, tm) | |
22 | h.Observe(testcase.input) | |
23 | if want, have := testcase.want, tm.Values()[0]; want != have { | |
24 | t.Errorf("Observe(%.2f %s): want %d, have %d", testcase.input, testcase.unit, want, have) | |
25 | } | |
26 | } | |
27 | } |
0 | 0 | package conn |
1 | 1 | |
2 | 2 | import ( |
3 | "errors" | |
3 | 4 | "net" |
4 | 5 | "time" |
5 | 6 | |
33 | 34 | |
34 | 35 | // NewManager returns a connection manager using the passed Dialer, network, and |
35 | 36 | // address. The AfterFunc is used to control exponential backoff and retries. |
36 | // For normal use, pass net.Dial and time.After as the Dialer and AfterFunc | |
37 | // respectively. The logger is used to log errors; pass a log.NopLogger if you | |
38 | // don't care to receive them. | |
37 | // The logger is used to log errors; pass a log.NopLogger if you don't care to | |
38 | // receive them. For normal use, prefer NewDefaultManager. | |
39 | 39 | func NewManager(d Dialer, network, address string, after AfterFunc, logger log.Logger) *Manager { |
40 | 40 | m := &Manager{ |
41 | 41 | dialer: d, |
51 | 51 | return m |
52 | 52 | } |
53 | 53 | |
54 | // NewDefaultManager is a helper constructor, suitable for most normal use in | |
55 | // real (non-test) code. It uses the real net.Dial and time.After functions. | |
56 | func NewDefaultManager(network, address string, logger log.Logger) *Manager { | |
57 | return NewManager(net.Dial, network, address, time.After, logger) | |
58 | } | |
59 | ||
54 | 60 | // Take yields the current connection. It may be nil. |
55 | 61 | func (m *Manager) Take() net.Conn { |
56 | 62 | return <-m.takec |
61 | 67 | // to reconnect, with exponential backoff. Putting a nil error is a no-op. |
62 | 68 | func (m *Manager) Put(err error) { |
63 | 69 | m.putc <- err |
70 | } | |
71 | ||
72 | // Write writes the passed data to the connection in a single Take/Put cycle. | |
73 | func (m *Manager) Write(b []byte) (int, error) { | |
74 | conn := m.Take() | |
75 | if conn == nil { | |
76 | return 0, ErrConnectionUnavailable | |
77 | } | |
78 | n, err := conn.Write(b) | |
79 | defer m.Put(err) | |
80 | return n, err | |
64 | 81 | } |
65 | 82 | |
66 | 83 | func (m *Manager) loop() { |
121 | 138 | } |
122 | 139 | return d |
123 | 140 | } |
141 | ||
142 | // ErrConnectionUnavailable is returned by the Manager's Write method when the | |
143 | // manager cannot yield a good connection. | |
144 | var ErrConnectionUnavailable = errors.New("connection unavailable") |