Codebase list golang-github-go-kit-kit / e9da08e
adding metrics/graphite package JP Robinson 7 years ago
3 changed file(s) with 437 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 // Package graphite implements a graphite backend for package metrics.
1 //
2 // The current implementation ignores fields.
3 package graphite
4
5 import (
6 "bufio"
7 "fmt"
8 "io"
9 "log"
10 "math"
11 "net"
12 "sort"
13 "sync"
14 "time"
15
16 "sync/atomic"
17
18 "github.com/codahale/hdrhistogram"
19 "github.com/go-kit/kit/metrics"
20 )
21
22 // Emitter will keep track of all metrics and, once started,
23 // will emit the metrics via the Flush method to the given io.Writer.
24 type Emitter interface {
25 NewCounter(string) metrics.Counter
26 NewHistogram(string, int64, int64, int, ...int) metrics.Histogram
27 NewTimeHistogram(string, time.Duration, int64, int64, int, ...int) metrics.TimeHistogram
28 NewGauge(string) metrics.Gauge
29
30 Start(time.Duration)
31 Flush() error
32 }
33
34 type emitter struct {
35 addr *net.TCPAddr
36 prefix string
37
38 metricMu *sync.Mutex
39 counters []*counter
40 histograms []*windowedHistogram
41 gauges []*gauge
42 }
43
44 // NewEmitter will return an Emitter that will prefix all
45 // metrics names with the given prefix. Once started, it will attempt to create
46 // a TCP connection with the given address and most metrics to the connection
47 // in a Graphite-compatible format.
48 func NewEmitter(addr *net.TCPAddr, prefix string) Emitter {
49 e := &emitter{
50 addr, prefix, &sync.Mutex{},
51 []*counter{}, []*windowedHistogram{}, []*gauge{},
52 }
53
54 return e
55 }
56
57 // NewCounter returns a Counter whose value will be periodically emitted in
58 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
59 func (e *emitter) NewCounter(name string) metrics.Counter {
60 // only one flush at a time
61 e.metricMu.Lock()
62 defer e.metricMu.Unlock()
63 c := &counter{name, 0}
64 e.counters = append(e.counters, c)
65 return c
66 }
67
68 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
69 // windowed HDR histogram which drops data older than five minutes.
70 //
71 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
72 // should be integers in the range 1..99. The gauge names are assigned by
73 // using the passed name as a prefix and appending "_pNN" e.g. "_p50".
74 //
75 // The values of this histogram will be periodically emitted in a Graphite-compatible
76 // format once the Emitter is started. Fields are ignored.
77 func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
78 // only one flush at a time
79 e.metricMu.Lock()
80 defer e.metricMu.Unlock()
81
82 gauges := map[int]metrics.Gauge{}
83 for _, quantile := range quantiles {
84 if quantile <= 0 || quantile >= 100 {
85 panic(fmt.Sprintf("invalid quantile %d", quantile))
86 }
87 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
88 }
89 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges)
90 e.histograms = append(e.histograms, h)
91 return h
92 }
93
94 // NewTimeHistogram returns a TimeHistogram wrapper around the windowed
95 // HDR histrogram provided by this package.
96 func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.TimeHistogram {
97 h := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
98 return metrics.NewTimeHistogram(unit, h)
99 }
100
101 // NewGauge returns a Gauge whose value will be periodically emitted in
102 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
103 func (e *emitter) NewGauge(name string) metrics.Gauge {
104 // only one flush at a time
105 e.metricMu.Lock()
106 defer e.metricMu.Unlock()
107 return e.gauge(name)
108 }
109
110 func (e *emitter) gauge(name string) metrics.Gauge {
111 g := &gauge{name, 0}
112 e.gauges = append(e.gauges, g)
113 return g
114 }
115
116 // Start will kick off a background goroutine to
117 // call Flush once every interval.
118 func (e *emitter) Start(interval time.Duration) {
119 go func() {
120 t := time.Tick(interval)
121 for range t {
122 err := e.Flush()
123 if err != nil {
124 log.Print("error: could not dial graphite host: ", err)
125 continue
126 }
127 }
128 }()
129 }
130
131 // Flush will attempt to create a connection with the given address
132 // and write the current metrics to it in a Graphite-compatible format.
133 //
134 // Users can call this method on process shutdown to ensure
135 // the current metrics are pushed to Graphite.
136 func (e *emitter) Flush() error {
137 // open connection
138 conn, err := net.DialTCP("tcp", nil, e.addr)
139 if err != nil {
140 return err
141 }
142
143 // flush stats to connection
144 e.flush(conn)
145
146 // close connection
147 conn.Close()
148 return nil
149 }
150
151 func (e *emitter) flush(conn io.Writer) {
152 // only one flush at a time
153 e.metricMu.Lock()
154 defer e.metricMu.Unlock()
155
156 // buffer the writer and make sure to flush it
157 w := bufio.NewWriter(conn)
158 defer w.Flush()
159
160 now := time.Now().Unix()
161
162 // emit counter stats
163 for _, c := range e.counters {
164 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, now)
165 }
166
167 // emit histogram specific stats
168 for _, h := range e.histograms {
169 hist := h.hist.Merge()
170 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, h.Name(), hist.TotalCount(), now)
171 fmt.Fprintf(w, "%s.%s.min %d %d\n", e.prefix, h.Name(), hist.Min(), now)
172 fmt.Fprintf(w, "%s.%s.max %d %d\n", e.prefix, h.Name(), hist.Max(), now)
173 fmt.Fprintf(w, "%s.%s.mean %.2f %d\n", e.prefix, h.Name(), hist.Mean(), now)
174 fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", e.prefix, h.Name(), hist.StdDev(), now)
175 }
176
177 // emit gauge stats (which can include some histogram quantiles)
178 for _, g := range e.gauges {
179 fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), now)
180 }
181 }
182
183 type counter struct {
184 key string
185 count uint64
186 }
187
188 func (c *counter) Name() string { return c.key }
189
190 func (c *counter) With(metrics.Field) metrics.Counter { return c }
191
192 func (c *counter) Add(delta uint64) { atomic.AddUint64(&c.count, delta) }
193
194 type gauge struct {
195 key string
196 value uint64 // math.Float64bits
197 }
198
199 func (g *gauge) Name() string { return g.key }
200
201 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
202
203 func (g *gauge) Add(delta float64) {
204 for {
205 old := atomic.LoadUint64(&g.value)
206 new := math.Float64bits(math.Float64frombits(old) + delta)
207 if atomic.CompareAndSwapUint64(&g.value, old, new) {
208 return
209 }
210 }
211 }
212
213 func (g *gauge) Set(value float64) {
214 atomic.StoreUint64(&g.value, math.Float64bits(value))
215 }
216
217 func (g *gauge) Get() float64 {
218 return math.Float64frombits(atomic.LoadUint64(&g.value))
219 }
220
221 type windowedHistogram struct {
222 mu sync.Mutex
223 hist *hdrhistogram.WindowedHistogram
224
225 name string
226 gauges map[int]metrics.Gauge
227 }
228
229 // NewWindowedHistogram is taken from http://github.com/codahale/metrics. It returns a
230 // windowed HDR histogram which drops data older than five minutes.
231 //
232 // The histogram exposes metrics for each passed quantile as gauges. Users are expected
233 // to provide their own set of Gauges for quantiles to make this Histogram work across multiple
234 // metrics providers.
235 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge) *windowedHistogram {
236 h := &windowedHistogram{
237 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
238 name: name,
239 gauges: quantiles,
240 }
241 go h.rotateLoop(1 * time.Minute)
242 return h
243 }
244
245 func (h *windowedHistogram) Name() string { return h.name }
246 func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }
247
248 func (h *windowedHistogram) Observe(value int64) {
249 h.mu.Lock()
250 err := h.hist.Current.RecordValue(value)
251 h.mu.Unlock()
252
253 if err != nil {
254 panic(err.Error())
255 }
256
257 for q, gauge := range h.gauges {
258 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
259 }
260 }
261
262 func (h *windowedHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
263 bars := h.hist.Merge().Distribution()
264 buckets := make([]metrics.Bucket, len(bars))
265 for i, bar := range bars {
266 buckets[i] = metrics.Bucket{
267 From: bar.From,
268 To: bar.To,
269 Count: bar.Count,
270 }
271 }
272 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
273 for quantile, gauge := range h.gauges {
274 quantiles = append(quantiles, metrics.Quantile{
275 Quantile: quantile,
276 Value: int64(gauge.Get()),
277 })
278 }
279 sort.Sort(quantileSlice(quantiles))
280 return buckets, quantiles
281 }
282
283 func (h *windowedHistogram) rotateLoop(d time.Duration) {
284 for range time.Tick(d) {
285 h.mu.Lock()
286 h.hist.Rotate()
287 h.mu.Unlock()
288 }
289 }
290
291 type quantileSlice []metrics.Quantile
292
293 func (a quantileSlice) Len() int { return len(a) }
294 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
295 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
0 package graphite
1
2 import (
3 "bytes"
4 "fmt"
5 "strings"
6 "testing"
7
8 "github.com/go-kit/kit/metrics"
9 "github.com/go-kit/kit/metrics/teststat"
10 )
11
12 func TestHistogramQuantiles(t *testing.T) {
13 prefix := "prefix"
14 e := NewEmitter(nil, prefix)
15 var (
16 name = "test_histogram_quantiles"
17 quantiles = []int{50, 90, 95, 99}
18 h = e.NewHistogram(name, 0, 100, 3, quantiles...).With(metrics.Field{Key: "ignored", Value: "field"})
19 )
20 const seed, mean, stdev int64 = 424242, 50, 10
21 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
22
23 // flush the current metrics into a buffer to examine
24 var b bytes.Buffer
25 e.(*emitter).flush(&b)
26 teststat.AssertGraphiteNormalHistogram(t, prefix, name, mean, stdev, quantiles, b.String())
27 }
28
29 func TestCounter(t *testing.T) {
30 var (
31 prefix = "prefix"
32 name = "m"
33 value = 123
34 e = NewEmitter(nil, prefix)
35 b bytes.Buffer
36 )
37 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
38 e.(*emitter).flush(&b)
39 want := fmt.Sprintf("%s.%s.count %d", prefix, name, value)
40 payload := b.String()
41 if !strings.HasPrefix(payload, want) {
42 t.Errorf("counter %s want\n%s, have\n%s", name, want, payload)
43 }
44 }
45
46 func TestGauge(t *testing.T) {
47 var (
48 prefix = "prefix"
49 name = "xyz"
50 value = 54321
51 delta = 12345
52 e = NewEmitter(nil, prefix)
53 b bytes.Buffer
54 g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
55 )
56
57 g.Set(float64(value))
58 g.Add(float64(delta))
59
60 e.(*emitter).flush(&b)
61 payload := b.String()
62
63 want := fmt.Sprintf("%s.%s %d", prefix, name, value+delta)
64 if !strings.HasPrefix(payload, want) {
65 t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload)
66 }
67 }
68
69 func TestInvalidQuantile(t *testing.T) {
70 e := NewEmitter(nil, "")
71 defer func() {
72 if err := recover(); err == nil {
73 t.Errorf("expected panic, got none")
74 } else {
75 t.Logf("got expected panic: %v", err)
76 }
77 }()
78 e.NewHistogram("foo", 0.0, 100.0, 3, 50, 90, 95, 99, 101)
79 }
0 package teststat
1
2 import (
3 "fmt"
4 "math"
5 "regexp"
6 "strconv"
7 "testing"
8 )
9
10 // AssertGraphiteNormalHistogram ensures the expvar Histogram referenced by
11 // metricName abides a normal distribution.
12 func AssertGraphiteNormalHistogram(t *testing.T, prefix, metricName string, mean, stdev int64, quantiles []int, gPayload string) {
13 const tolerance int = 2
14
15 // check for hdr histo data
16 wants := map[string]int64{"count": 1234, "min": 15, "max": 83, "std-dev": stdev, "mean": mean}
17 for key, want := range wants {
18 re := regexp.MustCompile(fmt.Sprintf("%s.%s.%s (\\d*)", prefix, metricName, key))
19 if res := re.FindAllStringSubmatch(gPayload, 1); res != nil {
20 if len(res[0]) == 1 {
21 t.Errorf("bad regex found, please check the test scenario")
22 continue
23 }
24
25 have, err := strconv.ParseInt(res[0][1], 10, 64)
26 if err != nil {
27 t.Fatal(err)
28 }
29
30 if int(math.Abs(float64(want-have))) > tolerance {
31 t.Errorf("key %s: want %d, have %d", key, want, have)
32 }
33 } else {
34 t.Error("did not find metrics log for", key, "in \n", gPayload)
35 }
36 }
37
38 // check for quantile gauges
39 for _, quantile := range quantiles {
40 want := normalValueAtQuantile(mean, stdev, quantile)
41
42 re := regexp.MustCompile(fmt.Sprintf("%s.%s_p%02d (\\d*\\.\\d*)", prefix, metricName, quantile))
43 if res := re.FindAllStringSubmatch(gPayload, 1); res != nil {
44 if len(res[0]) == 1 {
45 t.Errorf("bad regex found, please check the test scenario")
46 continue
47 }
48 have, err := strconv.ParseFloat(res[0][1], 64)
49 if err != nil {
50 t.Fatal(err)
51 }
52 if int(math.Abs(float64(want)-have)) > tolerance {
53 t.Errorf("quantile %d: want %.2f, have %.2f", quantile, want, have)
54 }
55 } else {
56 t.Errorf("did not find metrics log for %d", quantile)
57 }
58
59 }
60 }