Codebase list golang-github-go-kit-kit / 15e3d35
Add manager + refactor Emitter Peter Bourgon 7 years ago
4 changed file(s) with 325 addition(s) and 155 deletion(s). Raw diff Collapse all Expand all
00 // Package graphite implements a Graphite backend for package metrics. Metrics
1 // will be emitted to a Graphite server in the plaintext protocol
2 // (http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol)
3 // which looks like:
1 // will be emitted to a Graphite server in the plaintext protocol which looks
2 // like:
3 //
44 // "<metric path> <metric value> <metric timestamp>"
55 //
6 // See http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol.
67 // The current implementation ignores fields.
78 package graphite
89
2223 "github.com/go-kit/kit/metrics"
2324 )
2425
25 // Emitter will keep track of all metrics and, once started,
26 // will emit the metrics via the Flush method to the given address.
26 // Emitter will keep track of all metrics and, once started, will emit the
27 // metrics via the Flush method to the given address.
2728 type Emitter struct {
28 prefix string
29
30 network, addr string
31 conn net.Conn
32 dialer Dialer
33 start sync.Once
34 stop chan bool
35
3629 mtx sync.Mutex
30 prefix string
31 mgr *manager
3732 counters []*counter
3833 histograms []*windowedHistogram
3934 gauges []*gauge
40
41 logger log.Logger
42 }
43
44 // NewEmitter will return an Emitter that will prefix all
45 // metrics names with the given prefix. Once started, it will attempt to create
46 // a connection with the given network and address via `net.Dial` and periodically post
47 // metrics to the connection in the Graphite plaintext protocol.
48 func NewEmitter(network, addr string, metricsPrefix string, logger log.Logger) *Emitter {
49 return NewEmitterDial(network, addr, net.Dial, metricsPrefix, logger)
50 }
51
52 // NewEmitter will return an Emitter that will prefix all
53 // metrics names with the given prefix. Once started, it will attempt to create
54 // a connection with the given network and address via the given Dialer and periodically post
55 // metrics to the connection in the Graphite plaintext protocol.
56 func NewEmitterDial(network, addr string, dialer Dialer, metricsPrefix string, logger log.Logger) *Emitter {
57 return &Emitter{
58 network: network,
59 addr: addr,
60 dialer: net.Dial,
61 stop: make(chan bool),
62 prefix: metricsPrefix,
63 logger: logger,
64 }
35 logger log.Logger
36 quitc chan chan struct{}
37 }
38
39 // NewEmitter will return an Emitter that will prefix all metrics names with the
40 // given prefix. Once started, it will attempt to create a connection with the
41 // given network and address via `net.Dial` and periodically post metrics to the
42 // connection in the Graphite plaintext protocol.
43 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
44 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
45 }
46
47 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
48 // Dialer function. This is primarily useful for tests.
49 func NewEmitterDial(dialer Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
50 e := &Emitter{
51 prefix: metricsPrefix,
52 mgr: newManager(dialer, network, address, time.After, logger),
53 logger: logger,
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
6558 }
6659
6760 // NewCounter returns a Counter whose value will be periodically emitted in
6861 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
6962 func (e *Emitter) NewCounter(name string) metrics.Counter {
70 // only one flush at a time
63 e.mtx.Lock()
64 defer e.mtx.Unlock()
7165 c := &counter{name, 0}
72 e.mtx.Lock()
7366 e.counters = append(e.counters, c)
74 e.mtx.Unlock()
7567 return c
7668 }
7769
7971 // windowed HDR histogram which drops data older than five minutes.
8072 //
8173 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
82 // should be integers in the range 1..99. The gauge names are assigned by
83 // using the passed name as a prefix and appending "_pNN" e.g. "_p50".
84 //
85 // The values of this histogram will be periodically emitted in a Graphite-compatible
86 // format once the Emitter is started. Fields are ignored.
74 // should be integers in the range 1..99. The gauge names are assigned by using
75 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
76 //
77 // The values of this histogram will be periodically emitted in a
78 // Graphite-compatible format once the Emitter is started. Fields are ignored.
8779 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
8880 gauges := map[int]metrics.Gauge{}
8981 for _, quantile := range quantiles {
9587 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
9688
9789 e.mtx.Lock()
90 defer e.mtx.Unlock()
9891 e.histograms = append(e.histograms, h)
99 e.mtx.Unlock()
10092 return h, nil
10193 }
10294
103 // NewTimeHistogram returns a TimeHistogram wrapper around the windowed
104 // HDR histrogram provided by this package.
95 // NewTimeHistogram returns a TimeHistogram wrapper around the windowed HDR
96 // histrogram provided by this package.
10597 func (e *Emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) {
10698 h, err := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
10799 if err != nil {
110102 return metrics.NewTimeHistogram(unit, h), nil
111103 }
112104
113 // NewGauge returns a Gauge whose value will be periodically emitted in
114 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
105 // NewGauge returns a Gauge whose value will be periodically emitted in a
106 // Graphite-compatible format once the Emitter is started. Fields are ignored.
115107 func (e *Emitter) NewGauge(name string) metrics.Gauge {
116108 e.mtx.Lock()
117109 defer e.mtx.Unlock()
124116 return g
125117 }
126118
127 func (e *Emitter) dial() error {
128 var err error
129 e.conn, err = e.dialer(e.network, e.addr)
130 return err
131 }
132
133 type Dialer func(network, addr string) (net.Conn, error)
134
135 // Start will kick off a background goroutine to
136 // call Flush once every interval.
137 func (e *Emitter) Start(interval time.Duration) error {
138 var err error
139 e.start.Do(func() {
140 err = e.dial()
141 if err != nil {
119 func (e *Emitter) loop(d time.Duration) {
120 ticker := time.NewTicker(d)
121 defer ticker.Stop()
122
123 for {
124 select {
125 case <-ticker.C:
126 e.Flush()
127
128 case q := <-e.quitc:
129 e.Flush()
130 close(q)
142131 return
143132 }
144 go func() {
145 t := time.Tick(interval)
146 for {
147 select {
148 case <-t:
149 e.Flush()
150 case <-e.stop:
151 return
152 }
153 }
154 }()
155 })
156 return err
157 }
158
159 // Stop will flush the current metrics and close the
160 // current Graphite connection, if it exists.
161 func (e *Emitter) Stop() error {
162 if e.conn == nil {
163 return nil
164 }
165 // stop the ticking flush loop
166 e.stop <- true
167 // get one last flush in
168 e.Flush()
169 // close the connection
170 err := e.conn.Close()
171 // nil the conn to avoid problems
172 // if Stop() is called more than once.
173 e.conn = nil
174 return err
175 }
176
177 var (
178 RetryMax = 10
179 RetryWait = 2 * time.Millisecond
180 RetryMultiplier = 2
181 )
182
183 // Flush will write the current metrics to the Emitter's
184 // connection in the Graphite plaintext protocol.
185 func (e *Emitter) Flush() error {
186 // only one flush at a time
187 e.mtx.Lock()
133 }
134 }
135
136 // Stop will flush the current metrics and close the active connection. Calling
137 // stop more than once is a programmer error.
138 func (e *Emitter) Stop() {
139 q := make(chan struct{})
140 e.quitc <- q
141 <-q
142 }
143
144 // Flush will write the current metrics to the Emitter's connection in the
145 // Graphite plaintext protocol.
146 func (e *Emitter) Flush() {
147 e.mtx.Lock() // one flush at a time
188148 defer e.mtx.Unlock()
189149
190 // set the system up to perform a retry loop
191 var err error
192 wait := RetryWait
193 for attempts := 1; ; attempts++ {
194 err = e.flush(e.conn)
195 // no error? return immediately.
196 if err == nil {
197 return nil
198 }
199 // we're at our last attempt? give up.
200 if attempts >= RetryMax {
201 break
202 }
203 // log, wait, and try again
204 e.logger.Log(
205 "err", err,
206 "msg", fmt.Sprintf("unable to flush metrics on attempt %d, waiting %s", attempts, wait),
207 )
208 time.Sleep(wait)
209 wait = wait * time.Duration(RetryMultiplier)
210 }
211 // log if we were unable to emit metrics
150 conn := e.mgr.take()
151 if conn == nil {
152 e.logger.Log("during", "flush", "err", "connection unavailable")
153 return
154 }
155
156 err := e.flush(conn)
212157 if err != nil {
213 e.logger.Log(
214 "err", err,
215 "msg", fmt.Sprintf("unable to flush metrics after %d attempts. giving up.", RetryMax),
216 )
217 }
218 return err
158 e.logger.Log("during", "flush", "err", err)
159 }
160 e.mgr.put(err)
219161 }
220162
221163 func (e *Emitter) flush(conn io.Writer) error {
222
223 // buffer the writer and make sure to flush it
224164 w := bufio.NewWriter(conn)
225165
226 // emit counter stats
227166 for _, c := range e.counters {
228167 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, time.Now().Unix())
229168 }
230169
231 // emit histogram specific stats
232170 for _, h := range e.histograms {
233171 hist := h.hist.Merge()
234172 now := time.Now().Unix()
239177 fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", e.prefix, h.Name(), hist.StdDev(), now)
240178 }
241179
242 // emit gauge stats (which can include some histogram quantiles)
243180 for _, g := range e.gauges {
244181 fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), time.Now().Unix())
245182 }
246183
247 // check for error
248184 return w.Flush()
249185 }
250186
295231 logger log.Logger
296232 }
297233
298 // newWindowedHistogram is taken from http://github.com/codahale/metrics. It returns a
299 // windowed HDR histogram which drops data older than five minutes.
300 //
301 // The histogram exposes metrics for each passed quantile as gauges. Users are expected
302 // to provide their own set of Gauges for quantiles to make this Histogram work across multiple
303 // metrics providers.
234 // newWindowedHistogram is taken from http://github.com/codahale/metrics. It
235 // returns a windowed HDR histogram which drops data older than five minutes.
236 //
237 // The histogram exposes metrics for each passed quantile as gauges. Users are
238 // expected to provide their own set of Gauges for quantiles to make this
239 // Histogram work across multiple metrics providers.
304240 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram {
305241 h := &windowedHistogram{
306242 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
312248 return h
313249 }
314250
315 func (h *windowedHistogram) Name() string { return h.name }
251 func (h *windowedHistogram) Name() string { return h.name }
252
316253 func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }
317254
318255 func (h *windowedHistogram) Observe(value int64) {
44 "fmt"
55 "strings"
66 "testing"
7 "time"
78
9 "github.com/go-kit/kit/log"
810 "github.com/go-kit/kit/metrics"
911 "github.com/go-kit/kit/metrics/teststat"
1012 )
1113
1214 func TestHistogramQuantiles(t *testing.T) {
1315 prefix := "prefix"
14 e := NewEmitter("", "", prefix, nil)
16 e := NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
1517 var (
1618 name = "test_histogram_quantiles"
1719 quantiles = []int{50, 90, 95, 99}
1820 )
1921 h, err := e.NewHistogram(name, 0, 100, 3, quantiles...)
2022 if err != nil {
21 t.Fatalf("unable to create test histogram: ", err)
23 t.Fatalf("unable to create test histogram: %v", err)
2224 }
2325 h = h.With(metrics.Field{Key: "ignored", Value: "field"})
2426 const seed, mean, stdev int64 = 424242, 50, 10
3537 prefix = "prefix"
3638 name = "m"
3739 value = 123
38 e = NewEmitter("", "", prefix, nil)
40 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
3941 b bytes.Buffer
4042 )
4143 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
5355 name = "xyz"
5456 value = 54321
5557 delta = 12345
56 e = NewEmitter("", "", prefix, nil)
58 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
5759 b bytes.Buffer
5860 g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
5961 )
0 package graphite
1
2 import (
3 "net"
4 "time"
5
6 "github.com/go-kit/kit/log"
7 )
8
9 // Dialer dials a network and address. net.Dial is a good default Dialer.
10 type Dialer func(network, address string) (net.Conn, error)
11
12 // time.After is a good default afterFunc.
13 type afterFunc func(time.Duration) <-chan time.Time
14
15 // manager manages a net.Conn. Clients should take the conn when they want to
16 // use it, and put back whatever error they receive from an e.g. Write. When a
17 // non-nil error is put, the conn is invalidated and a new conn is established.
18 // Connection failures are retried after an exponential backoff.
19 type manager struct {
20 dial Dialer
21 network string
22 address string
23 after afterFunc
24 logger log.Logger
25
26 takec chan net.Conn
27 putc chan error
28 }
29
30 func newManager(d Dialer, network, address string, after afterFunc, logger log.Logger) *manager {
31 m := &manager{
32 dial: d,
33 network: network,
34 address: address,
35 after: after,
36 logger: logger,
37
38 takec: make(chan net.Conn),
39 putc: make(chan error),
40 }
41 go m.loop()
42 return m
43 }
44
45 func (m *manager) take() net.Conn {
46 return <-m.takec
47 }
48
49 func (m *manager) put(err error) {
50 m.putc <- err
51 }
52
53 func (m *manager) loop() {
54 var (
55 conn = dial(m.dial, m.network, m.address, m.logger) // may block slightly
56 connc = make(chan net.Conn)
57 reconnectc <-chan time.Time // initially nil
58 backoff = time.Second
59 )
60
61 for {
62 select {
63 case <-reconnectc:
64 reconnectc = nil
65 go func() { connc <- dial(m.dial, m.network, m.address, m.logger) }()
66
67 case conn = <-connc:
68 if conn == nil {
69 backoff = exponential(backoff)
70 reconnectc = m.after(backoff)
71 } else {
72 backoff = time.Second
73 reconnectc = nil
74 }
75
76 case m.takec <- conn:
77 // might be nil
78
79 case err := <-m.putc:
80 if err != nil && conn != nil {
81 m.logger.Log("err", err)
82 conn = nil // connection is bad
83 reconnectc = m.after(time.Nanosecond) // trigger immediately
84 }
85 }
86 }
87 }
88
89 func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
90 conn, err := d(network, address)
91 if err != nil {
92 logger.Log("err", err)
93 conn = nil
94 }
95 return conn
96 }
97
98 func exponential(d time.Duration) time.Duration {
99 d *= 2
100 if d > time.Minute {
101 d = time.Minute
102 }
103 return d
104 }
0 package graphite
1
2 import (
3 "errors"
4 "net"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/go-kit/kit/log"
10 )
11
12 func TestManager(t *testing.T) {
13 var (
14 tickc = make(chan time.Time)
15 after = func(time.Duration) <-chan time.Time { return tickc }
16 dialconn = &mockConn{}
17 dialerr = error(nil)
18 dialer = func(string, string) (net.Conn, error) { return dialconn, dialerr }
19 mgr = newManager(dialer, "netw", "addr", after, log.NewNopLogger())
20 )
21
22 // First conn should be fine.
23 conn := mgr.take()
24 if conn == nil {
25 t.Fatal("nil conn")
26 }
27
28 // Write and check it went thru.
29 if _, err := conn.Write([]byte{1, 2, 3}); err != nil {
30 t.Fatal(err)
31 }
32 if want, have := uint64(3), atomic.LoadUint64(&dialconn.wr); want != have {
33 t.Errorf("want %d, have %d", want, have)
34 }
35
36 // Put an error to kill the conn.
37 mgr.put(errors.New("should kill the connection"))
38
39 // First takes should fail.
40 for i := 0; i < 10; i++ {
41 if conn = mgr.take(); conn != nil {
42 t.Fatalf("want nil conn, got real conn")
43 }
44 }
45
46 // Trigger the reconnect.
47 tickc <- time.Now()
48
49 // The dial should eventually succeed and yield a good conn.
50 if !within(100*time.Millisecond, func() bool {
51 conn = mgr.take()
52 return conn != nil
53 }) {
54 t.Fatal("conn remained nil")
55 }
56
57 // Write and check it went thru.
58 if _, err := conn.Write([]byte{4, 5}); err != nil {
59 t.Fatal(err)
60 }
61 if want, have := uint64(5), atomic.LoadUint64(&dialconn.wr); want != have {
62 t.Errorf("want %d, have %d", want, have)
63 }
64
65 // Dial starts failing.
66 dialconn, dialerr = nil, errors.New("oh noes")
67 mgr.put(errors.New("trigger that reconnect y'all"))
68 if conn = mgr.take(); conn != nil {
69 t.Fatalf("want nil conn, got real conn")
70 }
71
72 // As many reconnects as they want.
73 go func() {
74 done := time.After(100 * time.Millisecond)
75 for {
76 select {
77 case tickc <- time.Now():
78 case <-done:
79 return
80 }
81 }
82 }()
83
84 // The dial should never succeed.
85 if within(100*time.Millisecond, func() bool {
86 conn = mgr.take()
87 return conn != nil
88 }) {
89 t.Fatal("eventually got a good conn, despite failing dialer")
90 }
91 }
92
93 type mockConn struct {
94 rd, wr uint64
95 }
96
97 func (c *mockConn) Read(b []byte) (n int, err error) {
98 atomic.AddUint64(&c.rd, uint64(len(b)))
99 return len(b), nil
100 }
101
102 func (c *mockConn) Write(b []byte) (n int, err error) {
103 atomic.AddUint64(&c.wr, uint64(len(b)))
104 return len(b), nil
105 }
106
107 func (c *mockConn) Close() error { return nil }
108 func (c *mockConn) LocalAddr() net.Addr { return nil }
109 func (c *mockConn) RemoteAddr() net.Addr { return nil }
110 func (c *mockConn) SetDeadline(t time.Time) error { return nil }
111 func (c *mockConn) SetReadDeadline(t time.Time) error { return nil }
112 func (c *mockConn) SetWriteDeadline(t time.Time) error { return nil }
113
114 func within(d time.Duration, f func() bool) bool {
115 deadline := time.Now().Add(d)
116 for {
117 if time.Now().After(deadline) {
118 return false
119 }
120 if f() {
121 return true
122 }
123 time.Sleep(d / 10)
124 }
125 }