Codebase list golang-github-go-kit-kit / 163ad70
Merge pull request #268 from jprobinson/provider Adding `metrics/graphite`, `metrics/provider` and `util/conn` packages plus `Emitter` types Peter Bourgon 7 years ago
12 changed file(s) with 1585 addition(s) and 1 deletion(s). Raw diff Collapse all Expand all
22 import (
33 "bytes"
44 "fmt"
5 "github.com/go-kit/kit/metrics"
5 "net"
66 "strings"
77 "sync"
88 "testing"
99 "time"
10
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/util/conn"
1014 )
15
16 func TestEmitterCounter(t *testing.T) {
17 e, buf := testEmitter()
18
19 c := e.NewCounter("test_statsd_counter")
20 c.Add(1)
21 c.Add(2)
22
23 // give time for things to emit
24 time.Sleep(time.Millisecond * 250)
25 // force a flush and stop
26 e.Stop()
27
28 want := "prefix.test_statsd_counter:1|c\nprefix.test_statsd_counter:2|c\n"
29 have := buf.String()
30 if want != have {
31 t.Errorf("want %q, have %q", want, have)
32 }
33 }
34
35 func TestEmitterGauge(t *testing.T) {
36 e, buf := testEmitter()
37
38 g := e.NewGauge("test_statsd_gauge")
39
40 delta := 1.0
41 g.Add(delta)
42
43 // give time for things to emit
44 time.Sleep(time.Millisecond * 250)
45 // force a flush and stop
46 e.Stop()
47
48 want := fmt.Sprintf("prefix.test_statsd_gauge:+%f|g\n", delta)
49 have := buf.String()
50 if want != have {
51 t.Errorf("want %q, have %q", want, have)
52 }
53 }
54
55 func TestEmitterHistogram(t *testing.T) {
56 e, buf := testEmitter()
57 h := e.NewHistogram("test_statsd_histogram")
58
59 h.Observe(123)
60
61 // give time for things to emit
62 time.Sleep(time.Millisecond * 250)
63 // force a flush and stop
64 e.Stop()
65
66 want := "prefix.test_statsd_histogram:123|ms\n"
67 have := buf.String()
68 if want != have {
69 t.Errorf("want %q, have %q", want, have)
70 }
71 }
1172
1273 func TestCounter(t *testing.T) {
1374 buf := &syncbuf{buf: &bytes.Buffer{}}
147208 defer s.mtx.Unlock()
148209 s.buf.Reset()
149210 }
211
212 func testEmitter() (*Emitter, *syncbuf) {
213 buf := &syncbuf{buf: &bytes.Buffer{}}
214 e := &Emitter{
215 prefix: "prefix.",
216 mgr: conn.NewManager(mockDialer(buf), "", "", time.After, log.NewNopLogger()),
217 logger: log.NewNopLogger(),
218 keyVals: make(chan keyVal),
219 quitc: make(chan chan struct{}),
220 }
221 go e.loop(time.Millisecond * 20)
222 return e, buf
223 }
224
225 func mockDialer(buf *syncbuf) conn.Dialer {
226 return func(net, addr string) (net.Conn, error) {
227 return &mockConn{buf}, nil
228 }
229 }
230
231 type mockConn struct {
232 buf *syncbuf
233 }
234
235 func (c *mockConn) Read(b []byte) (n int, err error) {
236 panic("not implemented")
237 }
238
239 func (c *mockConn) Write(b []byte) (n int, err error) {
240 return c.buf.Write(b)
241 }
242
243 func (c *mockConn) Close() error {
244 panic("not implemented")
245 }
246
247 func (c *mockConn) LocalAddr() net.Addr {
248 panic("not implemented")
249 }
250
251 func (c *mockConn) RemoteAddr() net.Addr {
252 panic("not implemented")
253 }
254
255 func (c *mockConn) SetDeadline(t time.Time) error {
256 panic("not implemented")
257 }
258
259 func (c *mockConn) SetReadDeadline(t time.Time) error {
260 panic("not implemented")
261 }
262
263 func (c *mockConn) SetWriteDeadline(t time.Time) error {
264 panic("not implemented")
265 }
0 package dogstatsd
1
2 import (
3 "bytes"
4 "fmt"
5 "net"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/util/conn"
11 )
12
13 // Emitter is a struct to manage connections and orchestrate the emission of
14 // metrics to a DogStatsd process.
15 type Emitter struct {
16 prefix string
17 keyVals chan keyVal
18 mgr *conn.Manager
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23 type keyVal struct {
24 key string
25 val string
26 }
27
28 func stringToKeyVal(key string, keyVals chan keyVal) chan string {
29 vals := make(chan string)
30 go func() {
31 for val := range vals {
32 keyVals <- keyVal{key: key, val: val}
33 }
34 }()
35 return vals
36 }
37
38 // NewEmitter will return an Emitter that will prefix all metrics names with the
39 // given prefix. Once started, it will attempt to create a connection with the
40 // given network and address via `net.Dial` and periodically post metrics to the
41 // connection in the DogStatsD protocol.
42 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
43 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
44 }
45
46 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
47 // Dialer function. This is primarily useful for tests.
48 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
49 e := &Emitter{
50 prefix: metricsPrefix,
51 mgr: conn.NewManager(dialer, network, address, time.After, logger),
52 logger: logger,
53 keyVals: make(chan keyVal),
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
58 }
59
60 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
61 // via the Emitter's connection manager. Observations are buffered for the
62 // report interval or until the buffer exceeds a max packet size, whichever
63 // comes first. Fields are ignored.
64 func (e *Emitter) NewCounter(key string) metrics.Counter {
65 key = e.prefix + key
66 return &counter{
67 key: key,
68 c: stringToKeyVal(key, e.keyVals),
69 }
70 }
71
72 // NewHistogram returns a Histogram that emits observations in the DogStatsD
73 // protocol via the Emitter's conection manager. Observations are buffered for
74 // the reporting interval or until the buffer exceeds a max packet size,
75 // whichever comes first. Fields are ignored.
76 //
77 // NewHistogram is mapped to a statsd Timing, so observations should represent
78 // milliseconds. If you observe in units of nanoseconds, you can make the
79 // translation with a ScaledHistogram:
80 //
81 // NewScaledHistogram(histogram, time.Millisecond)
82 //
83 // You can also enforce the constraint in a typesafe way with a millisecond
84 // TimeHistogram:
85 //
86 // NewTimeHistogram(histogram, time.Millisecond)
87 //
88 // TODO: support for sampling.
89 func (e *Emitter) NewHistogram(key string) metrics.Histogram {
90 key = e.prefix + key
91 return &histogram{
92 key: key,
93 h: stringToKeyVal(key, e.keyVals),
94 }
95 }
96
97 // NewGauge returns a Gauge that emits values in the DogStatsD protocol via the
98 // the Emitter's connection manager. Values are buffered for the report
99 // interval or until the buffer exceeds a max packet size, whichever comes
100 // first. Fields are ignored.
101 //
102 // TODO: support for sampling
103 func (e *Emitter) NewGauge(key string) metrics.Gauge {
104 key = e.prefix + key
105 return &gauge{
106 key: key,
107 g: stringToKeyVal(key, e.keyVals),
108 }
109 }
110
111 func (e *Emitter) loop(d time.Duration) {
112 ticker := time.NewTicker(d)
113 defer ticker.Stop()
114 buf := &bytes.Buffer{}
115 for {
116 select {
117 case kv := <-e.keyVals:
118 fmt.Fprintf(buf, "%s:%s\n", kv.key, kv.val)
119 if buf.Len() > maxBufferSize {
120 e.Flush(buf)
121 }
122
123 case <-ticker.C:
124 e.Flush(buf)
125
126 case q := <-e.quitc:
127 e.Flush(buf)
128 close(q)
129 return
130 }
131 }
132 }
133
134 // Stop will flush the current metrics and close the active connection. Calling
135 // stop more than once is a programmer error.
136 func (e *Emitter) Stop() {
137 q := make(chan struct{})
138 e.quitc <- q
139 <-q
140 }
141
142 // Flush will write the given buffer to a connection provided by the Emitter's
143 // connection manager.
144 func (e *Emitter) Flush(buf *bytes.Buffer) {
145 conn := e.mgr.Take()
146 if conn == nil {
147 e.logger.Log("during", "flush", "err", "connection unavailable")
148 return
149 }
150
151 _, err := conn.Write(buf.Bytes())
152 if err != nil {
153 e.logger.Log("during", "flush", "err", err)
154 }
155 buf.Reset()
156
157 e.mgr.Put(err)
158 }
0 package graphite
1
2 import (
3 "bufio"
4 "fmt"
5 "io"
6 "net"
7 "sync"
8 "time"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
12 "github.com/go-kit/kit/util/conn"
13 )
14
15 // Emitter is a struct to manage connections and orchestrate the emission of
16 // metrics to a Graphite system.
17 type Emitter struct {
18 mtx sync.Mutex
19 prefix string
20 mgr *conn.Manager
21 counters []*counter
22 histograms []*windowedHistogram
23 gauges []*gauge
24 logger log.Logger
25 quitc chan chan struct{}
26 }
27
28 // NewEmitter will return an Emitter that will prefix all metrics names with the
29 // given prefix. Once started, it will attempt to create a connection with the
30 // given network and address via `net.Dial` and periodically post metrics to the
31 // connection in the Graphite plaintext protocol.
32 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
33 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
34 }
35
36 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
37 // Dialer function. This is primarily useful for tests.
38 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
39 e := &Emitter{
40 prefix: metricsPrefix,
41 mgr: conn.NewManager(dialer, network, address, time.After, logger),
42 logger: logger,
43 quitc: make(chan chan struct{}),
44 }
45 go e.loop(flushInterval)
46 return e
47 }
48
49 // NewCounter returns a Counter whose value will be periodically emitted in
50 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
51 func (e *Emitter) NewCounter(name string) metrics.Counter {
52 e.mtx.Lock()
53 defer e.mtx.Unlock()
54 c := newCounter(name)
55 e.counters = append(e.counters, c)
56 return c
57 }
58
59 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
60 // windowed HDR histogram which drops data older than five minutes.
61 //
62 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
63 // should be integers in the range 1..99. The gauge names are assigned by using
64 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
65 //
66 // The values of this histogram will be periodically emitted in a
67 // Graphite-compatible format once the Emitter is started. Fields are ignored.
68 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
69 gauges := map[int]metrics.Gauge{}
70 for _, quantile := range quantiles {
71 if quantile <= 0 || quantile >= 100 {
72 return nil, fmt.Errorf("invalid quantile %d", quantile)
73 }
74 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
75 }
76 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
77
78 e.mtx.Lock()
79 defer e.mtx.Unlock()
80 e.histograms = append(e.histograms, h)
81 return h, nil
82 }
83
84 // NewGauge returns a Gauge whose value will be periodically emitted in a
85 // Graphite-compatible format once the Emitter is started. Fields are ignored.
86 func (e *Emitter) NewGauge(name string) metrics.Gauge {
87 e.mtx.Lock()
88 defer e.mtx.Unlock()
89 return e.gauge(name)
90 }
91
92 func (e *Emitter) gauge(name string) metrics.Gauge {
93 g := &gauge{name, 0}
94 e.gauges = append(e.gauges, g)
95 return g
96 }
97
98 func (e *Emitter) loop(d time.Duration) {
99 ticker := time.NewTicker(d)
100 defer ticker.Stop()
101
102 for {
103 select {
104 case <-ticker.C:
105 e.Flush()
106
107 case q := <-e.quitc:
108 e.Flush()
109 close(q)
110 return
111 }
112 }
113 }
114
115 // Stop will flush the current metrics and close the active connection. Calling
116 // stop more than once is a programmer error.
117 func (e *Emitter) Stop() {
118 q := make(chan struct{})
119 e.quitc <- q
120 <-q
121 }
122
123 // Flush will write the current metrics to the Emitter's connection in the
124 // Graphite plaintext protocol.
125 func (e *Emitter) Flush() {
126 e.mtx.Lock() // one flush at a time
127 defer e.mtx.Unlock()
128
129 conn := e.mgr.Take()
130 if conn == nil {
131 e.logger.Log("during", "flush", "err", "connection unavailable")
132 return
133 }
134
135 err := e.flush(conn)
136 if err != nil {
137 e.logger.Log("during", "flush", "err", err)
138 }
139 e.mgr.Put(err)
140 }
141
142 func (e *Emitter) flush(w io.Writer) error {
143 bw := bufio.NewWriter(w)
144
145 for _, c := range e.counters {
146 c.flush(bw, e.prefix)
147 }
148
149 for _, h := range e.histograms {
150 h.flush(bw, e.prefix)
151 }
152
153 for _, g := range e.gauges {
154 g.flush(bw, e.prefix)
155 }
156
157 return bw.Flush()
158 }
0 // Package graphite implements a Graphite backend for package metrics. Metrics
1 // will be emitted to a Graphite server in the plaintext protocol which looks
2 // like:
3 //
4 // "<metric path> <metric value> <metric timestamp>"
5 //
6 // See http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol.
7 // The current implementation ignores fields.
8 package graphite
9
10 import (
11 "fmt"
12 "io"
13 "math"
14 "sort"
15 "sync"
16 "sync/atomic"
17 "time"
18
19 "github.com/codahale/hdrhistogram"
20
21 "github.com/go-kit/kit/log"
22 "github.com/go-kit/kit/metrics"
23 )
24
25 func newCounter(name string) *counter {
26 return &counter{name, 0}
27 }
28
29 func newGauge(name string) *gauge {
30 return &gauge{name, 0}
31 }
32
33 // counter implements the metrics.counter interface but also provides a
34 // Flush method to emit the current counter values in the Graphite plaintext
35 // protocol.
36 type counter struct {
37 key string
38 count uint64
39 }
40
41 func (c *counter) Name() string { return c.key }
42
43 // With currently ignores fields.
44 func (c *counter) With(metrics.Field) metrics.Counter { return c }
45
46 func (c *counter) Add(delta uint64) { atomic.AddUint64(&c.count, delta) }
47
48 func (c *counter) get() uint64 { return atomic.LoadUint64(&c.count) }
49
50 // flush will emit the current counter value in the Graphite plaintext
51 // protocol to the given io.Writer.
52 func (c *counter) flush(w io.Writer, prefix string) {
53 fmt.Fprintf(w, "%s.count %d %d\n", prefix+c.Name(), c.get(), time.Now().Unix())
54 }
55
56 // gauge implements the metrics.gauge interface but also provides a
57 // Flush method to emit the current counter values in the Graphite plaintext
58 // protocol.
59 type gauge struct {
60 key string
61 value uint64 // math.Float64bits
62 }
63
64 func (g *gauge) Name() string { return g.key }
65
66 // With currently ignores fields.
67 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
68
69 func (g *gauge) Add(delta float64) {
70 for {
71 old := atomic.LoadUint64(&g.value)
72 new := math.Float64bits(math.Float64frombits(old) + delta)
73 if atomic.CompareAndSwapUint64(&g.value, old, new) {
74 return
75 }
76 }
77 }
78
79 func (g *gauge) Set(value float64) {
80 atomic.StoreUint64(&g.value, math.Float64bits(value))
81 }
82
83 func (g *gauge) Get() float64 {
84 return math.Float64frombits(atomic.LoadUint64(&g.value))
85 }
86
87 // Flush will emit the current gauge value in the Graphite plaintext
88 // protocol to the given io.Writer.
89 func (g *gauge) flush(w io.Writer, prefix string) {
90 fmt.Fprintf(w, "%s %.2f %d\n", prefix+g.Name(), g.Get(), time.Now().Unix())
91 }
92
93 // windowedHistogram is taken from http://github.com/codahale/metrics. It
94 // is a windowed HDR histogram which drops data older than five minutes.
95 //
96 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
97 // should be integers in the range 1..99. The gauge names are assigned by using
98 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
99 //
100 // The values of this histogram will be periodically emitted in a
101 // Graphite-compatible format once the GraphiteProvider is started. Fields are ignored.
102 type windowedHistogram struct {
103 mtx sync.Mutex
104 hist *hdrhistogram.WindowedHistogram
105
106 name string
107 gauges map[int]metrics.Gauge
108 logger log.Logger
109 }
110
111 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram {
112 h := &windowedHistogram{
113 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
114 name: name,
115 gauges: quantiles,
116 logger: logger,
117 }
118 go h.rotateLoop(1 * time.Minute)
119 return h
120 }
121
122 func (h *windowedHistogram) Name() string { return h.name }
123
124 func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }
125
126 func (h *windowedHistogram) Observe(value int64) {
127 h.mtx.Lock()
128 err := h.hist.Current.RecordValue(value)
129 h.mtx.Unlock()
130
131 if err != nil {
132 h.logger.Log("err", err, "msg", "unable to record histogram value")
133 return
134 }
135
136 for q, gauge := range h.gauges {
137 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
138 }
139 }
140
141 func (h *windowedHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
142 bars := h.hist.Merge().Distribution()
143 buckets := make([]metrics.Bucket, len(bars))
144 for i, bar := range bars {
145 buckets[i] = metrics.Bucket{
146 From: bar.From,
147 To: bar.To,
148 Count: bar.Count,
149 }
150 }
151 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
152 for quantile, gauge := range h.gauges {
153 quantiles = append(quantiles, metrics.Quantile{
154 Quantile: quantile,
155 Value: int64(gauge.Get()),
156 })
157 }
158 sort.Sort(quantileSlice(quantiles))
159 return buckets, quantiles
160 }
161
162 func (h *windowedHistogram) flush(w io.Writer, prefix string) {
163 name := prefix + h.Name()
164 hist := h.hist.Merge()
165 now := time.Now().Unix()
166 fmt.Fprintf(w, "%s.count %d %d\n", name, hist.TotalCount(), now)
167 fmt.Fprintf(w, "%s.min %d %d\n", name, hist.Min(), now)
168 fmt.Fprintf(w, "%s.max %d %d\n", name, hist.Max(), now)
169 fmt.Fprintf(w, "%s.mean %.2f %d\n", name, hist.Mean(), now)
170 fmt.Fprintf(w, "%s.std-dev %.2f %d\n", name, hist.StdDev(), now)
171 }
172
173 func (h *windowedHistogram) rotateLoop(d time.Duration) {
174 for range time.Tick(d) {
175 h.mtx.Lock()
176 h.hist.Rotate()
177 h.mtx.Unlock()
178 }
179 }
180
181 type quantileSlice []metrics.Quantile
182
183 func (a quantileSlice) Len() int { return len(a) }
184 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
185 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
0 package graphite
1
2 import (
3 "bytes"
4 "fmt"
5 "strings"
6 "testing"
7 "time"
8
9 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/metrics"
11 "github.com/go-kit/kit/metrics/teststat"
12 )
13
14 func TestHistogramQuantiles(t *testing.T) {
15 prefix := "prefix."
16 e := NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
17 var (
18 name = "test_histogram_quantiles"
19 quantiles = []int{50, 90, 95, 99}
20 )
21 h, err := e.NewHistogram(name, 0, 100, 3, quantiles...)
22 if err != nil {
23 t.Fatalf("unable to create test histogram: %v", err)
24 }
25 h = h.With(metrics.Field{Key: "ignored", Value: "field"})
26 const seed, mean, stdev int64 = 424242, 50, 10
27 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
28
29 // flush the current metrics into a buffer to examine
30 var b bytes.Buffer
31 e.flush(&b)
32 teststat.AssertGraphiteNormalHistogram(t, prefix, name, mean, stdev, quantiles, b.String())
33 }
34
35 func TestCounter(t *testing.T) {
36 var (
37 prefix = "prefix."
38 name = "m"
39 value = 123
40 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
41 b bytes.Buffer
42 )
43 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
44 e.flush(&b)
45 want := fmt.Sprintf("%s%s.count %d", prefix, name, value)
46 payload := b.String()
47 if !strings.HasPrefix(payload, want) {
48 t.Errorf("counter %s want\n%s, have\n%s", name, want, payload)
49 }
50 }
51
52 func TestGauge(t *testing.T) {
53 var (
54 prefix = "prefix."
55 name = "xyz"
56 value = 54321
57 delta = 12345
58 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
59 b bytes.Buffer
60 g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
61 )
62
63 g.Set(float64(value))
64 g.Add(float64(delta))
65
66 e.flush(&b)
67 payload := b.String()
68
69 want := fmt.Sprintf("%s%s %d", prefix, name, value+delta)
70 if !strings.HasPrefix(payload, want) {
71 t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload)
72 }
73 }
74
75 func TestEmitterStops(t *testing.T) {
76 e := NewEmitter("foo", "bar", "baz", time.Second, log.NewNopLogger())
77 time.Sleep(100 * time.Millisecond)
78 e.Stop()
79 }
0 package provider
1
2 import (
3 "errors"
4 "time"
5
6 "github.com/prometheus/client_golang/prometheus"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/metrics/discard"
11 "github.com/go-kit/kit/metrics/dogstatsd"
12 kitexp "github.com/go-kit/kit/metrics/expvar"
13 "github.com/go-kit/kit/metrics/graphite"
14 kitprom "github.com/go-kit/kit/metrics/prometheus"
15 "github.com/go-kit/kit/metrics/statsd"
16 )
17
18 // Provider represents a union set of constructors and lifecycle management
19 // functions for each supported metrics backend. It should be used by those who
20 // need to easily swap out implementations, e.g. dynamically, or at a single
21 // point in an intermediating framework.
22 type Provider interface {
23 NewCounter(name, help string) metrics.Counter
24 NewHistogram(name, help string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error)
25 NewGauge(name, help string) metrics.Gauge
26 Stop()
27 }
28
29 // NewGraphiteProvider will return a Provider implementation that is a simple
30 // wrapper around a graphite.Emitter. All metric names will be prefixed with the
31 // given value and data will be emitted once every interval. If no network value
32 // is given, it will default to "udp".
33 func NewGraphiteProvider(network, address, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
34 if network == "" {
35 network = "udp"
36 }
37 if address == "" {
38 return nil, errors.New("address is required")
39 }
40 return graphiteProvider{
41 e: graphite.NewEmitter(network, address, prefix, interval, logger),
42 }, nil
43 }
44
45 type graphiteProvider struct {
46 e *graphite.Emitter
47 }
48
49 var _ Provider = graphiteProvider{}
50
51 // NewCounter implements Provider. Help is ignored.
52 func (p graphiteProvider) NewCounter(name, _ string) metrics.Counter {
53 return p.e.NewCounter(name)
54 }
55
56 // NewHistogram implements Provider. Help is ignored.
57 func (p graphiteProvider) NewHistogram(name, _ string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
58 return p.e.NewHistogram(name, min, max, sigfigs, quantiles...)
59 }
60
61 // NewGauge implements Provider. Help is ignored.
62 func (p graphiteProvider) NewGauge(name, _ string) metrics.Gauge {
63 return p.e.NewGauge(name)
64 }
65
66 // Stop implements Provider.
67 func (p graphiteProvider) Stop() {
68 p.e.Stop()
69 }
70
71 // NewStatsdProvider will return a Provider implementation that is a simple
72 // wrapper around a statsd.Emitter. All metric names will be prefixed with the
73 // given value and data will be emitted once every interval or when the buffer
74 // has reached its max size. If no network value is given, it will default to
75 // "udp".
76 func NewStatsdProvider(network, address, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
77 if network == "" {
78 network = "udp"
79 }
80 if address == "" {
81 return nil, errors.New("address is required")
82 }
83 return statsdProvider{
84 e: statsd.NewEmitter(network, address, prefix, interval, logger),
85 }, nil
86 }
87
88 type statsdProvider struct {
89 e *statsd.Emitter
90 }
91
92 var _ Provider = statsdProvider{}
93
94 // NewCounter implements Provider. Help is ignored.
95 func (p statsdProvider) NewCounter(name, _ string) metrics.Counter {
96 return p.e.NewCounter(name)
97 }
98
99 // NewHistogram implements Provider. Help is ignored.
100 func (p statsdProvider) NewHistogram(name, _ string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
101 return p.e.NewHistogram(name), nil
102 }
103
104 // NewGauge implements Provider. Help is ignored.
105 func (p statsdProvider) NewGauge(name, _ string) metrics.Gauge {
106 return p.e.NewGauge(name)
107 }
108
109 // Stop will call the underlying statsd.Emitter's Stop method.
110 func (p statsdProvider) Stop() {
111 p.e.Stop()
112 }
113
114 // NewDogStatsdProvider will return a Provider implementation that is a simple
115 // wrapper around a dogstatsd.Emitter. All metric names will be prefixed with
116 // the given value and data will be emitted once every interval or when the
117 // buffer has reached its max size. If no network value is given, it will
118 // default to "udp".
119 func NewDogStatsdProvider(network, address, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
120 if network == "" {
121 network = "udp"
122 }
123 if address == "" {
124 return nil, errors.New("address is required")
125 }
126 return dogstatsdProvider{
127 e: dogstatsd.NewEmitter(network, address, prefix, interval, logger),
128 }, nil
129 }
130
131 type dogstatsdProvider struct {
132 e *dogstatsd.Emitter
133 }
134
135 var _ Provider = dogstatsdProvider{}
136
137 // NewCounter implements Provider. Help is ignored.
138 func (p dogstatsdProvider) NewCounter(name, _ string) metrics.Counter {
139 return p.e.NewCounter(name)
140 }
141
142 // NewHistogram implements Provider. Help is ignored.
143 func (p dogstatsdProvider) NewHistogram(name, _ string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
144 return p.e.NewHistogram(name), nil
145 }
146
147 // NewGauge implements Provider. Help is ignored.
148 func (p dogstatsdProvider) NewGauge(name, _ string) metrics.Gauge {
149 return p.e.NewGauge(name)
150 }
151
152 // Stop will call the underlying statsd.Emitter's Stop method.
153 func (p dogstatsdProvider) Stop() {
154 p.e.Stop()
155 }
156
157 // NewExpvarProvider is a very thin wrapper over the expvar package.
158 // If a prefix is provided, it will prefix all metric names.
159 func NewExpvarProvider(prefix string) Provider {
160 return expvarProvider{prefix: prefix}
161 }
162
163 type expvarProvider struct {
164 prefix string
165 }
166
167 var _ Provider = expvarProvider{}
168
169 // NewCounter implements Provider. Help is ignored.
170 func (p expvarProvider) NewCounter(name, _ string) metrics.Counter {
171 return kitexp.NewCounter(p.prefix + name)
172 }
173
174 // NewHistogram implements Provider. Help is ignored.
175 func (p expvarProvider) NewHistogram(name, _ string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
176 return kitexp.NewHistogram(p.prefix+name, min, max, sigfigs, quantiles...), nil
177 }
178
179 // NewGauge implements Provider. Help is ignored.
180 func (p expvarProvider) NewGauge(name, _ string) metrics.Gauge {
181 return kitexp.NewGauge(p.prefix + name)
182 }
183
184 // Stop is a no-op.
185 func (expvarProvider) Stop() {}
186
187 type prometheusProvider struct {
188 namespace string
189 subsystem string
190 }
191
192 var _ Provider = prometheusProvider{}
193
194 // NewPrometheusProvider returns a Prometheus provider that uses the provided
195 // namespace and subsystem for all metrics.
196 func NewPrometheusProvider(namespace, subsystem string) Provider {
197 return prometheusProvider{
198 namespace: namespace,
199 subsystem: subsystem,
200 }
201 }
202
203 // NewCounter implements Provider.
204 func (p prometheusProvider) NewCounter(name, help string) metrics.Counter {
205 return kitprom.NewCounter(prometheus.CounterOpts{
206 Namespace: p.namespace,
207 Subsystem: p.subsystem,
208 Name: name,
209 Help: help,
210 }, nil)
211 }
212
213 // NewHistogram ignores all parameters except name and help.
214 func (p prometheusProvider) NewHistogram(name, help string, _, _ int64, _ int, _ ...int) (metrics.Histogram, error) {
215 return kitprom.NewHistogram(prometheus.HistogramOpts{
216 Namespace: p.namespace,
217 Subsystem: p.subsystem,
218 Name: name,
219 Help: help,
220 }, nil), nil
221 }
222
223 // NewGauge implements Provider.
224 func (p prometheusProvider) NewGauge(name, help string) metrics.Gauge {
225 return kitprom.NewGauge(prometheus.GaugeOpts{
226 Namespace: p.namespace,
227 Subsystem: p.subsystem,
228 Name: name,
229 Help: help,
230 }, nil)
231 }
232
233 // Stop is a no-op.
234 func (prometheusProvider) Stop() {}
235
236 var _ Provider = discardProvider{}
237
238 // NewDiscardProvider returns a provider that will discard all metrics.
239 func NewDiscardProvider() Provider {
240 return discardProvider{}
241 }
242
243 type discardProvider struct{}
244
245 func (p discardProvider) NewCounter(name string, _ string) metrics.Counter {
246 return discard.NewCounter(name)
247 }
248
249 func (p discardProvider) NewHistogram(name string, _ string, _ int64, _ int64, _ int, _ ...int) (metrics.Histogram, error) {
250 return discard.NewHistogram(name), nil
251 }
252
253 func (p discardProvider) NewGauge(name string, _ string) metrics.Gauge {
254 return discard.NewGauge(name)
255 }
256
257 // Stop is a no-op.
258 func (p discardProvider) Stop() {}
0 package provider
1
2 import (
3 "testing"
4 "time"
5
6 "github.com/go-kit/kit/log"
7 )
8
9 func TestGraphite(t *testing.T) {
10 p, err := NewGraphiteProvider("network", "address", "prefix", time.Second, log.NewNopLogger())
11 if err != nil {
12 t.Fatal(err)
13 }
14 testProvider(t, "Graphite", p)
15 }
16
17 func TestStatsd(t *testing.T) {
18 p, err := NewStatsdProvider("network", "address", "prefix", time.Second, log.NewNopLogger())
19 if err != nil {
20 t.Fatal(err)
21 }
22 testProvider(t, "Statsd", p)
23 }
24
25 func TestDogStatsd(t *testing.T) {
26 p, err := NewDogStatsdProvider("network", "address", "prefix", time.Second, log.NewNopLogger())
27 if err != nil {
28 t.Fatal(err)
29 }
30 testProvider(t, "DogStatsd", p)
31 }
32
33 func TestExpvar(t *testing.T) {
34 testProvider(t, "Expvar", NewExpvarProvider("prefix"))
35 }
36
37 func TestPrometheus(t *testing.T) {
38 testProvider(t, "Prometheus", NewPrometheusProvider("namespace", "subsystem"))
39 }
40
41 func testProvider(t *testing.T, what string, p Provider) {
42 c := p.NewCounter("counter", "Counter help.")
43 c.Add(1)
44
45 h, err := p.NewHistogram("histogram", "Histogram help.", 1, 100, 3, 50, 95, 99)
46 if err != nil {
47 t.Errorf("%s: NewHistogram: %v", what, err)
48 }
49 h.Observe(99)
50
51 g := p.NewGauge("gauge", "Gauge help.")
52 g.Set(123)
53
54 p.Stop()
55 }
0 package statsd
1
2 import (
3 "bytes"
4 "fmt"
5 "net"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/util/conn"
11 )
12
13 // Emitter is a struct to manage connections and orchestrate the emission of
14 // metrics to a Statsd process.
15 type Emitter struct {
16 prefix string
17 keyVals chan keyVal
18 mgr *conn.Manager
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23 type keyVal struct {
24 key string
25 val string
26 }
27
28 func stringToKeyVal(key string, keyVals chan keyVal) chan string {
29 vals := make(chan string)
30 go func() {
31 for val := range vals {
32 keyVals <- keyVal{key: key, val: val}
33 }
34 }()
35 return vals
36 }
37
38 // NewEmitter will return an Emitter that will prefix all metrics names with the
39 // given prefix. Once started, it will attempt to create a connection with the
40 // given network and address via `net.Dial` and periodically post metrics to the
41 // connection in the statsd protocol.
42 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
43 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
44 }
45
46 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
47 // Dialer function. This is primarily useful for tests.
48 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
49 e := &Emitter{
50 prefix: metricsPrefix,
51 mgr: conn.NewManager(dialer, network, address, time.After, logger),
52 logger: logger,
53 keyVals: make(chan keyVal),
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
58 }
59
60 // NewCounter returns a Counter that emits observations in the statsd protocol
61 // via the Emitter's connection manager. Observations are buffered for the
62 // report interval or until the buffer exceeds a max packet size, whichever
63 // comes first. Fields are ignored.
64 func (e *Emitter) NewCounter(key string) metrics.Counter {
65 key = e.prefix + key
66 return &counter{
67 key: key,
68 c: stringToKeyVal(key, e.keyVals),
69 }
70 }
71
72 // NewHistogram returns a Histogram that emits observations in the statsd
73 // protocol via the Emitter's conection manager. Observations are buffered for
74 // the reporting interval or until the buffer exceeds a max packet size,
75 // whichever comes first. Fields are ignored.
76 //
77 // NewHistogram is mapped to a statsd Timing, so observations should represent
78 // milliseconds. If you observe in units of nanoseconds, you can make the
79 // translation with a ScaledHistogram:
80 //
81 // NewScaledHistogram(histogram, time.Millisecond)
82 //
83 // You can also enforce the constraint in a typesafe way with a millisecond
84 // TimeHistogram:
85 //
86 // NewTimeHistogram(histogram, time.Millisecond)
87 //
88 // TODO: support for sampling.
89 func (e *Emitter) NewHistogram(key string) metrics.Histogram {
90 key = e.prefix + key
91 return &histogram{
92 key: key,
93 h: stringToKeyVal(key, e.keyVals),
94 }
95 }
96
97 // NewGauge returns a Gauge that emits values in the statsd protocol via the
98 // the Emitter's connection manager. Values are buffered for the report
99 // interval or until the buffer exceeds a max packet size, whichever comes
100 // first. Fields are ignored.
101 //
102 // TODO: support for sampling
103 func (e *Emitter) NewGauge(key string) metrics.Gauge {
104 key = e.prefix + key
105 return &gauge{
106 key: key,
107 g: stringToKeyVal(key, e.keyVals),
108 }
109 }
110
111 func (e *Emitter) loop(d time.Duration) {
112 ticker := time.NewTicker(d)
113 defer ticker.Stop()
114 buf := &bytes.Buffer{}
115 for {
116 select {
117 case kv := <-e.keyVals:
118 fmt.Fprintf(buf, "%s:%s\n", kv.key, kv.val)
119 if buf.Len() > maxBufferSize {
120 e.Flush(buf)
121 }
122
123 case <-ticker.C:
124 e.Flush(buf)
125
126 case q := <-e.quitc:
127 e.Flush(buf)
128 close(q)
129 return
130 }
131 }
132 }
133
134 // Stop will flush the current metrics and close the active connection. Calling
135 // stop more than once is a programmer error.
136 func (e *Emitter) Stop() {
137 q := make(chan struct{})
138 e.quitc <- q
139 <-q
140 }
141
142 // Flush will write the given buffer to a connection provided by the Emitter's
143 // connection manager.
144 func (e *Emitter) Flush(buf *bytes.Buffer) {
145 conn := e.mgr.Take()
146 if conn == nil {
147 e.logger.Log("during", "flush", "err", "connection unavailable")
148 return
149 }
150
151 _, err := conn.Write(buf.Bytes())
152 if err != nil {
153 e.logger.Log("during", "flush", "err", err)
154 }
155 buf.Reset()
156
157 e.mgr.Put(err)
158 }
22 import (
33 "bytes"
44 "fmt"
5 "net"
56 "strings"
67 "sync"
78 "testing"
89 "time"
10
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/util/conn"
913 )
14
15 func TestEmitterCounter(t *testing.T) {
16 e, buf := testEmitter()
17
18 c := e.NewCounter("test_statsd_counter")
19 c.Add(1)
20 c.Add(2)
21
22 // give time for things to emit
23 time.Sleep(time.Millisecond * 250)
24 // force a flush and stop
25 e.Stop()
26
27 want := "prefix.test_statsd_counter:1|c\nprefix.test_statsd_counter:2|c\n"
28 have := buf.String()
29 if want != have {
30 t.Errorf("want %q, have %q", want, have)
31 }
32 }
33
34 func TestEmitterGauge(t *testing.T) {
35 e, buf := testEmitter()
36
37 g := e.NewGauge("test_statsd_gauge")
38
39 delta := 1.0
40 g.Add(delta)
41
42 // give time for things to emit
43 time.Sleep(time.Millisecond * 250)
44 // force a flush and stop
45 e.Stop()
46
47 want := fmt.Sprintf("prefix.test_statsd_gauge:+%f|g\n", delta)
48 have := buf.String()
49 if want != have {
50 t.Errorf("want %q, have %q", want, have)
51 }
52 }
53
54 func TestEmitterHistogram(t *testing.T) {
55 e, buf := testEmitter()
56 h := e.NewHistogram("test_statsd_histogram")
57
58 h.Observe(123)
59
60 // give time for things to emit
61 time.Sleep(time.Millisecond * 250)
62 // force a flush and stop
63 e.Stop()
64
65 want := "prefix.test_statsd_histogram:123|ms\n"
66 have := buf.String()
67 if want != have {
68 t.Errorf("want %q, have %q", want, have)
69 }
70 }
1071
1172 func TestCounter(t *testing.T) {
1273 buf := &syncbuf{buf: &bytes.Buffer{}}
140201 defer s.mtx.Unlock()
141202 s.buf.Reset()
142203 }
204
205 func testEmitter() (*Emitter, *syncbuf) {
206 buf := &syncbuf{buf: &bytes.Buffer{}}
207 e := &Emitter{
208 prefix: "prefix.",
209 mgr: conn.NewManager(mockDialer(buf), "", "", time.After, log.NewNopLogger()),
210 logger: log.NewNopLogger(),
211 keyVals: make(chan keyVal),
212 quitc: make(chan chan struct{}),
213 }
214 go e.loop(time.Millisecond * 20)
215 return e, buf
216 }
217
218 func mockDialer(buf *syncbuf) conn.Dialer {
219 return func(net, addr string) (net.Conn, error) {
220 return &mockConn{buf}, nil
221 }
222 }
223
224 type mockConn struct {
225 buf *syncbuf
226 }
227
228 func (c *mockConn) Read(b []byte) (n int, err error) {
229 panic("not implemented")
230 }
231
232 func (c *mockConn) Write(b []byte) (n int, err error) {
233 return c.buf.Write(b)
234 }
235
236 func (c *mockConn) Close() error {
237 panic("not implemented")
238 }
239
240 func (c *mockConn) LocalAddr() net.Addr {
241 panic("not implemented")
242 }
243
244 func (c *mockConn) RemoteAddr() net.Addr {
245 panic("not implemented")
246 }
247
248 func (c *mockConn) SetDeadline(t time.Time) error {
249 panic("not implemented")
250 }
251
252 func (c *mockConn) SetReadDeadline(t time.Time) error {
253 panic("not implemented")
254 }
255
256 func (c *mockConn) SetWriteDeadline(t time.Time) error {
257 panic("not implemented")
258 }
0 package teststat
1
2 import (
3 "fmt"
4 "math"
5 "regexp"
6 "strconv"
7 "testing"
8 )
9
10 // AssertGraphiteNormalHistogram ensures the expvar Histogram referenced by
11 // metricName abides a normal distribution.
12 func AssertGraphiteNormalHistogram(t *testing.T, prefix, metricName string, mean, stdev int64, quantiles []int, gPayload string) {
13 // check for hdr histo data
14 wants := map[string]int64{"count": 1234, "min": 15, "max": 83}
15 for key, want := range wants {
16 re := regexp.MustCompile(fmt.Sprintf("%s%s.%s (\\d*)", prefix, metricName, key))
17 res := re.FindAllStringSubmatch(gPayload, 1)
18 if res == nil {
19 t.Error("did not find metrics log for", key, "in \n", gPayload)
20 continue
21 }
22
23 if len(res[0]) == 1 {
24 t.Fatalf("%q: bad regex, please check the test scenario", key)
25 }
26
27 have, err := strconv.ParseInt(res[0][1], 10, 64)
28 if err != nil {
29 t.Fatal(err)
30 }
31
32 if want != have {
33 t.Errorf("key %s: want %d, have %d", key, want, have)
34 }
35 }
36
37 const tolerance int = 2
38 wants = map[string]int64{".std-dev": stdev, ".mean": mean}
39 for _, quantile := range quantiles {
40 wants[fmt.Sprintf("_p%02d", quantile)] = normalValueAtQuantile(mean, stdev, quantile)
41 }
42 // check for quantile gauges
43 for key, want := range wants {
44 re := regexp.MustCompile(fmt.Sprintf("%s%s%s (\\d*\\.\\d*)", prefix, metricName, key))
45 res := re.FindAllStringSubmatch(gPayload, 1)
46 if res == nil {
47 t.Errorf("did not find metrics log for %s", key)
48 continue
49 }
50
51 if len(res[0]) == 1 {
52 t.Fatalf("%q: bad regex found, please check the test scenario", key)
53 }
54 have, err := strconv.ParseFloat(res[0][1], 64)
55 if err != nil {
56 t.Fatal(err)
57 }
58 if int(math.Abs(float64(want)-have)) > tolerance {
59 t.Errorf("key %s: want %.2f, have %.2f", key, want, have)
60 }
61 }
62 }
0 package conn
1
2 import (
3 "net"
4 "time"
5
6 "github.com/go-kit/kit/log"
7 )
8
9 // Dialer dials a network and address. net.Dial is a good default Dialer.
10 type Dialer func(network, address string) (net.Conn, error)
11
12 // AfterFunc imitates time.After.
13 type AfterFunc func(time.Duration) <-chan time.Time
14
15 // Manager manages a net.Conn. Clients should take the conn when they want to
16 // use it, and put back whatever error they receive from an e.g. Write. When a
17 // non-nil error is put, the conn is invalidated and a new conn is established.
18 // Connection failures are retried after an exponential backoff.
19 type Manager struct {
20 dial Dialer
21 network string
22 address string
23 after AfterFunc
24 logger log.Logger
25
26 takec chan net.Conn
27 putc chan error
28 }
29
30 func NewManager(d Dialer, network, address string, after AfterFunc, logger log.Logger) *Manager {
31 m := &Manager{
32 dial: d,
33 network: network,
34 address: address,
35 after: after,
36 logger: logger,
37
38 takec: make(chan net.Conn),
39 putc: make(chan error),
40 }
41 go m.loop()
42 return m
43 }
44
45 func (m *Manager) Take() net.Conn {
46 return <-m.takec
47 }
48
49 func (m *Manager) Put(err error) {
50 m.putc <- err
51 }
52
53 func (m *Manager) loop() {
54 var (
55 conn = dial(m.dial, m.network, m.address, m.logger) // may block slightly
56 connc = make(chan net.Conn)
57 reconnectc <-chan time.Time // initially nil
58 backoff = time.Second
59 )
60
61 for {
62 select {
63 case <-reconnectc:
64 reconnectc = nil
65 go func() { connc <- dial(m.dial, m.network, m.address, m.logger) }()
66
67 case conn = <-connc:
68 if conn == nil {
69 backoff = exponential(backoff)
70 reconnectc = m.after(backoff)
71 } else {
72 backoff = time.Second
73 reconnectc = nil
74 }
75
76 case m.takec <- conn:
77 // might be nil
78
79 case err := <-m.putc:
80 if err != nil && conn != nil {
81 m.logger.Log("err", err)
82 conn = nil // connection is bad
83 reconnectc = m.after(time.Nanosecond) // trigger immediately
84 }
85 }
86 }
87 }
88
89 func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
90 conn, err := d(network, address)
91 if err != nil {
92 logger.Log("err", err)
93 conn = nil
94 }
95 return conn
96 }
97
98 func exponential(d time.Duration) time.Duration {
99 d *= 2
100 if d > time.Minute {
101 d = time.Minute
102 }
103 return d
104 }
0 package conn
1
2 import (
3 "errors"
4 "net"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestManager(t *testing.T) {
13 var (
14 tickc = make(chan time.Time)
15 after = func(time.Duration) <-chan time.Time { return tickc }
16 dialconn = &mockConn{}
17 dialerr = error(nil)
18 dialer = func(string, string) (net.Conn, error) { return dialconn, dialerr }
19 mgr = NewManager(dialer, "netw", "addr", after, log.NewNopLogger())
20 )
21
22 // First conn should be fine.
23 conn := mgr.Take()
24 if conn == nil {
25 t.Fatal("nil conn")
26 }
27
28 // Write and check it went thru.
29 if _, err := conn.Write([]byte{1, 2, 3}); err != nil {
30 t.Fatal(err)
31 }
32 if want, have := uint64(3), atomic.LoadUint64(&dialconn.wr); want != have {
33 t.Errorf("want %d, have %d", want, have)
34 }
35
36 // Put an error to kill the conn.
37 mgr.Put(errors.New("should kill the connection"))
38
39 // First takes should fail.
40 for i := 0; i < 10; i++ {
41 if conn = mgr.Take(); conn != nil {
42 t.Fatalf("want nil conn, got real conn")
43 }
44 }
45
46 // Trigger the reconnect.
47 tickc <- time.Now()
48
49 // The dial should eventually succeed and yield a good conn.
50 if !within(100*time.Millisecond, func() bool {
51 conn = mgr.Take()
52 return conn != nil
53 }) {
54 t.Fatal("conn remained nil")
55 }
56
57 // Write and check it went thru.
58 if _, err := conn.Write([]byte{4, 5}); err != nil {
59 t.Fatal(err)
60 }
61 if want, have := uint64(5), atomic.LoadUint64(&dialconn.wr); want != have {
62 t.Errorf("want %d, have %d", want, have)
63 }
64
65 // Dial starts failing.
66 dialconn, dialerr = nil, errors.New("oh noes")
67 mgr.Put(errors.New("trigger that reconnect y'all"))
68 if conn = mgr.Take(); conn != nil {
69 t.Fatalf("want nil conn, got real conn")
70 }
71
72 // As many reconnects as they want.
73 go func() {
74 done := time.After(100 * time.Millisecond)
75 for {
76 select {
77 case tickc <- time.Now():
78 case <-done:
79 return
80 }
81 }
82 }()
83
84 // The dial should never succeed.
85 if within(100*time.Millisecond, func() bool {
86 conn = mgr.Take()
87 return conn != nil
88 }) {
89 t.Fatal("eventually got a good conn, despite failing dialer")
90 }
91 }
92
93 type mockConn struct {
94 rd, wr uint64
95 }
96
97 func (c *mockConn) Read(b []byte) (n int, err error) {
98 atomic.AddUint64(&c.rd, uint64(len(b)))
99 return len(b), nil
100 }
101
102 func (c *mockConn) Write(b []byte) (n int, err error) {
103 atomic.AddUint64(&c.wr, uint64(len(b)))
104 return len(b), nil
105 }
106
107 func (c *mockConn) Close() error { return nil }
108 func (c *mockConn) LocalAddr() net.Addr { return nil }
109 func (c *mockConn) RemoteAddr() net.Addr { return nil }
110 func (c *mockConn) SetDeadline(t time.Time) error { return nil }
111 func (c *mockConn) SetReadDeadline(t time.Time) error { return nil }
112 func (c *mockConn) SetWriteDeadline(t time.Time) error { return nil }
113
114 func within(d time.Duration, f func() bool) bool {
115 deadline := time.Now().Add(d)
116 for {
117 if time.Now().After(deadline) {
118 return false
119 }
120 if f() {
121 return true
122 }
123 time.Sleep(d / 10)
124 }
125 }