Codebase list golang-github-go-kit-kit / b573c06
adding dogstatsd.Emitter and provider.dogstatsProvider JP Robinson 7 years ago
4 changed file(s) with 328 addition(s) and 1 deletion(s). Raw diff Collapse all Expand all
22 import (
33 "bytes"
44 "fmt"
5 "github.com/go-kit/kit/metrics"
5 "net"
66 "strings"
77 "sync"
88 "testing"
99 "time"
10
11 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/util/conn"
1014 )
15
16 func TestEmitterCounter(t *testing.T) {
17 e, buf := testEmitter()
18
19 c := e.NewCounter("test_statsd_counter")
20 c.Add(1)
21 c.Add(2)
22
23 // give time for things to emit
24 time.Sleep(time.Millisecond * 250)
25 // force a flush and stop
26 e.Stop()
27
28 want := "prefix.test_statsd_counter:1|c\nprefix.test_statsd_counter:2|c\n"
29 have := buf.String()
30 if want != have {
31 t.Errorf("want %q, have %q", want, have)
32 }
33 }
34
35 func TestEmitterGauge(t *testing.T) {
36 e, buf := testEmitter()
37
38 g := e.NewGauge("test_statsd_gauge")
39
40 delta := 1.0
41 g.Add(delta)
42
43 // give time for things to emit
44 time.Sleep(time.Millisecond * 250)
45 // force a flush and stop
46 e.Stop()
47
48 want := fmt.Sprintf("prefix.test_statsd_gauge:+%f|g\n", delta)
49 have := buf.String()
50 if want != have {
51 t.Errorf("want %q, have %q", want, have)
52 }
53 }
54
55 func TestEmitterHistogram(t *testing.T) {
56 e, buf := testEmitter()
57 h := e.NewHistogram("test_statsd_histogram")
58
59 h.Observe(123)
60
61 // give time for things to emit
62 time.Sleep(time.Millisecond * 250)
63 // force a flush and stop
64 e.Stop()
65
66 want := "prefix.test_statsd_histogram:123|ms\n"
67 have := buf.String()
68 if want != have {
69 t.Errorf("want %q, have %q", want, have)
70 }
71 }
1172
1273 func TestCounter(t *testing.T) {
1374 buf := &syncbuf{buf: &bytes.Buffer{}}
147208 defer s.mtx.Unlock()
148209 s.buf.Reset()
149210 }
211
212 func testEmitter() (*Emitter, *syncbuf) {
213 buf := &syncbuf{buf: &bytes.Buffer{}}
214 e := &Emitter{
215 prefix: "prefix.",
216 mgr: conn.NewManager(mockDialer(buf), "", "", time.After, log.NewNopLogger()),
217 logger: log.NewNopLogger(),
218 keyVals: make(chan keyVal),
219 quitc: make(chan chan struct{}),
220 }
221 go e.loop(time.Millisecond * 20)
222 return e, buf
223 }
224
225 func mockDialer(buf *syncbuf) conn.Dialer {
226 return func(net, addr string) (net.Conn, error) {
227 return &mockConn{buf}, nil
228 }
229 }
230
231 type mockConn struct {
232 buf *syncbuf
233 }
234
235 func (c *mockConn) Read(b []byte) (n int, err error) {
236 panic("not implemented")
237 }
238
239 func (c *mockConn) Write(b []byte) (n int, err error) {
240 return c.buf.Write(b)
241 }
242
243 func (c *mockConn) Close() error {
244 panic("not implemented")
245 }
246
247 func (c *mockConn) LocalAddr() net.Addr {
248 panic("not implemented")
249 }
250
251 func (c *mockConn) RemoteAddr() net.Addr {
252 panic("not implemented")
253 }
254
255 func (c *mockConn) SetDeadline(t time.Time) error {
256 panic("not implemented")
257 }
258
259 func (c *mockConn) SetReadDeadline(t time.Time) error {
260 panic("not implemented")
261 }
262
263 func (c *mockConn) SetWriteDeadline(t time.Time) error {
264 panic("not implemented")
265 }
0 package dogstatsd
1
2 import (
3 "bytes"
4 "fmt"
5 "net"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/util/conn"
11 )
12
13 // Emitter is a struct to manage connections and orchestrate the emission of
14 // metrics to a Statsd process.
15 type Emitter struct {
16 prefix string
17 keyVals chan keyVal
18 mgr *conn.Manager
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23 type keyVal struct {
24 key string
25 val string
26 }
27
28 func stringToKeyVal(key string, keyVals chan keyVal) chan string {
29 vals := make(chan string)
30 go func() {
31 for val := range vals {
32 keyVals <- keyVal{key: key, val: val}
33 }
34 }()
35 return vals
36 }
37
38 // NewEmitter will return an Emitter that will prefix all metrics names with the
39 // given prefix. Once started, it will attempt to create a connection with the
40 // given network and address via `net.Dial` and periodically post metrics to the
41 // connection in the DogStatsD protocol.
42 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
43 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
44 }
45
46 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
47 // Dialer function. This is primarily useful for tests.
48 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
49 e := &Emitter{
50 prefix: metricsPrefix,
51 mgr: conn.NewManager(dialer, network, address, time.After, logger),
52 logger: logger,
53 keyVals: make(chan keyVal),
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
58 }
59
60 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
61 // via the Emitter's connection manager. Observations are buffered for the
62 // report interval or until the buffer exceeds a max packet size, whichever
63 // comes first. Fields are ignored.
64 func (e *Emitter) NewCounter(key string) metrics.Counter {
65 key = e.prefix + key
66 return &counter{
67 key: key,
68 c: stringToKeyVal(key, e.keyVals),
69 }
70 }
71
72 // NewHistogram returns a Histogram that emits observations in the DogStatsD
73 // protocol via the Emitter's conection manager. Observations are buffered for
74 // the reporting interval or until the buffer exceeds a max packet size,
75 // whichever comes first. Fields are ignored.
76 //
77 // NewHistogram is mapped to a statsd Timing, so observations should represent
78 // milliseconds. If you observe in units of nanoseconds, you can make the
79 // translation with a ScaledHistogram:
80 //
81 // NewScaledHistogram(histogram, time.Millisecond)
82 //
83 // You can also enforce the constraint in a typesafe way with a millisecond
84 // TimeHistogram:
85 //
86 // NewTimeHistogram(histogram, time.Millisecond)
87 //
88 // TODO: support for sampling.
89 func (e *Emitter) NewHistogram(key string) metrics.Histogram {
90 key = e.prefix + key
91 return &histogram{
92 key: key,
93 h: stringToKeyVal(key, e.keyVals),
94 }
95 }
96
97 // NewGauge returns a Gauge that emits values in the DogStatsD protocol via the
98 // the Emitter's connection manager. Values are buffered for the report
99 // interval or until the buffer exceeds a max packet size, whichever comes
100 // first. Fields are ignored.
101 //
102 // TODO: support for sampling
103 func (e *Emitter) NewGauge(key string) metrics.Gauge {
104 key = e.prefix + key
105 return &gauge{
106 key: key,
107 g: stringToKeyVal(key, e.keyVals),
108 }
109 }
110
111 func (e *Emitter) loop(d time.Duration) {
112 ticker := time.NewTicker(d)
113 defer ticker.Stop()
114 buf := &bytes.Buffer{}
115 for {
116 select {
117 case kv := <-e.keyVals:
118 fmt.Fprintf(buf, "%s:%s\n", kv.key, kv.val)
119 if buf.Len() > maxBufferSize {
120 e.Flush(buf)
121 }
122
123 case <-ticker.C:
124 e.Flush(buf)
125
126 case q := <-e.quitc:
127 e.Flush(buf)
128 close(q)
129 return
130 }
131 }
132 }
133
134 // Stop will flush the current metrics and close the active connection. Calling
135 // stop more than once is a programmer error.
136 func (e *Emitter) Stop() {
137 q := make(chan struct{})
138 e.quitc <- q
139 <-q
140 }
141
142 // Flush will write the given buffer to a connection provided by the Emitter's
143 // connection manager.
144 func (e *Emitter) Flush(buf *bytes.Buffer) {
145 conn := e.mgr.Take()
146 if conn == nil {
147 e.logger.Log("during", "flush", "err", "connection unavailable")
148 return
149 }
150
151 _, err := conn.Write(buf.Bytes())
152 if err != nil {
153 e.logger.Log("during", "flush", "err", err)
154 }
155 buf.Reset()
156
157 e.mgr.Put(err)
158 }
77
88 "github.com/go-kit/kit/log"
99 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/metrics/dogstatsd"
1011 kitexp "github.com/go-kit/kit/metrics/expvar"
1112 "github.com/go-kit/kit/metrics/graphite"
1213 kitprom "github.com/go-kit/kit/metrics/prometheus"
109110 p.e.Stop()
110111 }
111112
113 // NewDogStatsdProvider will return a Provider implementation that is a simple
114 // wrapper around a dogstatsd.Emitter. All metric names will be prefixed with
115 // the given value and data will be emitted once every interval or when the
116 // buffer has reached its max size. If no network value is given, it will
117 // default to "udp".
118 func NewDogStatsdProvider(network, address, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
119 if network == "" {
120 network = "udp"
121 }
122 if address == "" {
123 return nil, errors.New("address is required")
124 }
125 return dogstatsdProvider{
126 e: dogstatsd.NewEmitter(network, address, prefix, interval, logger),
127 }, nil
128 }
129
130 type dogstatsdProvider struct {
131 e *dogstatsd.Emitter
132 }
133
134 var _ Provider = dogstatsdProvider{}
135
136 // NewCounter implements Provider. Help is ignored.
137 func (p dogstatsdProvider) NewCounter(name, _ string) metrics.Counter {
138 return p.e.NewCounter(name)
139 }
140
141 // NewHistogram implements Provider. Help is ignored.
142 func (p dogstatsdProvider) NewHistogram(name, _ string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
143 return p.e.NewHistogram(name), nil
144 }
145
146 // NewGauge implements Provider. Help is ignored.
147 func (p dogstatsdProvider) NewGauge(name, _ string) metrics.Gauge {
148 return p.e.NewGauge(name)
149 }
150
151 // Stop will call the underlying statsd.Emitter's Stop method.
152 func (p dogstatsdProvider) Stop() {
153 p.e.Stop()
154 }
155
112156 // NewExpvarProvider is a very thin wrapper over the expvar package.
113157 // If a prefix is provided, it will prefix all metric names.
114158 func NewExpvarProvider(prefix string) Provider {
2222 testProvider(t, "Statsd", p)
2323 }
2424
25 func TestDogStatsd(t *testing.T) {
26 p, err := NewDogStatsdProvider("network", "address", "prefix", time.Second, log.NewNopLogger())
27 if err != nil {
28 t.Fatal(err)
29 }
30 testProvider(t, "DogStatsd", p)
31 }
32
2533 func TestExpvar(t *testing.T) {
2634 testProvider(t, "Expvar", NewExpvarProvider("prefix"))
2735 }