diff --git a/metrics/graphite/graphite.go b/metrics/graphite/graphite.go index a2073c6..c1c6ae9 100644 --- a/metrics/graphite/graphite.go +++ b/metrics/graphite/graphite.go @@ -1,4 +1,8 @@ -// Package graphite implements a graphite backend for package metrics. +// Package graphite implements a Graphite backend for package metrics. Metrics +// will be emitted to a Graphite server in the plaintext protocol +// (http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol) +// which looks like: +// " " // // The current implementation ignores fields. package graphite @@ -7,7 +11,6 @@ "bufio" "fmt" "io" - "log" "math" "net" "sort" @@ -16,52 +19,59 @@ "time" "github.com/codahale/hdrhistogram" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" ) // Emitter will keep track of all metrics and, once started, // will emit the metrics via the Flush method to the given address. type Emitter interface { - NewCounter(string) metrics.Counter - NewHistogram(string, int64, int64, int, ...int) metrics.Histogram - NewTimeHistogram(string, time.Duration, int64, int64, int, ...int) metrics.TimeHistogram - NewGauge(string) metrics.Gauge - - Start(time.Duration) - Flush() error + NewCounter(name string) metrics.Counter + NewHistogram(name string, min int64, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) + NewTimeHistogram(name string, unit time.Duration, min int64, max int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) + NewGauge(name string) metrics.Gauge + + Start(reportInvterval time.Duration) error + Flush() } type emitter struct { - addr *net.TCPAddr prefix string - metricMu *sync.Mutex + addr string + tcp bool + conn net.Conn + + mtx sync.Mutex counters []*counter histograms []*windowedHistogram gauges []*gauge + + logger log.Logger } // NewEmitter will return an Emitter that will prefix all // metrics names with the given prefix. Once started, it will attempt to create -// a TCP connection with the given address and periodically post metrics to the -// connection in a Graphite-compatible format. -func NewEmitter(addr *net.TCPAddr, prefix string) Emitter { - e := &emitter{ - addr, prefix, &sync.Mutex{}, - []*counter{}, []*windowedHistogram{}, []*gauge{}, - } - - return e +// a TCP or a UDP connection with the given address and periodically post +// metrics to the connection in the Graphite plaintext protocol. +// If the provided `tcp` parameter is false, a UDP connection will be used. +func NewEmitter(addr string, tcp bool, metricsPrefix string, logger log.Logger) Emitter { + return &emitter{ + addr: addr, + tcp: tcp, + prefix: metricsPrefix, + logger: logger, + } } // NewCounter returns a Counter whose value will be periodically emitted in // a Graphite-compatible format once the Emitter is started. Fields are ignored. func (e *emitter) NewCounter(name string) metrics.Counter { // only one flush at a time - e.metricMu.Lock() - defer e.metricMu.Unlock() c := &counter{name, 0} + e.mtx.Lock() e.counters = append(e.counters, c) + e.mtx.Unlock() return c } @@ -74,36 +84,37 @@ // // The values of this histogram will be periodically emitted in a Graphite-compatible // format once the Emitter is started. Fields are ignored. -func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram { - // only one flush at a time - e.metricMu.Lock() - defer e.metricMu.Unlock() - +func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) { gauges := map[int]metrics.Gauge{} for _, quantile := range quantiles { if quantile <= 0 || quantile >= 100 { - panic(fmt.Sprintf("invalid quantile %d", quantile)) + return nil, fmt.Errorf("invalid quantile %d", quantile) } gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile)) } - h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges) + h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger) + + e.mtx.Lock() e.histograms = append(e.histograms, h) - return h + e.mtx.Unlock() + return h, nil } // NewTimeHistogram returns a TimeHistogram wrapper around the windowed // HDR histrogram provided by this package. -func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.TimeHistogram { - h := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...) - return metrics.NewTimeHistogram(unit, h) +func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) { + h, err := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...) + if err != nil { + return nil, err + } + return metrics.NewTimeHistogram(unit, h), nil } // NewGauge returns a Gauge whose value will be periodically emitted in // a Graphite-compatible format once the Emitter is started. Fields are ignored. func (e *emitter) NewGauge(name string) metrics.Gauge { - // only one flush at a time - e.metricMu.Lock() - defer e.metricMu.Unlock() + e.mtx.Lock() + defer e.mtx.Unlock() return e.gauge(name) } @@ -113,60 +124,69 @@ return g } +func (e *emitter) dial() error { + if e.tcp { + tAddr, err := net.ResolveTCPAddr("tcp", e.addr) + if err != nil { + return err + } + e.conn, err = net.DialTCP("tcp", nil, tAddr) + if err != nil { + return err + } + } else { + uAddr, err := net.ResolveUDPAddr("udp", e.addr) + if err != nil { + return err + } + e.conn, err = net.DialUDP("udp", nil, uAddr) + if err != nil { + return err + } + } + return nil +} + // Start will kick off a background goroutine to // call Flush once every interval. -func (e *emitter) Start(interval time.Duration) { +func (e *emitter) Start(interval time.Duration) error { + err := e.dial() + if err != nil { + return err + } go func() { - t := time.Tick(interval) - for range t { - err := e.Flush() - if err != nil { - log.Print("error: could not dial graphite host: ", err) - continue - } + for range time.Tick(interval) { + e.Flush() } }() + return nil } // Flush will attempt to create a connection with the given address -// and write the current metrics to it in a Graphite-compatible format. +// and write the current metrics to it in the Graphite plaintext protocol. // // Users can call this method on process shutdown to ensure // the current metrics are pushed to Graphite. -func (e *emitter) Flush() error { - // open connection - conn, err := net.DialTCP("tcp", nil, e.addr) - if err != nil { - return err - } - - // flush stats to connection - e.flush(conn) - - // close connection - conn.Close() - return nil -} +func (e *emitter) Flush() { e.flush(e.conn) } func (e *emitter) flush(conn io.Writer) { // only one flush at a time - e.metricMu.Lock() - defer e.metricMu.Unlock() + e.mtx.Lock() + defer e.mtx.Unlock() // buffer the writer and make sure to flush it w := bufio.NewWriter(conn) defer w.Flush() - now := time.Now().Unix() - // emit counter stats for _, c := range e.counters { - fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, now) + fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, time.Now().Unix()) } // emit histogram specific stats for _, h := range e.histograms { hist := h.hist.Merge() + now := time.Now().Unix() fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, h.Name(), hist.TotalCount(), now) fmt.Fprintf(w, "%s.%s.min %d %d\n", e.prefix, h.Name(), hist.Min(), now) fmt.Fprintf(w, "%s.%s.max %d %d\n", e.prefix, h.Name(), hist.Max(), now) @@ -176,7 +196,7 @@ // emit gauge stats (which can include some histogram quantiles) for _, g := range e.gauges { - fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), now) + fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), time.Now().Unix()) } } @@ -219,11 +239,12 @@ } type windowedHistogram struct { - mu sync.Mutex + mtx sync.Mutex hist *hdrhistogram.WindowedHistogram name string gauges map[int]metrics.Gauge + logger log.Logger } // newWindowedHistogram is taken from http://github.com/codahale/metrics. It returns a @@ -232,11 +253,12 @@ // The histogram exposes metrics for each passed quantile as gauges. Users are expected // to provide their own set of Gauges for quantiles to make this Histogram work across multiple // metrics providers. -func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge) *windowedHistogram { +func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram { h := &windowedHistogram{ hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs), name: name, gauges: quantiles, + logger: logger, } go h.rotateLoop(1 * time.Minute) return h @@ -246,12 +268,13 @@ func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h } func (h *windowedHistogram) Observe(value int64) { - h.mu.Lock() + h.mtx.Lock() err := h.hist.Current.RecordValue(value) - h.mu.Unlock() + h.mtx.Unlock() if err != nil { - panic(err.Error()) + h.logger.Log("err", err, "msg", "unable to record histogram value") + return } for q, gauge := range h.gauges { @@ -282,9 +305,9 @@ func (h *windowedHistogram) rotateLoop(d time.Duration) { for range time.Tick(d) { - h.mu.Lock() + h.mtx.Lock() h.hist.Rotate() - h.mu.Unlock() + h.mtx.Unlock() } } diff --git a/metrics/graphite/graphite_test.go b/metrics/graphite/graphite_test.go index fdd8575..cee876f 100644 --- a/metrics/graphite/graphite_test.go +++ b/metrics/graphite/graphite_test.go @@ -12,12 +12,16 @@ func TestHistogramQuantiles(t *testing.T) { prefix := "prefix" - e := NewEmitter(nil, prefix) + e := NewEmitter("", true, prefix, nil) var ( name = "test_histogram_quantiles" quantiles = []int{50, 90, 95, 99} - h = e.NewHistogram(name, 0, 100, 3, quantiles...).With(metrics.Field{Key: "ignored", Value: "field"}) ) + h, err := e.NewHistogram(name, 0, 100, 3, quantiles...) + if err != nil { + t.Fatalf("unable to create test histogram: ", err) + } + h = h.With(metrics.Field{Key: "ignored", Value: "field"}) const seed, mean, stdev int64 = 424242, 50, 10 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev) @@ -32,11 +36,11 @@ prefix = "prefix" name = "m" value = 123 - e = NewEmitter(nil, prefix) + e = NewEmitter("", true, prefix, nil).(*emitter) b bytes.Buffer ) e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value)) - e.(*emitter).flush(&b) + e.flush(&b) want := fmt.Sprintf("%s.%s.count %d", prefix, name, value) payload := b.String() if !strings.HasPrefix(payload, want) { @@ -50,7 +54,7 @@ name = "xyz" value = 54321 delta = 12345 - e = NewEmitter(nil, prefix) + e = NewEmitter("", true, prefix, nil) b bytes.Buffer g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"}) ) @@ -66,15 +70,3 @@ t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload) } } - -func TestInvalidQuantile(t *testing.T) { - e := NewEmitter(nil, "") - defer func() { - if err := recover(); err == nil { - t.Errorf("expected panic, got none") - } else { - t.Logf("got expected panic: %v", err) - } - }() - e.NewHistogram("foo", 0.0, 100.0, 3, 50, 90, 95, 99, 101) -} diff --git a/metrics/teststat/graphite.go b/metrics/teststat/graphite.go index 498ca20..4cb5994 100644 --- a/metrics/teststat/graphite.go +++ b/metrics/teststat/graphite.go @@ -11,51 +11,53 @@ // AssertGraphiteNormalHistogram ensures the expvar Histogram referenced by // metricName abides a normal distribution. func AssertGraphiteNormalHistogram(t *testing.T, prefix, metricName string, mean, stdev int64, quantiles []int, gPayload string) { - const tolerance int = 2 - // check for hdr histo data - wants := map[string]int64{"count": 1234, "min": 15, "max": 83, "std-dev": stdev, "mean": mean} + wants := map[string]int64{"count": 1234, "min": 15, "max": 83} for key, want := range wants { re := regexp.MustCompile(fmt.Sprintf("%s.%s.%s (\\d*)", prefix, metricName, key)) - if res := re.FindAllStringSubmatch(gPayload, 1); res != nil { - if len(res[0]) == 1 { - t.Errorf("bad regex found, please check the test scenario") - continue - } + res := re.FindAllStringSubmatch(gPayload, 1) + if res == nil { + t.Error("did not find metrics log for", key, "in \n", gPayload) + continue + } - have, err := strconv.ParseInt(res[0][1], 10, 64) - if err != nil { - t.Fatal(err) - } + if len(res[0]) == 1 { + t.Fatalf("%q: bad regex, please check the test scenario", key) + } - if int(math.Abs(float64(want-have))) > tolerance { - t.Errorf("key %s: want %d, have %d", key, want, have) - } - } else { - t.Error("did not find metrics log for", key, "in \n", gPayload) + have, err := strconv.ParseInt(res[0][1], 10, 64) + if err != nil { + t.Fatal(err) + } + + if want != have { + t.Errorf("key %s: want %d, have %d", key, want, have) } } + const tolerance int = 2 + wants = map[string]int64{".std-dev": stdev, ".mean": mean} + for _, quantile := range quantiles { + wants[fmt.Sprintf("_p%02d", quantile)] = normalValueAtQuantile(mean, stdev, quantile) + } // check for quantile gauges - for _, quantile := range quantiles { - want := normalValueAtQuantile(mean, stdev, quantile) - - re := regexp.MustCompile(fmt.Sprintf("%s.%s_p%02d (\\d*\\.\\d*)", prefix, metricName, quantile)) - if res := re.FindAllStringSubmatch(gPayload, 1); res != nil { - if len(res[0]) == 1 { - t.Errorf("bad regex found, please check the test scenario") - continue - } - have, err := strconv.ParseFloat(res[0][1], 64) - if err != nil { - t.Fatal(err) - } - if int(math.Abs(float64(want)-have)) > tolerance { - t.Errorf("quantile %d: want %.2f, have %.2f", quantile, want, have) - } - } else { - t.Errorf("did not find metrics log for %d", quantile) + for key, want := range wants { + re := regexp.MustCompile(fmt.Sprintf("%s.%s%s (\\d*\\.\\d*)", prefix, metricName, key)) + res := re.FindAllStringSubmatch(gPayload, 1) + if res == nil { + t.Errorf("did not find metrics log for %s", key) + continue } + if len(res[0]) == 1 { + t.Fatalf("%q: bad regex found, please check the test scenario", key) + } + have, err := strconv.ParseFloat(res[0][1], 64) + if err != nil { + t.Fatal(err) + } + if int(math.Abs(float64(want)-have)) > tolerance { + t.Errorf("key %s: want %.2f, have %.2f", key, want, have) + } } }