Codebase list golang-github-go-kit-kit / 8dbce0d
created metrics/statsd.Emitter, util/conn.Manager and udpated providers JP Robinson 7 years ago
10 changed file(s) with 630 addition(s) and 459 deletion(s). Raw diff Collapse all Expand all
0 package graphite
1
2 import (
3 "bufio"
4 "fmt"
5 "io"
6 "net"
7 "sync"
8 "time"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
12 "github.com/go-kit/kit/util/conn"
13 )
14
15 // Emitter is a struct to manage connections and orchestrate the emission of
16 // metrics to a Graphite system.
17 type Emitter struct {
18 mtx sync.Mutex
19 prefix string
20 mgr *conn.Manager
21 counters []*counter
22 histograms []*windowedHistogram
23 gauges []*gauge
24 logger log.Logger
25 quitc chan chan struct{}
26 }
27
28 // NewEmitter will return an Emitter that will prefix all metrics names with the
29 // given prefix. Once started, it will attempt to create a connection with the
30 // given network and address via `net.Dial` and periodically post metrics to the
31 // connection in the Graphite plaintext protocol.
32 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
33 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
34 }
35
36 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
37 // Dialer function. This is primarily useful for tests.
38 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
39 e := &Emitter{
40 prefix: metricsPrefix,
41 mgr: conn.NewManager(dialer, network, address, time.After, logger),
42 logger: logger,
43 quitc: make(chan chan struct{}),
44 }
45 go e.loop(flushInterval)
46 return e
47 }
48
49 // NewCounter returns a Counter whose value will be periodically emitted in
50 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
51 func (e *Emitter) NewCounter(name string) metrics.Counter {
52 e.mtx.Lock()
53 defer e.mtx.Unlock()
54 c := newCounter(name)
55 e.counters = append(e.counters, c)
56 return c
57 }
58
59 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
60 // windowed HDR histogram which drops data older than five minutes.
61 //
62 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
63 // should be integers in the range 1..99. The gauge names are assigned by using
64 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
65 //
66 // The values of this histogram will be periodically emitted in a
67 // Graphite-compatible format once the Emitter is started. Fields are ignored.
68 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
69 gauges := map[int]metrics.Gauge{}
70 for _, quantile := range quantiles {
71 if quantile <= 0 || quantile >= 100 {
72 return nil, fmt.Errorf("invalid quantile %d", quantile)
73 }
74 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
75 }
76 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
77
78 e.mtx.Lock()
79 defer e.mtx.Unlock()
80 e.histograms = append(e.histograms, h)
81 return h, nil
82 }
83
84 // NewGauge returns a Gauge whose value will be periodically emitted in a
85 // Graphite-compatible format once the Emitter is started. Fields are ignored.
86 func (e *Emitter) NewGauge(name string) metrics.Gauge {
87 e.mtx.Lock()
88 defer e.mtx.Unlock()
89 return e.gauge(name)
90 }
91
92 func (e *Emitter) gauge(name string) metrics.Gauge {
93 g := &gauge{name, 0}
94 e.gauges = append(e.gauges, g)
95 return g
96 }
97
98 func (e *Emitter) loop(d time.Duration) {
99 ticker := time.NewTicker(d)
100 defer ticker.Stop()
101
102 for {
103 select {
104 case <-ticker.C:
105 e.Flush()
106
107 case q := <-e.quitc:
108 e.Flush()
109 close(q)
110 return
111 }
112 }
113 }
114
115 // Stop will flush the current metrics and close the active connection. Calling
116 // stop more than once is a programmer error.
117 func (e *Emitter) Stop() {
118 q := make(chan struct{})
119 e.quitc <- q
120 <-q
121 }
122
123 // Flush will write the current metrics to the Emitter's connection in the
124 // Graphite plaintext protocol.
125 func (e *Emitter) Flush() {
126 e.mtx.Lock() // one flush at a time
127 defer e.mtx.Unlock()
128
129 conn := e.mgr.Take()
130 if conn == nil {
131 e.logger.Log("during", "flush", "err", "connection unavailable")
132 return
133 }
134
135 err := e.flush(conn)
136 if err != nil {
137 e.logger.Log("during", "flush", "err", err)
138 }
139 e.mgr.Put(err)
140 }
141
142 func (e *Emitter) flush(conn io.Writer) error {
143 w := bufio.NewWriter(conn)
144
145 for _, c := range e.counters {
146 c.flush(w, e.prefix)
147 }
148
149 for _, h := range e.histograms {
150 h.flush(w, e.prefix)
151 }
152
153 for _, g := range e.gauges {
154 g.flush(w, e.prefix)
155 }
156
157 return w.Flush()
158 }
88 package graphite
99
1010 import (
11 "bufio"
1211 "fmt"
1312 "io"
1413 "math"
15 "net"
1614 "sort"
1715 "sync"
1816 "sync/atomic"
2321 "github.com/go-kit/kit/metrics"
2422 )
2523
26 // Emitter will keep track of all metrics and, once started, will emit the
27 // metrics via the Flush method to the given address.
28 type Emitter struct {
29 mtx sync.Mutex
30 prefix string
31 mgr *manager
32 counters []*counter
33 histograms []*windowedHistogram
34 gauges []*gauge
35 logger log.Logger
36 quitc chan chan struct{}
24 // Newcounter will return a metrics.counter with the given name and a base
25 // value of 0.
26 func newCounter(name string) *counter {
27 return &counter{name, 0}
3728 }
3829
39 // NewEmitter will return an Emitter that will prefix all metrics names with the
40 // given prefix. Once started, it will attempt to create a connection with the
41 // given network and address via `net.Dial` and periodically post metrics to the
42 // connection in the Graphite plaintext protocol.
43 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
44 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
30 // Newgauge will return a metrics.gauge with the given name and a starting
31 // value of 0.
32 func newGauge(name string) *gauge {
33 return &gauge{name, 0}
4534 }
4635
47 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
48 // Dialer function. This is primarily useful for tests.
49 func NewEmitterDial(dialer Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
50 e := &Emitter{
51 prefix: metricsPrefix,
52 mgr: newManager(dialer, network, address, time.After, logger),
53 logger: logger,
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
58 }
59
60 // NewCounter returns a Counter whose value will be periodically emitted in
61 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
62 func (e *Emitter) NewCounter(name string) metrics.Counter {
63 e.mtx.Lock()
64 defer e.mtx.Unlock()
65 c := &counter{name, 0}
66 e.counters = append(e.counters, c)
67 return c
68 }
69
70 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
71 // windowed HDR histogram which drops data older than five minutes.
72 //
73 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
74 // should be integers in the range 1..99. The gauge names are assigned by using
75 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
76 //
77 // The values of this histogram will be periodically emitted in a
78 // Graphite-compatible format once the Emitter is started. Fields are ignored.
79 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
80 gauges := map[int]metrics.Gauge{}
81 for _, quantile := range quantiles {
82 if quantile <= 0 || quantile >= 100 {
83 return nil, fmt.Errorf("invalid quantile %d", quantile)
84 }
85 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
86 }
87 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
88
89 e.mtx.Lock()
90 defer e.mtx.Unlock()
91 e.histograms = append(e.histograms, h)
92 return h, nil
93 }
94
95 // NewTimeHistogram returns a TimeHistogram wrapper around the windowed HDR
96 // histrogram provided by this package.
97 func (e *Emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) {
98 h, err := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
99 if err != nil {
100 return nil, err
101 }
102 return metrics.NewTimeHistogram(unit, h), nil
103 }
104
105 // NewGauge returns a Gauge whose value will be periodically emitted in a
106 // Graphite-compatible format once the Emitter is started. Fields are ignored.
107 func (e *Emitter) NewGauge(name string) metrics.Gauge {
108 e.mtx.Lock()
109 defer e.mtx.Unlock()
110 return e.gauge(name)
111 }
112
113 func (e *Emitter) gauge(name string) metrics.Gauge {
114 g := &gauge{name, 0}
115 e.gauges = append(e.gauges, g)
116 return g
117 }
118
119 func (e *Emitter) loop(d time.Duration) {
120 ticker := time.NewTicker(d)
121 defer ticker.Stop()
122
123 for {
124 select {
125 case <-ticker.C:
126 e.Flush()
127
128 case q := <-e.quitc:
129 e.Flush()
130 close(q)
131 return
132 }
133 }
134 }
135
136 // Stop will flush the current metrics and close the active connection. Calling
137 // stop more than once is a programmer error.
138 func (e *Emitter) Stop() {
139 q := make(chan struct{})
140 e.quitc <- q
141 <-q
142 }
143
144 // Flush will write the current metrics to the Emitter's connection in the
145 // Graphite plaintext protocol.
146 func (e *Emitter) Flush() {
147 e.mtx.Lock() // one flush at a time
148 defer e.mtx.Unlock()
149
150 conn := e.mgr.take()
151 if conn == nil {
152 e.logger.Log("during", "flush", "err", "connection unavailable")
153 return
154 }
155
156 err := e.flush(conn)
157 if err != nil {
158 e.logger.Log("during", "flush", "err", err)
159 }
160 e.mgr.put(err)
161 }
162
163 func (e *Emitter) flush(conn io.Writer) error {
164 w := bufio.NewWriter(conn)
165
166 for _, c := range e.counters {
167 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, time.Now().Unix())
168 }
169
170 for _, h := range e.histograms {
171 hist := h.hist.Merge()
172 now := time.Now().Unix()
173 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, h.Name(), hist.TotalCount(), now)
174 fmt.Fprintf(w, "%s.%s.min %d %d\n", e.prefix, h.Name(), hist.Min(), now)
175 fmt.Fprintf(w, "%s.%s.max %d %d\n", e.prefix, h.Name(), hist.Max(), now)
176 fmt.Fprintf(w, "%s.%s.mean %.2f %d\n", e.prefix, h.Name(), hist.Mean(), now)
177 fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", e.prefix, h.Name(), hist.StdDev(), now)
178 }
179
180 for _, g := range e.gauges {
181 fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), time.Now().Unix())
182 }
183
184 return w.Flush()
185 }
186
36 // counter implements the metrics.counter interface but also provides a
37 // Flush method to emit the current counter values in the Graphite plaintext
38 // protocol.
18739 type counter struct {
18840 key string
18941 count uint64
19143
19244 func (c *counter) Name() string { return c.key }
19345
46 // With currently ignores fields.
19447 func (c *counter) With(metrics.Field) metrics.Counter { return c }
19548
19649 func (c *counter) Add(delta uint64) { atomic.AddUint64(&c.count, delta) }
19750
51 func (c *counter) get() uint64 { return atomic.LoadUint64(&c.count) }
52
53 // flush will emit the current counter value in the Graphite plaintext
54 // protocol to the given io.Writer.
55 func (c *counter) flush(conn io.Writer, prefix string) {
56 fmt.Fprintf(conn, "%s.count %d %d\n", prefix+c.Name(), c.get(), time.Now().Unix())
57 }
58
59 // gauge implements the metrics.gauge interface but also provides a
60 // Flush method to emit the current counter values in the Graphite plaintext
61 // protocol.
19862 type gauge struct {
19963 key string
20064 value uint64 // math.Float64bits
20266
20367 func (g *gauge) Name() string { return g.key }
20468
69 // With currently ignores fields.
20570 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
20671
20772 func (g *gauge) Add(delta float64) {
22287 return math.Float64frombits(atomic.LoadUint64(&g.value))
22388 }
22489
90 // Flush will emit the current gauge value in the Graphite plaintext
91 // protocol to the given io.Writer.
92 func (g *gauge) flush(conn io.Writer, prefix string) {
93 fmt.Fprintf(conn, "%s %.2f %d\n", prefix+g.Name(), g.Get(), time.Now().Unix())
94 }
95
96 // windowedHistogram is taken from http://github.com/codahale/metrics. It
97 // is a windowed HDR histogram which drops data older than five minutes.
98 //
99 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
100 // should be integers in the range 1..99. The gauge names are assigned by using
101 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
102 //
103 // The values of this histogram will be periodically emitted in a
104 // Graphite-compatible format once the GraphiteProvider is started. Fields are ignored.
225105 type windowedHistogram struct {
226106 mtx sync.Mutex
227107 hist *hdrhistogram.WindowedHistogram
231111 logger log.Logger
232112 }
233113
234 // newWindowedHistogram is taken from http://github.com/codahale/metrics. It
235 // returns a windowed HDR histogram which drops data older than five minutes.
236 //
237 // The histogram exposes metrics for each passed quantile as gauges. Users are
238 // expected to provide their own set of Gauges for quantiles to make this
239 // Histogram work across multiple metrics providers.
240114 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram {
241115 h := &windowedHistogram{
242116 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
288162 return buckets, quantiles
289163 }
290164
165 func (h *windowedHistogram) flush(conn io.Writer, prefix string) {
166 name := prefix + h.Name()
167 hist := h.hist.Merge()
168 now := time.Now().Unix()
169 fmt.Fprintf(conn, "%s.count %d %d\n", name, hist.TotalCount(), now)
170 fmt.Fprintf(conn, "%s.min %d %d\n", name, hist.Min(), now)
171 fmt.Fprintf(conn, "%s.max %d %d\n", name, hist.Max(), now)
172 fmt.Fprintf(conn, "%s.mean %.2f %d\n", name, hist.Mean(), now)
173 fmt.Fprintf(conn, "%s.std-dev %.2f %d\n", name, hist.StdDev(), now)
174 }
175
291176 func (h *windowedHistogram) rotateLoop(d time.Duration) {
292177 for range time.Tick(d) {
293178 h.mtx.Lock()
1212 )
1313
1414 func TestHistogramQuantiles(t *testing.T) {
15 prefix := "prefix"
15 prefix := "prefix."
1616 e := NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
1717 var (
1818 name = "test_histogram_quantiles"
3434
3535 func TestCounter(t *testing.T) {
3636 var (
37 prefix = "prefix"
37 prefix = "prefix."
3838 name = "m"
3939 value = 123
4040 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
4242 )
4343 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
4444 e.flush(&b)
45 want := fmt.Sprintf("%s.%s.count %d", prefix, name, value)
45 want := fmt.Sprintf("%s%s.count %d", prefix, name, value)
4646 payload := b.String()
4747 if !strings.HasPrefix(payload, want) {
4848 t.Errorf("counter %s want\n%s, have\n%s", name, want, payload)
5151
5252 func TestGauge(t *testing.T) {
5353 var (
54 prefix = "prefix"
54 prefix = "prefix."
5555 name = "xyz"
5656 value = 54321
5757 delta = 12345
6666 e.flush(&b)
6767 payload := b.String()
6868
69 want := fmt.Sprintf("%s.%s %d", prefix, name, value+delta)
69 want := fmt.Sprintf("%s%s %d", prefix, name, value+delta)
7070 if !strings.HasPrefix(payload, want) {
7171 t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload)
7272 }
+0
-105
metrics/graphite/manager.go less more
0 package graphite
1
2 import (
3 "net"
4 "time"
5
6 "github.com/go-kit/kit/log"
7 )
8
9 // Dialer dials a network and address. net.Dial is a good default Dialer.
10 type Dialer func(network, address string) (net.Conn, error)
11
12 // time.After is a good default afterFunc.
13 type afterFunc func(time.Duration) <-chan time.Time
14
15 // manager manages a net.Conn. Clients should take the conn when they want to
16 // use it, and put back whatever error they receive from an e.g. Write. When a
17 // non-nil error is put, the conn is invalidated and a new conn is established.
18 // Connection failures are retried after an exponential backoff.
19 type manager struct {
20 dial Dialer
21 network string
22 address string
23 after afterFunc
24 logger log.Logger
25
26 takec chan net.Conn
27 putc chan error
28 }
29
30 func newManager(d Dialer, network, address string, after afterFunc, logger log.Logger) *manager {
31 m := &manager{
32 dial: d,
33 network: network,
34 address: address,
35 after: after,
36 logger: logger,
37
38 takec: make(chan net.Conn),
39 putc: make(chan error),
40 }
41 go m.loop()
42 return m
43 }
44
45 func (m *manager) take() net.Conn {
46 return <-m.takec
47 }
48
49 func (m *manager) put(err error) {
50 m.putc <- err
51 }
52
53 func (m *manager) loop() {
54 var (
55 conn = dial(m.dial, m.network, m.address, m.logger) // may block slightly
56 connc = make(chan net.Conn)
57 reconnectc <-chan time.Time // initially nil
58 backoff = time.Second
59 )
60
61 for {
62 select {
63 case <-reconnectc:
64 reconnectc = nil
65 go func() { connc <- dial(m.dial, m.network, m.address, m.logger) }()
66
67 case conn = <-connc:
68 if conn == nil {
69 backoff = exponential(backoff)
70 reconnectc = m.after(backoff)
71 } else {
72 backoff = time.Second
73 reconnectc = nil
74 }
75
76 case m.takec <- conn:
77 // might be nil
78
79 case err := <-m.putc:
80 if err != nil && conn != nil {
81 m.logger.Log("err", err)
82 conn = nil // connection is bad
83 reconnectc = m.after(time.Nanosecond) // trigger immediately
84 }
85 }
86 }
87 }
88
89 func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
90 conn, err := d(network, address)
91 if err != nil {
92 logger.Log("err", err)
93 conn = nil
94 }
95 return conn
96 }
97
98 func exponential(d time.Duration) time.Duration {
99 d *= 2
100 if d > time.Minute {
101 d = time.Minute
102 }
103 return d
104 }
+0
-126
metrics/graphite/manager_test.go less more
0 package graphite
1
2 import (
3 "errors"
4 "net"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestManager(t *testing.T) {
13 var (
14 tickc = make(chan time.Time)
15 after = func(time.Duration) <-chan time.Time { return tickc }
16 dialconn = &mockConn{}
17 dialerr = error(nil)
18 dialer = func(string, string) (net.Conn, error) { return dialconn, dialerr }
19 mgr = newManager(dialer, "netw", "addr", after, log.NewNopLogger())
20 )
21
22 // First conn should be fine.
23 conn := mgr.take()
24 if conn == nil {
25 t.Fatal("nil conn")
26 }
27
28 // Write and check it went thru.
29 if _, err := conn.Write([]byte{1, 2, 3}); err != nil {
30 t.Fatal(err)
31 }
32 if want, have := uint64(3), atomic.LoadUint64(&dialconn.wr); want != have {
33 t.Errorf("want %d, have %d", want, have)
34 }
35
36 // Put an error to kill the conn.
37 mgr.put(errors.New("should kill the connection"))
38
39 // First takes should fail.
40 for i := 0; i < 10; i++ {
41 if conn = mgr.take(); conn != nil {
42 t.Fatalf("want nil conn, got real conn")
43 }
44 }
45
46 // Trigger the reconnect.
47 tickc <- time.Now()
48
49 // The dial should eventually succeed and yield a good conn.
50 if !within(100*time.Millisecond, func() bool {
51 conn = mgr.take()
52 return conn != nil
53 }) {
54 t.Fatal("conn remained nil")
55 }
56
57 // Write and check it went thru.
58 if _, err := conn.Write([]byte{4, 5}); err != nil {
59 t.Fatal(err)
60 }
61 if want, have := uint64(5), atomic.LoadUint64(&dialconn.wr); want != have {
62 t.Errorf("want %d, have %d", want, have)
63 }
64
65 // Dial starts failing.
66 dialconn, dialerr = nil, errors.New("oh noes")
67 mgr.put(errors.New("trigger that reconnect y'all"))
68 if conn = mgr.take(); conn != nil {
69 t.Fatalf("want nil conn, got real conn")
70 }
71
72 // As many reconnects as they want.
73 go func() {
74 done := time.After(100 * time.Millisecond)
75 for {
76 select {
77 case tickc <- time.Now():
78 case <-done:
79 return
80 }
81 }
82 }()
83
84 // The dial should never succeed.
85 if within(100*time.Millisecond, func() bool {
86 conn = mgr.take()
87 return conn != nil
88 }) {
89 t.Fatal("eventually got a good conn, despite failing dialer")
90 }
91 }
92
93 type mockConn struct {
94 rd, wr uint64
95 }
96
97 func (c *mockConn) Read(b []byte) (n int, err error) {
98 atomic.AddUint64(&c.rd, uint64(len(b)))
99 return len(b), nil
100 }
101
102 func (c *mockConn) Write(b []byte) (n int, err error) {
103 atomic.AddUint64(&c.wr, uint64(len(b)))
104 return len(b), nil
105 }
106
107 func (c *mockConn) Close() error { return nil }
108 func (c *mockConn) LocalAddr() net.Addr { return nil }
109 func (c *mockConn) RemoteAddr() net.Addr { return nil }
110 func (c *mockConn) SetDeadline(t time.Time) error { return nil }
111 func (c *mockConn) SetReadDeadline(t time.Time) error { return nil }
112 func (c *mockConn) SetWriteDeadline(t time.Time) error { return nil }
113
114 func within(d time.Duration, f func() bool) bool {
115 deadline := time.Now().Add(d)
116 for {
117 if time.Now().After(deadline) {
118 return false
119 }
120 if f() {
121 return true
122 }
123 time.Sleep(d / 10)
124 }
125 }
11
22 import (
33 "errors"
4 "net"
54 "time"
65
76 "github.com/go-kit/kit/log"
2827 // NewGraphiteProvider will return a Provider implementation that is a simple
2928 // wrapper around a graphite.Emitter. All metrics names will get prefixed
3029 // with the given value and data will be emitted once every interval.
31 func NewGraphiteProvider(addr, net, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
30 // If no network value is given, it will get defaulted to "udp".
31 func NewGraphiteProvider(addr, network, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
3232 if addr == "" {
3333 return nil, errors.New("graphite server address is required")
3434 }
35 if net == "" {
36 net = "udp"
35 if network == "" {
36 network = "udp"
3737 }
38 // nop logger for now :\
39 e := graphite.NewEmitter(addr, net, prefix, interval, logger)
38 e := graphite.NewEmitter(addr, network, prefix, interval, logger)
4039 return &graphiteProvider{Emitter: e}, nil
4140 }
4241
4443 *graphite.Emitter
4544 }
4645
47 // NewStatsdProvider will create a UDP connection for each metric
48 // with the given address. All metrics will use the given interval
49 // and, if a prefix is provided, it will be included in metric names
50 // with this format:
51 // "prefix.name"
52 func NewStatsdProvider(addr, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
53 return &statsdProvider{addr: addr, prefix: prefix, interval: interval}, nil
46 // NewStatsdProvider will return a Provider implementation that is a simple
47 // wrapper around a statsd.Emitter. All metrics names will get prefixed
48 // with the given value and data will be emitted once every interval
49 // or when the buffer has reached its max size.
50 // If no network value is given, it will get defaulted to "udp".
51 func NewStatsdProvider(addr, network, prefix string, interval time.Duration, logger log.Logger) (Provider, error) {
52 if addr == "" {
53 return nil, errors.New("statsd server address is required")
54 }
55 if network == "" {
56 network = "udp"
57 }
58 e := statsd.NewEmitter(addr, network, prefix, interval, logger)
59 return &statsdProvider{e: e}, nil
5460 }
5561
5662 type statsdProvider struct {
57 addr string
58
59 interval time.Duration
60 prefix string
61
62 logger log.Logger
63 }
64
65 func (s *statsdProvider) conn() (net.Conn, error) {
66 return net.Dial("udp", s.addr)
67 }
68
69 func (s *statsdProvider) pref(name string) string {
70 if len(s.prefix) > 0 {
71 return s.prefix + "." + name
72 }
73 return name
63 e *statsd.Emitter
7464 }
7565
7666 func (s *statsdProvider) NewCounter(name string) metrics.Counter {
77 conn, err := s.conn()
78 if err != nil {
79 s.logger.Log("during", "new counter", "err", err)
80 return nil
81 }
82 return statsd.NewCounter(conn, s.pref(name), s.interval)
67 return s.e.NewCounter(name)
8368 }
8469
8570 func (s *statsdProvider) NewHistogram(name string, min, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
86 conn, err := s.conn()
87 if err != nil {
88 return nil, err
89 }
90 return statsd.NewHistogram(conn, s.pref(name), s.interval), nil
71 return s.e.NewHistogram(name), nil
9172 }
9273
9374 func (s *statsdProvider) NewGauge(name string) metrics.Gauge {
94 conn, err := s.conn()
95 if err != nil {
96 s.logger.Log("during", "new gauge", "err", err)
97 return nil
98 }
99 return statsd.NewGauge(conn, s.pref(name), s.interval)
75 return s.e.NewGauge(name)
10076 }
10177
102 // Stop is a no-op (should we try to close the UDP connections here?)
103 func (s *statsdProvider) Stop() {}
78 // Stop will call the underlying statsd.Emitter's Stop method.
79 func (s *statsdProvider) Stop() {
80 s.e.Stop()
81 }
10482
10583 // NewExpvarProvider is a very thin wrapper over the expvar package.
106 // If a prefix is provided, it will be included in metric names with this
107 // format:
108 // "prefix.name"
84 // If a prefix is provided, it will prefix in metric names.
10985 func NewExpvarProvider(prefix string) Provider {
11086 return &expvarProvider{prefix: prefix}
11187 }
11591 }
11692
11793 func (e *expvarProvider) pref(name string) string {
118 if len(e.prefix) > 0 {
119 return e.prefix + "." + name
120 }
121 return name
94 return e.prefix + name
12295 }
12396
12497 func (e *expvarProvider) NewCounter(name string) metrics.Counter {
0 package statsd
1
2 import (
3 "bytes"
4 "fmt"
5 "net"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/util/conn"
11 )
12
13 // Emitter is a struct to manage connections and orchestrate the emission of
14 // metrics to a Statsd process.
15 type Emitter struct {
16 prefix string
17 keyVals chan keyVal
18 mgr *conn.Manager
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23 type keyVal struct {
24 key string
25 val string
26 }
27
28 func stringToKeyVal(key string, keyVals chan keyVal) chan string {
29 vals := make(chan string)
30 go func() {
31 for val := range vals {
32 keyVals <- keyVal{key: key, val: val}
33 }
34 }()
35 return vals
36 }
37
38 // NewEmitter will return an Emitter that will prefix all metrics names with the
39 // given prefix. Once started, it will attempt to create a connection with the
40 // given network and address via `net.Dial` and periodically post metrics to the
41 // connection in the statsd protocol.
42 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
43 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
44 }
45
46 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
47 // Dialer function. This is primarily useful for tests.
48 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
49 e := &Emitter{
50 prefix: metricsPrefix,
51 mgr: conn.NewManager(dialer, network, address, time.After, logger),
52 logger: logger,
53 keyVals: make(chan keyVal),
54 quitc: make(chan chan struct{}),
55 }
56 var b bytes.Buffer
57 go e.loop(flushInterval, &b)
58 return e
59 }
60
61 // NewCounter returns a Counter that emits observations in the statsd protocol
62 // via the Emitter's connection manager. Observations are buffered for the
63 // report interval or until the buffer exceeds a max packet size, whichever
64 // comes first. Fields are ignored.
65 func (e *Emitter) NewCounter(key string) metrics.Counter {
66 return &statsdCounter{
67 key: e.prefix + key,
68 c: stringToKeyVal(key, e.keyVals),
69 }
70 }
71
72 // NewHistogram returns a Histogram that emits observations in the statsd
73 // protocol via the Emitter's conection manager. Observations are buffered for
74 // the reporting interval or until the buffer exceeds a max packet size,
75 // whichever comes first. Fields are ignored.
76 //
77 // NewHistogram is mapped to a statsd Timing, so observations should represent
78 // milliseconds. If you observe in units of nanoseconds, you can make the
79 // translation with a ScaledHistogram:
80 //
81 // NewScaledHistogram(statsdHistogram, time.Millisecond)
82 //
83 // You can also enforce the constraint in a typesafe way with a millisecond
84 // TimeHistogram:
85 //
86 // NewTimeHistogram(statsdHistogram, time.Millisecond)
87 //
88 // TODO: support for sampling.
89 func (e *Emitter) NewHistogram(key string) metrics.Histogram {
90 return &statsdHistogram{
91 key: e.prefix + key,
92 h: stringToKeyVal(key, e.keyVals),
93 }
94 }
95
96 // NewGauge returns a Gauge that emits values in the statsd protocol via the
97 // the Emitter's connection manager. Values are buffered for the report
98 // interval or until the buffer exceeds a max packet size, whichever comes
99 // first. Fields are ignored.
100 //
101 // TODO: support for sampling
102 func (e *Emitter) NewGauge(key string) metrics.Gauge {
103 return &statsdGauge{
104 key: e.prefix + key,
105 g: stringToKeyVal(key, e.keyVals),
106 }
107 }
108
109 func (e *Emitter) loop(d time.Duration, buf *bytes.Buffer) {
110 ticker := time.NewTicker(d)
111 defer ticker.Stop()
112 for {
113 select {
114 case kv := <-e.keyVals:
115 fmt.Fprintf(buf, "%s:%s\n", kv.key, kv.val)
116 if buf.Len() > maxBufferSize {
117 e.Flush(buf)
118 }
119
120 case <-ticker.C:
121 e.Flush(buf)
122
123 case q := <-e.quitc:
124 e.Flush(buf)
125 close(q)
126 return
127 }
128 }
129 }
130
131 // Stop will flush the current metrics and close the active connection. Calling
132 // stop more than once is a programmer error.
133 func (e *Emitter) Stop() {
134 q := make(chan struct{})
135 e.quitc <- q
136 <-q
137 }
138
139 // Flush will write the given buffer to a connection provided by the Emitter's
140 // connection manager.
141 func (e *Emitter) Flush(buf *bytes.Buffer) {
142 conn := e.mgr.Take()
143 if conn == nil {
144 e.logger.Log("during", "flush", "err", "connection unavailable")
145 return
146 }
147
148 _, err := conn.Write(buf.Bytes())
149 if err != nil {
150 e.logger.Log("during", "flush", "err", err)
151 }
152 e.mgr.Put(err)
153 }
1313 // check for hdr histo data
1414 wants := map[string]int64{"count": 1234, "min": 15, "max": 83}
1515 for key, want := range wants {
16 re := regexp.MustCompile(fmt.Sprintf("%s.%s.%s (\\d*)", prefix, metricName, key))
16 re := regexp.MustCompile(fmt.Sprintf("%s%s.%s (\\d*)", prefix, metricName, key))
1717 res := re.FindAllStringSubmatch(gPayload, 1)
1818 if res == nil {
1919 t.Error("did not find metrics log for", key, "in \n", gPayload)
4141 }
4242 // check for quantile gauges
4343 for key, want := range wants {
44 re := regexp.MustCompile(fmt.Sprintf("%s.%s%s (\\d*\\.\\d*)", prefix, metricName, key))
44 re := regexp.MustCompile(fmt.Sprintf("%s%s%s (\\d*\\.\\d*)", prefix, metricName, key))
4545 res := re.FindAllStringSubmatch(gPayload, 1)
4646 if res == nil {
4747 t.Errorf("did not find metrics log for %s", key)
0 package conn
1
2 import (
3 "net"
4 "time"
5
6 "github.com/go-kit/kit/log"
7 )
8
9 // Dialer dials a network and address. net.Dial is a good default Dialer.
10 type Dialer func(network, address string) (net.Conn, error)
11
12 // time.After is a good default afterFunc.
13 type afterFunc func(time.Duration) <-chan time.Time
14
15 // Manager manages a net.Conn. Clients should take the conn when they want to
16 // use it, and put back whatever error they receive from an e.g. Write. When a
17 // non-nil error is put, the conn is invalidated and a new conn is established.
18 // Connection failures are retried after an exponential backoff.
19 type Manager struct {
20 dial Dialer
21 network string
22 address string
23 after afterFunc
24 logger log.Logger
25
26 takec chan net.Conn
27 putc chan error
28 }
29
30 func NewManager(d Dialer, network, address string, after afterFunc, logger log.Logger) *Manager {
31 m := &Manager{
32 dial: d,
33 network: network,
34 address: address,
35 after: after,
36 logger: logger,
37
38 takec: make(chan net.Conn),
39 putc: make(chan error),
40 }
41 go m.loop()
42 return m
43 }
44
45 func (m *Manager) Take() net.Conn {
46 return <-m.takec
47 }
48
49 func (m *Manager) Put(err error) {
50 m.putc <- err
51 }
52
53 func (m *Manager) loop() {
54 var (
55 conn = dial(m.dial, m.network, m.address, m.logger) // may block slightly
56 connc = make(chan net.Conn)
57 reconnectc <-chan time.Time // initially nil
58 backoff = time.Second
59 )
60
61 for {
62 select {
63 case <-reconnectc:
64 reconnectc = nil
65 go func() { connc <- dial(m.dial, m.network, m.address, m.logger) }()
66
67 case conn = <-connc:
68 if conn == nil {
69 backoff = exponential(backoff)
70 reconnectc = m.after(backoff)
71 } else {
72 backoff = time.Second
73 reconnectc = nil
74 }
75
76 case m.takec <- conn:
77 // might be nil
78
79 case err := <-m.putc:
80 if err != nil && conn != nil {
81 m.logger.Log("err", err)
82 conn = nil // connection is bad
83 reconnectc = m.after(time.Nanosecond) // trigger immediately
84 }
85 }
86 }
87 }
88
89 func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
90 conn, err := d(network, address)
91 if err != nil {
92 logger.Log("err", err)
93 conn = nil
94 }
95 return conn
96 }
97
98 func exponential(d time.Duration) time.Duration {
99 d *= 2
100 if d > time.Minute {
101 d = time.Minute
102 }
103 return d
104 }
0 package conn
1
2 import (
3 "errors"
4 "net"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestManager(t *testing.T) {
13 var (
14 tickc = make(chan time.Time)
15 after = func(time.Duration) <-chan time.Time { return tickc }
16 dialconn = &mockConn{}
17 dialerr = error(nil)
18 dialer = func(string, string) (net.Conn, error) { return dialconn, dialerr }
19 mgr = NewManager(dialer, "netw", "addr", after, log.NewNopLogger())
20 )
21
22 // First conn should be fine.
23 conn := mgr.Take()
24 if conn == nil {
25 t.Fatal("nil conn")
26 }
27
28 // Write and check it went thru.
29 if _, err := conn.Write([]byte{1, 2, 3}); err != nil {
30 t.Fatal(err)
31 }
32 if want, have := uint64(3), atomic.LoadUint64(&dialconn.wr); want != have {
33 t.Errorf("want %d, have %d", want, have)
34 }
35
36 // Put an error to kill the conn.
37 mgr.Put(errors.New("should kill the connection"))
38
39 // First takes should fail.
40 for i := 0; i < 10; i++ {
41 if conn = mgr.Take(); conn != nil {
42 t.Fatalf("want nil conn, got real conn")
43 }
44 }
45
46 // Trigger the reconnect.
47 tickc <- time.Now()
48
49 // The dial should eventually succeed and yield a good conn.
50 if !within(100*time.Millisecond, func() bool {
51 conn = mgr.Take()
52 return conn != nil
53 }) {
54 t.Fatal("conn remained nil")
55 }
56
57 // Write and check it went thru.
58 if _, err := conn.Write([]byte{4, 5}); err != nil {
59 t.Fatal(err)
60 }
61 if want, have := uint64(5), atomic.LoadUint64(&dialconn.wr); want != have {
62 t.Errorf("want %d, have %d", want, have)
63 }
64
65 // Dial starts failing.
66 dialconn, dialerr = nil, errors.New("oh noes")
67 mgr.Put(errors.New("trigger that reconnect y'all"))
68 if conn = mgr.Take(); conn != nil {
69 t.Fatalf("want nil conn, got real conn")
70 }
71
72 // As many reconnects as they want.
73 go func() {
74 done := time.After(100 * time.Millisecond)
75 for {
76 select {
77 case tickc <- time.Now():
78 case <-done:
79 return
80 }
81 }
82 }()
83
84 // The dial should never succeed.
85 if within(100*time.Millisecond, func() bool {
86 conn = mgr.Take()
87 return conn != nil
88 }) {
89 t.Fatal("eventually got a good conn, despite failing dialer")
90 }
91 }
92
93 type mockConn struct {
94 rd, wr uint64
95 }
96
97 func (c *mockConn) Read(b []byte) (n int, err error) {
98 atomic.AddUint64(&c.rd, uint64(len(b)))
99 return len(b), nil
100 }
101
102 func (c *mockConn) Write(b []byte) (n int, err error) {
103 atomic.AddUint64(&c.wr, uint64(len(b)))
104 return len(b), nil
105 }
106
107 func (c *mockConn) Close() error { return nil }
108 func (c *mockConn) LocalAddr() net.Addr { return nil }
109 func (c *mockConn) RemoteAddr() net.Addr { return nil }
110 func (c *mockConn) SetDeadline(t time.Time) error { return nil }
111 func (c *mockConn) SetReadDeadline(t time.Time) error { return nil }
112 func (c *mockConn) SetWriteDeadline(t time.Time) error { return nil }
113
114 func within(d time.Duration, f func() bool) bool {
115 deadline := time.Now().Add(d)
116 for {
117 if time.Now().After(deadline) {
118 return false
119 }
120 if f() {
121 return true
122 }
123 time.Sleep(d / 10)
124 }
125 }