diff --git a/metrics/graphite/graphite.go b/metrics/graphite/graphite.go index c90d8c0..b8ca4bd 100644 --- a/metrics/graphite/graphite.go +++ b/metrics/graphite/graphite.go @@ -1,9 +1,10 @@ // Package graphite implements a Graphite backend for package metrics. Metrics -// will be emitted to a Graphite server in the plaintext protocol -// (http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol) -// which looks like: +// will be emitted to a Graphite server in the plaintext protocol which looks +// like: +// // " " // +// See http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol. // The current implementation ignores fields. package graphite @@ -23,56 +24,47 @@ "github.com/go-kit/kit/metrics" ) -// Emitter will keep track of all metrics and, once started, -// will emit the metrics via the Flush method to the given address. +// Emitter will keep track of all metrics and, once started, will emit the +// metrics via the Flush method to the given address. type Emitter struct { - prefix string - - network, addr string - conn net.Conn - dialer Dialer - start sync.Once - stop chan bool - mtx sync.Mutex + prefix string + mgr *manager counters []*counter histograms []*windowedHistogram gauges []*gauge - - logger log.Logger -} - -// NewEmitter will return an Emitter that will prefix all -// metrics names with the given prefix. Once started, it will attempt to create -// a connection with the given network and address via `net.Dial` and periodically post -// metrics to the connection in the Graphite plaintext protocol. -func NewEmitter(network, addr string, metricsPrefix string, logger log.Logger) *Emitter { - return NewEmitterDial(network, addr, net.Dial, metricsPrefix, logger) -} - -// NewEmitter will return an Emitter that will prefix all -// metrics names with the given prefix. Once started, it will attempt to create -// a connection with the given network and address via the given Dialer and periodically post -// metrics to the connection in the Graphite plaintext protocol. -func NewEmitterDial(network, addr string, dialer Dialer, metricsPrefix string, logger log.Logger) *Emitter { - return &Emitter{ - network: network, - addr: addr, - dialer: net.Dial, - stop: make(chan bool), - prefix: metricsPrefix, - logger: logger, - } + logger log.Logger + quitc chan chan struct{} +} + +// NewEmitter will return an Emitter that will prefix all metrics names with the +// given prefix. Once started, it will attempt to create a connection with the +// given network and address via `net.Dial` and periodically post metrics to the +// connection in the Graphite plaintext protocol. +func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter { + return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger) +} + +// NewEmitterDial is the same as NewEmitter, but allows you to specify your own +// Dialer function. This is primarily useful for tests. +func NewEmitterDial(dialer Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter { + e := &Emitter{ + prefix: metricsPrefix, + mgr: newManager(dialer, network, address, time.After, logger), + logger: logger, + quitc: make(chan chan struct{}), + } + go e.loop(flushInterval) + return e } // NewCounter returns a Counter whose value will be periodically emitted in // a Graphite-compatible format once the Emitter is started. Fields are ignored. func (e *Emitter) NewCounter(name string) metrics.Counter { - // only one flush at a time + e.mtx.Lock() + defer e.mtx.Unlock() c := &counter{name, 0} - e.mtx.Lock() e.counters = append(e.counters, c) - e.mtx.Unlock() return c } @@ -80,11 +72,11 @@ // windowed HDR histogram which drops data older than five minutes. // // The histogram exposes metrics for each passed quantile as gauges. Quantiles -// should be integers in the range 1..99. The gauge names are assigned by -// using the passed name as a prefix and appending "_pNN" e.g. "_p50". -// -// The values of this histogram will be periodically emitted in a Graphite-compatible -// format once the Emitter is started. Fields are ignored. +// should be integers in the range 1..99. The gauge names are assigned by using +// the passed name as a prefix and appending "_pNN" e.g. "_p50". +// +// The values of this histogram will be periodically emitted in a +// Graphite-compatible format once the Emitter is started. Fields are ignored. func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) { gauges := map[int]metrics.Gauge{} for _, quantile := range quantiles { @@ -96,13 +88,13 @@ h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger) e.mtx.Lock() + defer e.mtx.Unlock() e.histograms = append(e.histograms, h) - e.mtx.Unlock() return h, nil } -// NewTimeHistogram returns a TimeHistogram wrapper around the windowed -// HDR histrogram provided by this package. +// NewTimeHistogram returns a TimeHistogram wrapper around the windowed HDR +// histrogram provided by this package. func (e *Emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) { h, err := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...) if err != nil { @@ -111,8 +103,8 @@ return metrics.NewTimeHistogram(unit, h), nil } -// NewGauge returns a Gauge whose value will be periodically emitted in -// a Graphite-compatible format once the Emitter is started. Fields are ignored. +// NewGauge returns a Gauge whose value will be periodically emitted in a +// Graphite-compatible format once the Emitter is started. Fields are ignored. func (e *Emitter) NewGauge(name string) metrics.Gauge { e.mtx.Lock() defer e.mtx.Unlock() @@ -125,111 +117,57 @@ return g } -func (e *Emitter) dial() error { - var err error - e.conn, err = e.dialer(e.network, e.addr) - return err -} - -type Dialer func(network, addr string) (net.Conn, error) - -// Start will kick off a background goroutine to -// call Flush once every interval. -func (e *Emitter) Start(interval time.Duration) error { - var err error - e.start.Do(func() { - err = e.dial() - if err != nil { +func (e *Emitter) loop(d time.Duration) { + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + e.Flush() + + case q := <-e.quitc: + e.Flush() + close(q) return } - go func() { - t := time.Tick(interval) - for { - select { - case <-t: - e.Flush() - case <-e.stop: - return - } - } - }() - }) - return err -} - -// Stop will flush the current metrics and close the -// current Graphite connection, if it exists. -func (e *Emitter) Stop() error { - if e.conn == nil { - return nil - } - // stop the ticking flush loop - e.stop <- true - // get one last flush in - e.Flush() - // close the connection - err := e.conn.Close() - // nil the conn to avoid problems - // if Stop() is called more than once. - e.conn = nil - return err -} - -var ( - RetryMax = 10 - RetryWait = 2 * time.Millisecond - RetryMultiplier = 2 -) - -// Flush will write the current metrics to the Emitter's -// connection in the Graphite plaintext protocol. -func (e *Emitter) Flush() error { - // only one flush at a time - e.mtx.Lock() + } +} + +// Stop will flush the current metrics and close the active connection. Calling +// stop more than once is a programmer error. +func (e *Emitter) Stop() { + q := make(chan struct{}) + e.quitc <- q + <-q +} + +// Flush will write the current metrics to the Emitter's connection in the +// Graphite plaintext protocol. +func (e *Emitter) Flush() { + e.mtx.Lock() // one flush at a time defer e.mtx.Unlock() - // set the system up to perform a retry loop - var err error - wait := RetryWait - for attempts := 1; ; attempts++ { - err = e.flush(e.conn) - // no error? return immediately. - if err == nil { - return nil - } - // we're at our last attempt? give up. - if attempts >= RetryMax { - break - } - // log, wait, and try again - e.logger.Log( - "err", err, - "msg", fmt.Sprintf("unable to flush metrics on attempt %d, waiting %s", attempts, wait), - ) - time.Sleep(wait) - wait = wait * time.Duration(RetryMultiplier) - } - // log if we were unable to emit metrics + conn := e.mgr.take() + if conn == nil { + e.logger.Log("during", "flush", "err", "connection unavailable") + return + } + + err := e.flush(conn) if err != nil { - e.logger.Log( - "err", err, - "msg", fmt.Sprintf("unable to flush metrics after %d attempts. giving up.", RetryMax), - ) - } - return err + e.logger.Log("during", "flush", "err", err) + } + e.mgr.put(err) } func (e *Emitter) flush(conn io.Writer) error { - - // buffer the writer and make sure to flush it w := bufio.NewWriter(conn) - // emit counter stats for _, c := range e.counters { fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, time.Now().Unix()) } - // emit histogram specific stats for _, h := range e.histograms { hist := h.hist.Merge() now := time.Now().Unix() @@ -240,12 +178,10 @@ fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", e.prefix, h.Name(), hist.StdDev(), now) } - // emit gauge stats (which can include some histogram quantiles) for _, g := range e.gauges { fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), time.Now().Unix()) } - // check for error return w.Flush() } @@ -296,12 +232,12 @@ logger log.Logger } -// newWindowedHistogram is taken from http://github.com/codahale/metrics. It returns a -// windowed HDR histogram which drops data older than five minutes. -// -// The histogram exposes metrics for each passed quantile as gauges. Users are expected -// to provide their own set of Gauges for quantiles to make this Histogram work across multiple -// metrics providers. +// newWindowedHistogram is taken from http://github.com/codahale/metrics. It +// returns a windowed HDR histogram which drops data older than five minutes. +// +// The histogram exposes metrics for each passed quantile as gauges. Users are +// expected to provide their own set of Gauges for quantiles to make this +// Histogram work across multiple metrics providers. func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram { h := &windowedHistogram{ hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs), @@ -313,7 +249,8 @@ return h } -func (h *windowedHistogram) Name() string { return h.name } +func (h *windowedHistogram) Name() string { return h.name } + func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h } func (h *windowedHistogram) Observe(value int64) { diff --git a/metrics/graphite/graphite_test.go b/metrics/graphite/graphite_test.go index 3c23038..559228d 100644 --- a/metrics/graphite/graphite_test.go +++ b/metrics/graphite/graphite_test.go @@ -5,21 +5,23 @@ "fmt" "strings" "testing" + "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/teststat" ) func TestHistogramQuantiles(t *testing.T) { prefix := "prefix" - e := NewEmitter("", "", prefix, nil) + e := NewEmitter("", "", prefix, time.Second, log.NewNopLogger()) var ( name = "test_histogram_quantiles" quantiles = []int{50, 90, 95, 99} ) h, err := e.NewHistogram(name, 0, 100, 3, quantiles...) if err != nil { - t.Fatalf("unable to create test histogram: ", err) + t.Fatalf("unable to create test histogram: %v", err) } h = h.With(metrics.Field{Key: "ignored", Value: "field"}) const seed, mean, stdev int64 = 424242, 50, 10 @@ -36,7 +38,7 @@ prefix = "prefix" name = "m" value = 123 - e = NewEmitter("", "", prefix, nil) + e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger()) b bytes.Buffer ) e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value)) @@ -54,7 +56,7 @@ name = "xyz" value = 54321 delta = 12345 - e = NewEmitter("", "", prefix, nil) + e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger()) b bytes.Buffer g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"}) ) diff --git a/metrics/graphite/manager.go b/metrics/graphite/manager.go new file mode 100644 index 0000000..ecc6b74 --- /dev/null +++ b/metrics/graphite/manager.go @@ -0,0 +1,105 @@ +package graphite + +import ( + "net" + "time" + + "github.com/go-kit/kit/log" +) + +// Dialer dials a network and address. net.Dial is a good default Dialer. +type Dialer func(network, address string) (net.Conn, error) + +// time.After is a good default afterFunc. +type afterFunc func(time.Duration) <-chan time.Time + +// manager manages a net.Conn. Clients should take the conn when they want to +// use it, and put back whatever error they receive from an e.g. Write. When a +// non-nil error is put, the conn is invalidated and a new conn is established. +// Connection failures are retried after an exponential backoff. +type manager struct { + dial Dialer + network string + address string + after afterFunc + logger log.Logger + + takec chan net.Conn + putc chan error +} + +func newManager(d Dialer, network, address string, after afterFunc, logger log.Logger) *manager { + m := &manager{ + dial: d, + network: network, + address: address, + after: after, + logger: logger, + + takec: make(chan net.Conn), + putc: make(chan error), + } + go m.loop() + return m +} + +func (m *manager) take() net.Conn { + return <-m.takec +} + +func (m *manager) put(err error) { + m.putc <- err +} + +func (m *manager) loop() { + var ( + conn = dial(m.dial, m.network, m.address, m.logger) // may block slightly + connc = make(chan net.Conn) + reconnectc <-chan time.Time // initially nil + backoff = time.Second + ) + + for { + select { + case <-reconnectc: + reconnectc = nil + go func() { connc <- dial(m.dial, m.network, m.address, m.logger) }() + + case conn = <-connc: + if conn == nil { + backoff = exponential(backoff) + reconnectc = m.after(backoff) + } else { + backoff = time.Second + reconnectc = nil + } + + case m.takec <- conn: + // might be nil + + case err := <-m.putc: + if err != nil && conn != nil { + m.logger.Log("err", err) + conn = nil // connection is bad + reconnectc = m.after(time.Nanosecond) // trigger immediately + } + } + } +} + +func dial(d Dialer, network, address string, logger log.Logger) net.Conn { + conn, err := d(network, address) + if err != nil { + logger.Log("err", err) + conn = nil + } + return conn +} + +func exponential(d time.Duration) time.Duration { + d *= 2 + if d > time.Minute { + d = time.Minute + } + return d +} diff --git a/metrics/graphite/manager_test.go b/metrics/graphite/manager_test.go new file mode 100644 index 0000000..ff9f413 --- /dev/null +++ b/metrics/graphite/manager_test.go @@ -0,0 +1,126 @@ +package graphite + +import ( + "errors" + "net" + "sync/atomic" + "testing" + "time" + + "github.com/go-kit/kit/log" +) + +func TestManager(t *testing.T) { + var ( + tickc = make(chan time.Time) + after = func(time.Duration) <-chan time.Time { return tickc } + dialconn = &mockConn{} + dialerr = error(nil) + dialer = func(string, string) (net.Conn, error) { return dialconn, dialerr } + mgr = newManager(dialer, "netw", "addr", after, log.NewNopLogger()) + ) + + // First conn should be fine. + conn := mgr.take() + if conn == nil { + t.Fatal("nil conn") + } + + // Write and check it went thru. + if _, err := conn.Write([]byte{1, 2, 3}); err != nil { + t.Fatal(err) + } + if want, have := uint64(3), atomic.LoadUint64(&dialconn.wr); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Put an error to kill the conn. + mgr.put(errors.New("should kill the connection")) + + // First takes should fail. + for i := 0; i < 10; i++ { + if conn = mgr.take(); conn != nil { + t.Fatalf("want nil conn, got real conn") + } + } + + // Trigger the reconnect. + tickc <- time.Now() + + // The dial should eventually succeed and yield a good conn. + if !within(100*time.Millisecond, func() bool { + conn = mgr.take() + return conn != nil + }) { + t.Fatal("conn remained nil") + } + + // Write and check it went thru. + if _, err := conn.Write([]byte{4, 5}); err != nil { + t.Fatal(err) + } + if want, have := uint64(5), atomic.LoadUint64(&dialconn.wr); want != have { + t.Errorf("want %d, have %d", want, have) + } + + // Dial starts failing. + dialconn, dialerr = nil, errors.New("oh noes") + mgr.put(errors.New("trigger that reconnect y'all")) + if conn = mgr.take(); conn != nil { + t.Fatalf("want nil conn, got real conn") + } + + // As many reconnects as they want. + go func() { + done := time.After(100 * time.Millisecond) + for { + select { + case tickc <- time.Now(): + case <-done: + return + } + } + }() + + // The dial should never succeed. + if within(100*time.Millisecond, func() bool { + conn = mgr.take() + return conn != nil + }) { + t.Fatal("eventually got a good conn, despite failing dialer") + } +} + +type mockConn struct { + rd, wr uint64 +} + +func (c *mockConn) Read(b []byte) (n int, err error) { + atomic.AddUint64(&c.rd, uint64(len(b))) + return len(b), nil +} + +func (c *mockConn) Write(b []byte) (n int, err error) { + atomic.AddUint64(&c.wr, uint64(len(b))) + return len(b), nil +} + +func (c *mockConn) Close() error { return nil } +func (c *mockConn) LocalAddr() net.Addr { return nil } +func (c *mockConn) RemoteAddr() net.Addr { return nil } +func (c *mockConn) SetDeadline(t time.Time) error { return nil } +func (c *mockConn) SetReadDeadline(t time.Time) error { return nil } +func (c *mockConn) SetWriteDeadline(t time.Time) error { return nil } + +func within(d time.Duration, f func() bool) bool { + deadline := time.Now().Add(d) + for { + if time.Now().After(deadline) { + return false + } + if f() { + return true + } + time.Sleep(d / 10) + } +}