Codebase list golang-github-go-kit-kit / f5b2f6c
Add DogStatsD metrics backend for go-kit Scott Kidder 8 years ago
2 changed file(s) with 398 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 // Package dogstatsd implements a DogStatsD backend for package metrics.
1 //
2 // This implementation supports Datadog tags that provide additional metric
3 // filtering capabilities. See the DogStatsD documentation for protocol
4 // specifics:
5 // http://docs.datadoghq.com/guides/dogstatsd/
6 //
7 package dogstatsd
8
9 import (
10 "bytes"
11 "fmt"
12 "io"
13 "log"
14 "math"
15 "time"
16
17 "sync/atomic"
18
19 "github.com/go-kit/kit/metrics"
20 )
21
22 // dogstatsd metrics were based on the statsd package in go-kit
23
24 const maxBufferSize = 1400 // bytes
25
26 type dogstatsdCounter struct {
27 key string
28 c chan string
29 tags []metrics.Field
30 }
31
32 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
33 // to the passed writer. Observations are buffered for the report interval or
34 // until the buffer exceeds a max packet size, whichever comes first.
35 //
36 // TODO: support for sampling.
37 func NewCounter(w io.Writer, key string, reportInterval time.Duration, globalTags []metrics.Field) metrics.Counter {
38 return NewCounterTick(w, key, time.Tick(reportInterval), globalTags)
39 }
40
41 // NewCounterTick is the same as NewCounter, but allows the user to pass in a
42 // ticker channel instead of invoking time.Tick.
43 func NewCounterTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Counter {
44 c := &dogstatsdCounter{
45 key: key,
46 c: make(chan string),
47 tags: tags,
48 }
49 go fwd(w, key, reportTicker, c.c)
50 return c
51 }
52
53 func (c *dogstatsdCounter) Name() string { return c.key }
54
55 func (c *dogstatsdCounter) With(f metrics.Field) metrics.Counter {
56 return &dogstatsdCounter{
57 key: c.key,
58 c: c.c,
59 tags: append(c.tags, f),
60 }
61 }
62
63 func (c *dogstatsdCounter) Add(delta uint64) { c.c <- applyTags(fmt.Sprintf("%d|c", delta), c.tags) }
64
65 type dogstatsdGauge struct {
66 key string
67 lastValue uint64 // math.Float64frombits
68 g chan string
69 tags []metrics.Field
70 }
71
72 // NewGauge returns a Gauge that emits values in the DogStatsD protocol to the
73 // passed writer. Values are buffered for the report interval or until the
74 // buffer exceeds a max packet size, whichever comes first.
75 //
76 // TODO: support for sampling.
77 func NewGauge(w io.Writer, key string, reportInterval time.Duration, tags []metrics.Field) metrics.Gauge {
78 return NewGaugeTick(w, key, time.Tick(reportInterval), tags)
79 }
80
81 // NewGaugeTick is the same as NewGauge, but allows the user to pass in a ticker
82 // channel instead of invoking time.Tick.
83 func NewGaugeTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Gauge {
84 g := &dogstatsdGauge{
85 key: key,
86 g: make(chan string),
87 tags: tags,
88 }
89 go fwd(w, key, reportTicker, g.g)
90 return g
91 }
92
93 func (g *dogstatsdGauge) Name() string { return g.key }
94
95 func (g *dogstatsdGauge) With(f metrics.Field) metrics.Gauge {
96 return &dogstatsdGauge{
97 key: g.key,
98 lastValue: g.lastValue,
99 g: g.g,
100 tags: append(g.tags, f),
101 }
102 }
103
104 func (g *dogstatsdGauge) Add(delta float64) {
105 // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
106 sign := "+"
107 if delta < 0 {
108 sign, delta = "-", -delta
109 }
110 g.g <- applyTags(fmt.Sprintf("%s%f|g", sign, delta), g.tags)
111 }
112
113 func (g *dogstatsdGauge) Set(value float64) {
114 atomic.StoreUint64(&g.lastValue, math.Float64bits(value))
115 g.g <- applyTags(fmt.Sprintf("%f|g", value), g.tags)
116 }
117
118 func (g *dogstatsdGauge) Get() float64 {
119 return math.Float64frombits(atomic.LoadUint64(&g.lastValue))
120 }
121
122 // NewCallbackGauge emits values in the DogStatsD protocol to the passed writer.
123 // It collects values every scrape interval from the callback. Values are
124 // buffered for the report interval or until the buffer exceeds a max packet
125 // size, whichever comes first. The report and scrape intervals may be the
126 // same. The callback determines the value, and fields are ignored, so
127 // NewCallbackGauge returns nothing.
128 func NewCallbackGauge(w io.Writer, key string, reportInterval, scrapeInterval time.Duration, callback func() float64) {
129 NewCallbackGaugeTick(w, key, time.Tick(reportInterval), time.Tick(scrapeInterval), callback)
130 }
131
132 // NewCallbackGaugeTick is the same as NewCallbackGauge, but allows the user to
133 // pass in ticker channels instead of durations to control report and scrape
134 // intervals.
135 func NewCallbackGaugeTick(w io.Writer, key string, reportTicker, scrapeTicker <-chan time.Time, callback func() float64) {
136 go fwd(w, key, reportTicker, emitEvery(scrapeTicker, callback))
137 }
138
139 func emitEvery(emitTicker <-chan time.Time, callback func() float64) <-chan string {
140 c := make(chan string)
141 go func() {
142 for range emitTicker {
143 c <- fmt.Sprintf("%f|g", callback())
144 }
145 }()
146 return c
147 }
148
149 type dogstatsdHistogram struct {
150 key string
151 h chan string
152 tags []metrics.Field
153 }
154
155 // NewHistogram returns a Histogram that emits observations in the DogStatsD
156 // protocol to the passed writer. Observations are buffered for the reporting
157 // interval or until the buffer exceeds a max packet size, whichever comes
158 // first.
159 //
160 // NewHistogram is mapped to a statsd Timing, so observations should represent
161 // milliseconds. If you observe in units of nanoseconds, you can make the
162 // translation with a ScaledHistogram:
163 //
164 // NewScaledHistogram(dogstatsdHistogram, time.Millisecond)
165 //
166 // You can also enforce the constraint in a typesafe way with a millisecond
167 // TimeHistogram:
168 //
169 // NewTimeHistogram(dogstatsdHistogram, time.Millisecond)
170 //
171 // TODO: support for sampling.
172 func NewHistogram(w io.Writer, key string, reportInterval time.Duration, tags []metrics.Field) metrics.Histogram {
173 return NewHistogramTick(w, key, time.Tick(reportInterval), tags)
174 }
175
176 // NewHistogramTick is the same as NewHistogram, but allows the user to pass a
177 // ticker channel instead of invoking time.Tick.
178 func NewHistogramTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Histogram {
179 h := &dogstatsdHistogram{
180 key: key,
181 h: make(chan string),
182 tags: tags,
183 }
184 go fwd(w, key, reportTicker, h.h)
185 return h
186 }
187
188 func (h *dogstatsdHistogram) Name() string { return h.key }
189
190 func (h *dogstatsdHistogram) With(f metrics.Field) metrics.Histogram {
191 return &dogstatsdHistogram{
192 key: h.key,
193 h: h.h,
194 tags: append(h.tags, f),
195 }
196 }
197
198 func (h *dogstatsdHistogram) Observe(value int64) {
199 h.h <- applyTags(fmt.Sprintf("%d|ms", value), h.tags)
200 }
201
202 func (h *dogstatsdHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
203 // TODO(pb): no way to do this without introducing e.g. codahale/hdrhistogram
204 return []metrics.Bucket{}, []metrics.Quantile{}
205 }
206
207 func fwd(w io.Writer, key string, reportTicker <-chan time.Time, c <-chan string) {
208 buf := &bytes.Buffer{}
209 for {
210 select {
211 case s := <-c:
212 fmt.Fprintf(buf, "%s:%s\n", key, s)
213 if buf.Len() > maxBufferSize {
214 flush(w, buf)
215 }
216
217 case <-reportTicker:
218 flush(w, buf)
219 }
220 }
221 }
222
223 func flush(w io.Writer, buf *bytes.Buffer) {
224 if buf.Len() <= 0 {
225 return
226 }
227 if _, err := w.Write(buf.Bytes()); err != nil {
228 log.Printf("error: could not write to dogstatsd: %v", err)
229 }
230 buf.Reset()
231 }
232
233 func applyTags(value string, tags []metrics.Field) string {
234 if len(tags) > 0 {
235 var tagsString string
236 for _, t := range tags {
237 switch tagsString {
238 case "":
239 tagsString = fmt.Sprintf("%s:%s", t.Key, t.Value)
240 default:
241 tagsString = fmt.Sprintf("%s,%s:%s", tagsString, t.Key, t.Value)
242 }
243 }
244 value = fmt.Sprintf("%s|#%s", value, tagsString)
245 }
246 return value
247 }
0 package dogstatsd
1
2 import (
3 "bytes"
4 "fmt"
5 "github.com/go-kit/kit/metrics"
6 "strings"
7 "sync"
8 "testing"
9 "time"
10 )
11
12 func TestCounter(t *testing.T) {
13 buf := &syncbuf{buf: &bytes.Buffer{}}
14 reportc := make(chan time.Time)
15 tags := []metrics.Field{}
16 c := NewCounterTick(buf, "test_statsd_counter", reportc, tags)
17
18 c.Add(1)
19 c.With(metrics.Field{"foo", "bar"}).Add(2)
20 c.With(metrics.Field{"foo", "bar"}).With(metrics.Field{"abc", "123"}).Add(2)
21 c.Add(3)
22
23 want, have := "test_statsd_counter:1|c\ntest_statsd_counter:2|c|#foo:bar\ntest_statsd_counter:2|c|#foo:bar,abc:123\ntest_statsd_counter:3|c\n", ""
24 by(t, 100*time.Millisecond, func() bool {
25 have = buf.String()
26 return want == have
27 }, func() {
28 reportc <- time.Now()
29 }, fmt.Sprintf("want %q, have %q", want, have))
30 }
31
32 func TestGauge(t *testing.T) {
33 buf := &syncbuf{buf: &bytes.Buffer{}}
34 reportc := make(chan time.Time)
35 tags := []metrics.Field{}
36 g := NewGaugeTick(buf, "test_statsd_gauge", reportc, tags)
37
38 delta := 1.0
39 g.Add(delta)
40
41 want, have := fmt.Sprintf("test_statsd_gauge:+%f|g\n", delta), ""
42 by(t, 100*time.Millisecond, func() bool {
43 have = buf.String()
44 return want == have
45 }, func() {
46 reportc <- time.Now()
47 }, fmt.Sprintf("want %q, have %q", want, have))
48
49 buf.Reset()
50 delta = -2.0
51 g.With(metrics.Field{"foo", "bar"}).Add(delta)
52
53 want, have = fmt.Sprintf("test_statsd_gauge:%f|g|#foo:bar\n", delta), ""
54 by(t, 100*time.Millisecond, func() bool {
55 have = buf.String()
56 return want == have
57 }, func() {
58 reportc <- time.Now()
59 }, fmt.Sprintf("want %q, have %q", want, have))
60
61 buf.Reset()
62 value := 3.0
63 g.With(metrics.Field{"foo", "bar"}).With(metrics.Field{"abc", "123"}).Set(value)
64
65 want, have = fmt.Sprintf("test_statsd_gauge:%f|g|#foo:bar,abc:123\n", value), ""
66 by(t, 100*time.Millisecond, func() bool {
67 have = buf.String()
68 return want == have
69 }, func() {
70 reportc <- time.Now()
71 }, fmt.Sprintf("want %q, have %q", want, have))
72 }
73
74 func TestCallbackGauge(t *testing.T) {
75 buf := &syncbuf{buf: &bytes.Buffer{}}
76 reportc, scrapec := make(chan time.Time), make(chan time.Time)
77 value := 55.55
78 cb := func() float64 { return value }
79 NewCallbackGaugeTick(buf, "test_statsd_callback_gauge", reportc, scrapec, cb)
80
81 scrapec <- time.Now()
82 reportc <- time.Now()
83
84 // Travis is annoying
85 by(t, time.Second, func() bool {
86 return buf.String() != ""
87 }, func() {
88 reportc <- time.Now()
89 }, "buffer never got write+flush")
90
91 want, have := fmt.Sprintf("test_statsd_callback_gauge:%f|g\n", value), ""
92 by(t, 100*time.Millisecond, func() bool {
93 have = buf.String()
94 return strings.HasPrefix(have, want) // HasPrefix because we might get multiple writes
95 }, func() {
96 reportc <- time.Now()
97 }, fmt.Sprintf("want %q, have %q", want, have))
98 }
99
100 func TestHistogram(t *testing.T) {
101 buf := &syncbuf{buf: &bytes.Buffer{}}
102 reportc := make(chan time.Time)
103 tags := []metrics.Field{}
104 h := NewHistogramTick(buf, "test_statsd_histogram", reportc, tags)
105
106 h.Observe(123)
107 h.With(metrics.Field{"foo", "bar"}).Observe(456)
108
109 want, have := "test_statsd_histogram:123|ms\ntest_statsd_histogram:456|ms|#foo:bar\n", ""
110 by(t, 100*time.Millisecond, func() bool {
111 have = buf.String()
112 return want == have
113 }, func() {
114 reportc <- time.Now()
115 }, fmt.Sprintf("want %q, have %q", want, have))
116 }
117
118 func by(t *testing.T, d time.Duration, check func() bool, execute func(), msg string) {
119 deadline := time.Now().Add(d)
120 for !check() {
121 if time.Now().After(deadline) {
122 t.Fatal(msg)
123 }
124 execute()
125 }
126 }
127
128 type syncbuf struct {
129 mtx sync.Mutex
130 buf *bytes.Buffer
131 }
132
133 func (s *syncbuf) Write(p []byte) (int, error) {
134 s.mtx.Lock()
135 defer s.mtx.Unlock()
136 return s.buf.Write(p)
137 }
138
139 func (s *syncbuf) String() string {
140 s.mtx.Lock()
141 defer s.mtx.Unlock()
142 return s.buf.String()
143 }
144
145 func (s *syncbuf) Reset() {
146 s.mtx.Lock()
147 defer s.mtx.Unlock()
148 s.buf.Reset()
149 }