Codebase list golang-github-go-kit-kit / 414eee2
metrics/statsd: make tests deterministic and race-free Peter Bourgon 8 years ago
2 changed file(s) with 124 addition(s) and 85 deletion(s). Raw diff Collapse all Expand all
4141 //
4242 // TODO: support for sampling.
4343 func NewCounter(w io.Writer, key string, reportInterval time.Duration) metrics.Counter {
44 return NewCounterTick(w, key, time.Tick(reportInterval))
45 }
46
47 // NewCounterTick is the same as NewCounter, but allows the user to pass in a
48 // ticker channel instead of invoking time.Tick.
49 func NewCounterTick(w io.Writer, key string, reportTicker <-chan time.Time) metrics.Counter {
4450 c := &statsdCounter{
4551 key: key,
4652 c: make(chan string),
4753 }
48 go fwd(w, key, reportInterval, c.c)
54 go fwd(w, key, reportTicker, c.c)
4955 return c
5056 }
5157
6874 //
6975 // TODO: support for sampling.
7076 func NewGauge(w io.Writer, key string, reportInterval time.Duration) metrics.Gauge {
77 return NewGaugeTick(w, key, time.Tick(reportInterval))
78 }
79
80 // NewGaugeTick is the same as NewGauge, but allows the user to pass in a ticker
81 // channel instead of invoking time.Tick.
82 func NewGaugeTick(w io.Writer, key string, reportTicker <-chan time.Time) metrics.Gauge {
7183 g := &statsdGauge{
7284 key: key,
7385 g: make(chan string),
7486 }
75 go fwd(w, key, reportInterval, g.g)
87 go fwd(w, key, reportTicker, g.g)
7688 return g
7789 }
7890
105117 // same. The callback determines the value, and fields are ignored, so
106118 // NewCallbackGauge returns nothing.
107119 func NewCallbackGauge(w io.Writer, key string, reportInterval, scrapeInterval time.Duration, callback func() float64) {
108 go fwd(w, key, reportInterval, emitEvery(scrapeInterval, callback))
109 }
110
111 func emitEvery(d time.Duration, callback func() float64) <-chan string {
120 NewCallbackGaugeTick(w, key, time.Tick(reportInterval), time.Tick(scrapeInterval), callback)
121 }
122
123 // NewCallbackGaugeTick is the same as NewCallbackGauge, but allows the user to
124 // pass in ticker channels instead of durations to control report and scrape
125 // intervals.
126 func NewCallbackGaugeTick(w io.Writer, key string, reportTicker, scrapeTicker <-chan time.Time, callback func() float64) {
127 go fwd(w, key, reportTicker, emitEvery(scrapeTicker, callback))
128 }
129
130 func emitEvery(emitTicker <-chan time.Time, callback func() float64) <-chan string {
112131 c := make(chan string)
113132 go func() {
114 for range tick(d) {
133 for range emitTicker {
115134 c <- fmt.Sprintf("%f|g", callback())
116135 }
117136 }()
141160 //
142161 // TODO: support for sampling.
143162 func NewHistogram(w io.Writer, key string, reportInterval time.Duration) metrics.Histogram {
163 return NewHistogramTick(w, key, time.Tick(reportInterval))
164 }
165
166 // NewHistogramTick is the same as NewHistogram, but allows the user to pass a
167 // ticker channel instead of invoking time.Tick.
168 func NewHistogramTick(w io.Writer, key string, reportTicker <-chan time.Time) metrics.Histogram {
144169 h := &statsdHistogram{
145170 key: key,
146171 h: make(chan string),
147172 }
148 go fwd(w, key, reportInterval, h.h)
173 go fwd(w, key, reportTicker, h.h)
149174 return h
150175 }
151176
162187 return []metrics.Bucket{}, []metrics.Quantile{}
163188 }
164189
165 var tick = time.Tick
166
167 func fwd(w io.Writer, key string, reportInterval time.Duration, c <-chan string) {
190 func fwd(w io.Writer, key string, reportTicker <-chan time.Time, c <-chan string) {
168191 buf := &bytes.Buffer{}
169 tick := tick(reportInterval)
170192 for {
171193 select {
172194 case s := <-c:
175197 flush(w, buf)
176198 }
177199
178 case <-tick:
200 case <-reportTicker:
179201 flush(w, buf)
180202 }
181203 }
00 package statsd
1
2 // In package metrics so we can stub tick.
31
42 import (
53 "bytes"
64 "fmt"
7 "runtime"
85 "strings"
6 "sync"
97 "testing"
108 "time"
119 )
1210
1311 func TestCounter(t *testing.T) {
14 ch := make(chan time.Time)
15 tick = func(time.Duration) <-chan time.Time { return ch }
16 defer func() { tick = time.Tick }()
17
18 buf := &bytes.Buffer{}
19 c := NewCounter(buf, "test_statsd_counter", time.Second)
12 buf := &syncbuf{buf: &bytes.Buffer{}}
13 reportc := make(chan time.Time)
14 c := NewCounterTick(buf, "test_statsd_counter", reportc)
2015
2116 c.Add(1)
2217 c.Add(2)
23 ch <- time.Now()
2418
25 for i := 0; i < 10 && buf.Len() == 0; i++ {
26 time.Sleep(time.Millisecond)
27 }
28
29 if want, have := "test_statsd_counter:1|c\ntest_statsd_counter:2|c\n", buf.String(); want != have {
30 t.Errorf("want %q, have %q", want, have)
31 }
19 want, have := "test_statsd_counter:1|c\ntest_statsd_counter:2|c\n", ""
20 by(t, 100*time.Millisecond, func() bool {
21 have = buf.String()
22 return want == have
23 }, func() {
24 reportc <- time.Now()
25 }, fmt.Sprintf("want %q, have %q", want, have))
3226 }
3327
3428 func TestGauge(t *testing.T) {
35 ch := make(chan time.Time)
36 tick = func(time.Duration) <-chan time.Time { return ch }
37 defer func() { tick = time.Tick }()
38
39 buf := &bytes.Buffer{}
40 g := NewGauge(buf, "test_statsd_gauge", time.Second)
29 buf := &syncbuf{buf: &bytes.Buffer{}}
30 reportc := make(chan time.Time)
31 g := NewGaugeTick(buf, "test_statsd_gauge", reportc)
4132
4233 delta := 1.0
43 g.Add(delta) // send command
44 runtime.Gosched() // yield to buffer write
45 ch <- time.Now() // signal flush
46 runtime.Gosched() // yield to flush
47 if want, have := fmt.Sprintf("test_statsd_gauge:+%f|g\n", delta), buf.String(); want != have {
48 t.Errorf("want %q, have %q", want, have)
49 }
34 g.Add(delta)
35
36 want, have := fmt.Sprintf("test_statsd_gauge:+%f|g\n", delta), ""
37 by(t, 100*time.Millisecond, func() bool {
38 have = buf.String()
39 return want == have
40 }, func() {
41 reportc <- time.Now()
42 }, fmt.Sprintf("want %q, have %q", want, have))
5043
5144 buf.Reset()
52
5345 delta = -2.0
5446 g.Add(delta)
55 runtime.Gosched()
56 ch <- time.Now()
57 runtime.Gosched()
58 if want, have := fmt.Sprintf("test_statsd_gauge:%f|g\n", delta), buf.String(); want != have {
59 t.Errorf("want %q, have %q", want, have)
60 }
47
48 want, have = fmt.Sprintf("test_statsd_gauge:%f|g\n", delta), ""
49 by(t, 100*time.Millisecond, func() bool {
50 have = buf.String()
51 return want == have
52 }, func() {
53 reportc <- time.Now()
54 }, fmt.Sprintf("want %q, have %q", want, have))
6155
6256 buf.Reset()
63
6457 value := 3.0
6558 g.Set(value)
66 runtime.Gosched()
67 ch <- time.Now()
68 runtime.Gosched()
69 if want, have := fmt.Sprintf("test_statsd_gauge:%f|g\n", value), buf.String(); want != have {
70 t.Errorf("want %q, have %q", want, have)
71 }
59
60 want, have = fmt.Sprintf("test_statsd_gauge:%f|g\n", value), ""
61 by(t, 100*time.Millisecond, func() bool {
62 have = buf.String()
63 return want == have
64 }, func() {
65 reportc <- time.Now()
66 }, fmt.Sprintf("want %q, have %q", want, have))
7267 }
7368
7469 func TestCallbackGauge(t *testing.T) {
75 ch := make(chan time.Time)
76 tick = func(time.Duration) <-chan time.Time { return ch }
77 defer func() { tick = time.Tick }()
78
79 buf := &bytes.Buffer{}
70 buf := &syncbuf{buf: &bytes.Buffer{}}
71 reportc, scrapec := make(chan time.Time), make(chan time.Time)
8072 value := 55.55
8173 cb := func() float64 { return value }
82 NewCallbackGauge(buf, "test_statsd_callback_gauge", time.Second, time.Nanosecond, cb)
74 NewCallbackGaugeTick(buf, "test_statsd_callback_gauge", reportc, scrapec, cb)
8375
84 ch <- time.Now() // signal emitter
85 runtime.Gosched() // yield to emitter
86 ch <- time.Now() // signal flush
87 runtime.Gosched() // yield to flush
76 scrapec <- time.Now()
77 reportc <- time.Now()
8878
8979 // Travis is annoying
90 check := func() bool { return buf.String() != "" }
91 execute := func() { ch <- time.Now(); runtime.Gosched(); time.Sleep(5 * time.Millisecond) }
92 by(t, time.Second, check, execute, "buffer never got write+flush")
80 by(t, time.Second, func() bool {
81 return buf.String() != ""
82 }, func() {
83 reportc <- time.Now()
84 }, "buffer never got write+flush")
9385
94 if want, have := fmt.Sprintf("test_statsd_callback_gauge:%f|g\n", value), buf.String(); !strings.HasPrefix(have, want) {
95 t.Errorf("want %q, have %q", want, have)
96 }
86 want, have := fmt.Sprintf("test_statsd_callback_gauge:%f|g\n", value), ""
87 by(t, 100*time.Millisecond, func() bool {
88 have = buf.String()
89 return strings.HasPrefix(have, want) // HasPrefix because we might get multiple writes
90 }, func() {
91 reportc <- time.Now()
92 }, fmt.Sprintf("want %q, have %q", want, have))
9793 }
9894
9995 func TestHistogram(t *testing.T) {
100 ch := make(chan time.Time)
101 tick = func(time.Duration) <-chan time.Time { return ch }
102 defer func() { tick = time.Tick }()
103
104 buf := &bytes.Buffer{}
105 h := NewHistogram(buf, "test_statsd_histogram", time.Second)
96 buf := &syncbuf{buf: &bytes.Buffer{}}
97 reportc := make(chan time.Time)
98 h := NewHistogramTick(buf, "test_statsd_histogram", reportc)
10699
107100 h.Observe(123)
108101
109 runtime.Gosched()
110 ch <- time.Now()
111 runtime.Gosched()
112 if want, have := "test_statsd_histogram:123|ms\n", buf.String(); want != have {
113 t.Errorf("want %q, have %q", want, have)
114 }
102 want, have := "test_statsd_histogram:123|ms\n", ""
103 by(t, 100*time.Millisecond, func() bool {
104 have = buf.String()
105 return want == have
106 }, func() {
107 reportc <- time.Now()
108 }, fmt.Sprintf("want %q, have %q", want, have))
115109 }
116110
117111 func by(t *testing.T, d time.Duration, check func() bool, execute func(), msg string) {
123117 execute()
124118 }
125119 }
120
121 type syncbuf struct {
122 mtx sync.Mutex
123 buf *bytes.Buffer
124 }
125
126 func (s *syncbuf) Write(p []byte) (int, error) {
127 s.mtx.Lock()
128 defer s.mtx.Unlock()
129 return s.buf.Write(p)
130 }
131
132 func (s *syncbuf) String() string {
133 s.mtx.Lock()
134 defer s.mtx.Unlock()
135 return s.buf.String()
136 }
137
138 func (s *syncbuf) Reset() {
139 s.mtx.Lock()
140 defer s.mtx.Unlock()
141 s.buf.Reset()
142 }