diff --git a/metrics2/circonus/circonus.go b/metrics2/circonus/circonus.go new file mode 100644 index 0000000..591b2eb --- /dev/null +++ b/metrics2/circonus/circonus.go @@ -0,0 +1,85 @@ +// Package circonus provides a Circonus backend for metrics. +package circonus + +import ( + "github.com/circonus-labs/circonus-gometrics" + + "github.com/go-kit/kit/metrics2" +) + +// Circonus wraps a CirconusMetrics object and provides constructors for each of +// the Go kit metrics. The CirconusMetrics object manages aggregation of +// observations and emission to the Circonus server. +type Circonus struct { + m *circonusgometrics.CirconusMetrics +} + +// New creates a new Circonus object wrapping the passed CirconusMetrics, which +// the caller should create and set in motion. The Circonus object can be used +// to construct individual Go kit metrics. +func New(m *circonusgometrics.CirconusMetrics) *Circonus { + return &Circonus{ + m: m, + } +} + +// NewCounter returns a counter metric with the given name. +func (c *Circonus) NewCounter(name string) *Counter { + return &Counter{ + name: name, + m: c.m, + } +} + +// NewGauge returns a gauge metric with the given name. +func (c *Circonus) NewGauge(name string) *Gauge { + return &Gauge{ + name: name, + m: c.m, + } +} + +// NewHistogram returns a histogram metric with the given name. +func (c *Circonus) NewHistogram(name string) *Histogram { + return &Histogram{ + h: c.m.NewHistogram(name), + } +} + +// Counter is a Circonus implementation of a counter metric. +type Counter struct { + name string + m *circonusgometrics.CirconusMetrics +} + +// With implements Counter, but is a no-op, because Circonus metrics have no +// concept of per-observation label values. +func (c *Counter) With(labelValues ...string) metrics.Counter { return c } + +// Add implements Counter. Delta is converted to uint64; precision will be lost. +func (c *Counter) Add(delta float64) { c.m.Add(c.name, uint64(delta)) } + +// Gauge is a Circonus implementation of a gauge metric. +type Gauge struct { + name string + m *circonusgometrics.CirconusMetrics +} + +// With implements Gauge, but is a no-op, because Circonus metrics have no +// concept of per-observation label values. +func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g } + +// Set implements Gauge. Value is converted to int64; precision will be lost. +func (g *Gauge) Set(value float64) { g.m.SetGauge(g.name, int64(value)) } + +// Histogram is a Circonus implementation of a histogram metric. +type Histogram struct { + h *circonusgometrics.Histogram +} + +// With implements Histogram, but is a no-op, because Circonus metrics have no +// concept of per-observation label values. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h } + +// Observe implements Histogram. No precision is lost. +func (h *Histogram) Observe(value float64) { h.h.RecordValue(value) } diff --git a/metrics2/discard/discard.go b/metrics2/discard/discard.go new file mode 100644 index 0000000..a572c70 --- /dev/null +++ b/metrics2/discard/discard.go @@ -0,0 +1,37 @@ +// Package discard provides a no-op metrics backend. +package discard + +import "github.com/go-kit/kit/metrics2" + +type counter struct{} + +// NewCounter returns a new no-op counter. +func NewCounter() metrics.Counter { return counter{} } + +// With implements Counter. +func (c counter) With(labelValues ...string) metrics.Counter { return c } + +// Add implements Counter. +func (c counter) Add(delta float64) {} + +type gauge struct{} + +// NewGauge returns a new no-op gauge. +func NewGauge() metrics.Gauge { return gauge{} } + +// With implements Gauge. +func (g gauge) With(labelValues ...string) metrics.Gauge { return g } + +// Set implements Gauge. +func (g gauge) Set(value float64) {} + +type histogram struct{} + +// NewHistogram returns a new no-op histogram. +func NewHistogram() metrics.Histogram { return histogram{} } + +// With implements Histogram. +func (h histogram) With(labelValues ...string) metrics.Histogram { return h } + +// Observe implements histogram. +func (h histogram) Observe(value float64) {} diff --git a/metrics2/doc.go b/metrics2/doc.go new file mode 100644 index 0000000..c3dc4ae --- /dev/null +++ b/metrics2/doc.go @@ -0,0 +1,43 @@ +// Package metrics provides a framework for application instrumentation. All +// metrics are safe for concurrent use. Considerable design influence has been +// taken from https://github.com/codahale/metrics and https://prometheus.io. +// +// This package contains the common interfaces. Your code should take these +// interfaces as parameters. Implementations are provided for different +// instrumentation systems in the various subdirectories. +// +// Usage +// +// Metrics are dependencies and should be passed to the components that need +// them in the same way you'd construct and pass a database handle, or reference +// to another component. So, create metrics in your func main, using whichever +// concrete implementation is appropriate for your organization. +// +// latency := prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ +// Namespace: "myteam", +// Subsystem: "foosvc", +// Name: "request_latency_seconds", +// Help: "Incoming request latency in seconds." +// }, []string{"method", "status_code"}) +// +// Write your components to take the metrics they will use as parameters to +// their constructors. Use the interface types, not the concrete types. That is, +// +// // NewAPI takes metrics.Histogram, not *prometheus.Summary +// func NewAPI(s Store, logger log.Logger, latency metrics.Histogram) *API { +// // ... +// } +// +// func (a *API) ServeFoo(w http.ResponseWriter, r *http.Request) { +// begin := time.Now() +// // ... +// a.latency.Observe(time.Since(begin).Seconds()) +// } +// +// Finally, pass the metrics as dependencies when building your object graph. +// This should happen in func main, not in the global scope. +// +// api := NewAPI(store, logger, latency) +// http.ListenAndServe("/", api) +// +package metrics diff --git a/metrics2/dogstatsd/dogstatsd.go b/metrics2/dogstatsd/dogstatsd.go new file mode 100644 index 0000000..e3be46d --- /dev/null +++ b/metrics2/dogstatsd/dogstatsd.go @@ -0,0 +1,318 @@ +// Package dogstatsd provides a DogStatsD backend for package metrics. It's very +// similar to statsd, but supports arbitrary tags per-metric, which map to Go +// kit's label values. So, while label values are no-ops in statsd, they are +// supported here. For more details, see the documentation at +// http://docs.datadoghq.com/guides/dogstatsd/. +// +// This package batches observations and emits them on some schedule to the +// remote server. This is useful even if you connect to your DogStatsD server +// over UDP. Emitting one network packet per observation can quickly overwhelm +// even the fastest internal network. Batching allows you to more linearly scale +// with growth. +// +// Typically you'll create a Dogstatsd object in your main function. +// +// d, stop := New("myprefix.", "udp", "dogstatsd:8125", time.Second, log.NewNopLogger()) +// defer stop() +// +// Then, create the metrics that your application will track from that object. +// Pass them as dependencies to the component that needs them; don't place them +// in the global scope. +// +// requests := d.NewCounter("requests") +// foo := NewFoo(store, logger, requests) +// +// Invoke them in your components when you have something to instrument. +// +// f.requests.Add(1) +// +package dogstatsd + +import ( + "fmt" + "io" + "math/rand" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/generic" + "github.com/go-kit/kit/metrics2/statsd" + "github.com/go-kit/kit/util/conn" +) + +// Dogstatsd is a store for metrics that will be reported to a DogStatsD server. +// Create a Dogstatsd object, use it to create metrics objects, and pass those +// objects as dependencies to the components that will use them. +type Dogstatsd struct { + mtx sync.RWMutex + prefix string + counters map[string]*generic.Counter + gauges map[string]*generic.Gauge + histograms map[string]*Histogram + timings map[string]*statsd.Timing + sets map[string]*Set + logger log.Logger +} + +// NewRaw creates a Dogstatsd object. By default the metrics will not be emitted +// anywhere. Use WriteTo to flush the metrics once, or FlushTo (in a separate +// goroutine) to flush them on a regular schedule, or use the New constructor to +// set up the object and flushing at the same time. +func NewRaw(prefix string, logger log.Logger) *Dogstatsd { + return &Dogstatsd{ + prefix: prefix, + counters: map[string]*generic.Counter{}, + gauges: map[string]*generic.Gauge{}, + histograms: map[string]*Histogram{}, + timings: map[string]*statsd.Timing{}, + sets: map[string]*Set{}, + logger: logger, + } +} + +// New creates a Statsd object that flushes all metrics in the DogStatsD format +// every flushInterval to the network and address. Use the returned stop +// function to terminate the flushing goroutine. +func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Dogstatsd, stop func()) { + d := NewRaw(prefix, logger) + manager := conn.NewDefaultManager(network, address, logger) + ticker := time.NewTicker(flushInterval) + go d.FlushTo(manager, ticker) + return d, ticker.Stop +} + +// NewCounter returns a counter metric with the given name. Adds are buffered +// until the underlying Statsd object is flushed. +func (d *Dogstatsd) NewCounter(name string) *generic.Counter { + d.mtx.Lock() + defer d.mtx.Unlock() + c := generic.NewCounter() + d.counters[d.prefix+name] = c + return c +} + +// NewGauge returns a gauge metric with the given name. +func (d *Dogstatsd) NewGauge(name string) *generic.Gauge { + d.mtx.Lock() + defer d.mtx.Unlock() + g := generic.NewGauge() + d.gauges[d.prefix+name] = g + return g +} + +// NewHistogram returns a histogram metric with the given name and sample rate. +func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram { + d.mtx.Lock() + defer d.mtx.Unlock() + h := newHistogram(sampleRate) + d.histograms[d.prefix+name] = h + return h +} + +// NewTiming returns a StatsD timing metric (DogStatsD documentation calls them +// Timers) with the given name, unit (e.g. "ms") and sample rate. Pass a sample +// rate of 1.0 or greater to disable sampling. Sampling is done at observation +// time. Observations are buffered until the underlying statsd object is +// flushed. +func (d *Dogstatsd) NewTiming(name, unit string, sampleRate float64) *statsd.Timing { + d.mtx.Lock() + defer d.mtx.Unlock() + t := statsd.NewTiming(unit, sampleRate) + d.timings[d.prefix+name] = t + return t +} + +// NewSet returns a DogStatsD set with the given name. +func (d *Dogstatsd) NewSet(name string) *Set { + d.mtx.Lock() + defer d.mtx.Unlock() + s := newSet() + d.sets[d.prefix+name] = s + return s +} + +// FlushTo invokes WriteTo to the writer every time the ticker fires. FlushTo +// blocks until the ticker is stopped. Most users won't need to call this method +// directly, and should prefer to use the New constructor. +func (d *Dogstatsd) FlushTo(w io.Writer, ticker *time.Ticker) { + for range ticker.C { + if _, err := d.WriteTo(w); err != nil { + d.logger.Log("during", "flush", "err", err) + } + } +} + +// WriteTo dumps the current state of all of the metrics to the given writer in +// the DogStatsD format. Each metric has its current value(s) written in +// sequential calls to Write. Counters and gauges are dumped with their current +// values; counters are reset. Histograms and timers have all of their +// (potentially sampled) observations dumped, and are reset. Sets have all of +// their observations dumped and are reset. Clients probably shouldn't invoke +// this method directly, and should prefer using FlushTo, or the New +// constructor. +func (d *Dogstatsd) WriteTo(w io.Writer) (int64, error) { + d.mtx.RLock() + defer d.mtx.RUnlock() + var ( + n int + err error + count int64 + ) + for name, c := range d.counters { + value := c.ValueReset() + tv := tagValues(c.LabelValues()) + n, err = fmt.Fprintf(w, "%s:%f|c%s\n", name, value, tv) + count += int64(n) + if err != nil { + return count, err + } + } + for name, g := range d.gauges { + value := g.Value() + tv := tagValues(g.LabelValues()) + n, err := fmt.Fprintf(w, "%s:%f|g%s\n", name, value, tv) + count += int64(n) + if err != nil { + return count, err + } + } + for name, h := range d.histograms { + sv := sampling(h.sampleRate) + tv := tagValues(h.lvs) + for _, value := range h.values { + n, err = fmt.Fprintf(w, "%s:%f%s%s\n", name, value, sv, tv) + count += int64(n) + if err != nil { + return count, err + } + } + } + for name, t := range d.timings { + un := t.Unit() + sv := sampling(t.SampleRate()) + tv := tagValues(t.LabelValues()) + for _, value := range t.Values() { + n, err = fmt.Fprintf(w, "%s:%d|%s%s%s\n", name, value, un, sv, tv) + count += int64(n) + if err != nil { + return count, err + } + } + } + for name, s := range d.sets { + for _, value := range s.Values() { + n, err = fmt.Fprintf(w, "%s:%s|s\n", name, value) + count += int64(n) + if err != nil { + return count, err + } + } + } + return count, nil +} + +// Histogram is a denormalized collection of observed values. A general form of +// a StatsD Timing. Create histograms through the Dogstatsd object. +type Histogram struct { + mtx sync.Mutex + sampleRate float64 + values []float64 + lvs []string +} + +func newHistogram(sampleRate float64) *Histogram { + return &Histogram{ + sampleRate: sampleRate, + } +} + +// With implements metrics.Histogram. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &Histogram{ + sampleRate: h.sampleRate, + values: h.values, + lvs: append(h.lvs, labelValues...), + } +} + +// Observe implements metrics.Histogram. Values are simply aggregated into memory. +// If the sample rate is less than 1.0, observations may be dropped. +func (h *Histogram) Observe(value float64) { + if h.sampleRate < 1.0 && rand.Float64() > h.sampleRate { + return + } + h.mtx.Lock() + defer h.mtx.Unlock() + h.values = append(h.values, value) +} + +// Values returns the observed values since the last call to Values. This method +// clears the internal state of the Histogram; better get those values somewhere +// safe! +func (h *Histogram) Values() []float64 { + h.mtx.Lock() + defer h.mtx.Unlock() + res := h.values + h.values = []float64{} + return res +} + +// Set is a DogStatsD-specific metric for tracking unique identifiers. +// Create sets through the Dogstatsd object. +type Set struct { + mtx sync.Mutex + values map[string]struct{} +} + +func newSet() *Set { + return &Set{ + values: map[string]struct{}{}, + } +} + +// Observe adds the value to the set. +func (s *Set) Observe(value string) { + s.mtx.Lock() + defer s.mtx.Unlock() + s.values[value] = struct{}{} +} + +// Values returns the unique observed values since the last call to Values. This +// method clears the internal state of the Set; better get those values +// somewhere safe! +func (s *Set) Values() []string { + res := make([]string, 0, len(s.values)) + for value := range s.values { + res = append(res, value) + } + s.values = map[string]struct{}{} // TODO(pb): if GC is a problem, this can be improved + return res +} + +func sampling(r float64) string { + var sv string + if r < 1.0 { + sv = fmt.Sprintf("|@%f", r) + } + return sv +} + +func tagValues(labelValues []string) string { + if len(labelValues) == 0 { + return "" + } + if len(labelValues)%2 != 0 { + panic("tagValues received a labelValues with an odd number of strings") + } + pairs := make([]string, 0, len(labelValues)/2) + for i := 0; i < len(labelValues); i += 2 { + pairs = append(pairs, labelValues[i]+":"+labelValues[i+1]) + } + return "|#" + strings.Join(pairs, ",") +} diff --git a/metrics2/expvar/expvar.go b/metrics2/expvar/expvar.go new file mode 100644 index 0000000..fa3ea35 --- /dev/null +++ b/metrics2/expvar/expvar.go @@ -0,0 +1,89 @@ +// Package expvar provides expvar backends for metrics. +// Label values are not supported. +package expvar + +import ( + "expvar" + "sync" + + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/generic" +) + +// Counter implements the counter metric with an expvar float. +type Counter struct { + f *expvar.Float +} + +// NewCounter creates an expvar Float with the given name, and returns an object +// that implements the Counter interface. +func NewCounter(name string) *Counter { + return &Counter{ + f: expvar.NewFloat(name), + } +} + +// With is a no-op. +func (c *Counter) With(labelValues ...string) metrics.Counter { return c } + +// Add implements Counter. +func (c *Counter) Add(delta float64) { c.f.Add(delta) } + +// Gauge implements the gauge metric wtih an expvar float. +type Gauge struct { + f *expvar.Float +} + +// NewGauge creates an expvar Float with the given name, and returns an object +// that implements the Gauge interface. +func NewGauge(name string) *Gauge { + return &Gauge{ + f: expvar.NewFloat(name), + } +} + +// With is a no-op. +func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g } + +// Set implements Gauge. +func (g *Gauge) Set(value float64) { g.f.Set(value) } + +// Histogram implements the histogram metric with a combination of the generic +// Histogram object and several expvar Floats, one for each of the 50th, 90th, +// 95th, and 99th quantiles of observed values, with the quantile attached to +// the name as a suffix. +type Histogram struct { + mtx sync.Mutex + h *generic.Histogram + p50 *expvar.Float + p90 *expvar.Float + p95 *expvar.Float + p99 *expvar.Float +} + +// NewHistogram returns a Histogram object with the given name and number of +// buckets in the underlying histogram object. 50 is a good default number of +// buckets. +func NewHistogram(name string, buckets int) *Histogram { + return &Histogram{ + h: generic.NewHistogram(buckets), + p50: expvar.NewFloat(name + ".p50"), + p90: expvar.NewFloat(name + ".p90"), + p95: expvar.NewFloat(name + ".p95"), + p99: expvar.NewFloat(name + ".p99"), + } +} + +// With is a no-op. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h } + +// Observe impleemts Histogram. +func (h *Histogram) Observe(value float64) { + h.mtx.Lock() + defer h.mtx.Unlock() + h.Observe(value) + h.p50.Set(h.h.Quantile(0.50)) + h.p90.Set(h.h.Quantile(0.90)) + h.p95.Set(h.h.Quantile(0.95)) + h.p99.Set(h.h.Quantile(0.99)) +} diff --git a/metrics2/generic/generic.go b/metrics2/generic/generic.go new file mode 100644 index 0000000..ec6b625 --- /dev/null +++ b/metrics2/generic/generic.go @@ -0,0 +1,207 @@ +// Package generic implements generic versions of each of the metric types. They +// can be embedded by other implementations, and converted to specific formats +// as necessary. +package generic + +import ( + "math" + "sync" + "sync/atomic" + + "github.com/VividCortex/gohistogram" + + "github.com/go-kit/kit/metrics2" +) + +// LabelValueUnknown is used as a label value when one is expected but not +// provided, typically due to user error. +const LabelValueUnknown = "unknown" + +// Counter is an in-memory implementation of a Counter. +type Counter struct { + sampleRate float64 + bits uint64 + lvs []string // immutable +} + +// NewCounter returns a new, usable Counter. +func NewCounter() *Counter { + return &Counter{} +} + +// With implements Counter. +func (c *Counter) With(labelValues ...string) metrics.Counter { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, LabelValueUnknown) + } + return &Counter{ + bits: atomic.LoadUint64(&c.bits), + lvs: append(c.lvs, labelValues...), + } +} + +// Add implements Counter. +func (c *Counter) Add(delta float64) { + for { + var ( + old = atomic.LoadUint64(&c.bits) + newf = math.Float64frombits(old) + delta + new = math.Float64bits(newf) + ) + if atomic.CompareAndSwapUint64(&c.bits, old, new) { + break + } + } +} + +// Value returns the current value of the counter. +func (c *Counter) Value() float64 { + return math.Float64frombits(atomic.LoadUint64(&c.bits)) +} + +// ValueReset returns the current value of the counter, and resets it to zero. +// This is useful for metrics backends whose counter aggregations expect deltas, +// like statsd. +func (c *Counter) ValueReset() float64 { + for { + var ( + old = atomic.LoadUint64(&c.bits) + newf = 0.0 + new = math.Float64bits(newf) + ) + if atomic.CompareAndSwapUint64(&c.bits, old, new) { + return math.Float64frombits(old) + } + } +} + +// LabelValues returns the set of label values attached to the counter. +func (c *Counter) LabelValues() []string { + return c.lvs +} + +// Gauge is an in-memory implementation of a Gauge. +type Gauge struct { + bits uint64 + lvs []string // immutable +} + +// NewGauge returns a new, usable Gauge. +func NewGauge() *Gauge { + return &Gauge{} +} + +// With implements Gauge. +func (c *Gauge) With(labelValues ...string) metrics.Gauge { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, LabelValueUnknown) + } + return &Gauge{ + bits: atomic.LoadUint64(&c.bits), + lvs: append(c.lvs, labelValues...), + } +} + +// Set implements Gauge. +func (c *Gauge) Set(value float64) { + atomic.StoreUint64(&c.bits, math.Float64bits(value)) +} + +// Value returns the current value of the gauge. +func (c *Gauge) Value() float64 { + return math.Float64frombits(atomic.LoadUint64(&c.bits)) +} + +// LabelValues returns the set of label values attached to the gauge. +func (c *Gauge) LabelValues() []string { + return c.lvs +} + +// Histogram is an in-memory implementation of a streaming histogram, based on +// VividCortex/gohistogram. It dynamically computes quantiles, so it's not +// suitable for aggregation. +type Histogram struct { + lvs []string // immutable + h gohistogram.Histogram +} + +// NewHistogram returns a numeric histogram based on VividCortex/gohistogram. A +// good default value for buckets is 50. +func NewHistogram(buckets int) *Histogram { + return &Histogram{ + h: gohistogram.NewHistogram(buckets), + } +} + +// With implements Histogram. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, LabelValueUnknown) + } + return &Histogram{ + lvs: append(h.lvs, labelValues...), + h: h.h, + } +} + +// Observe implements Histogram. +func (h *Histogram) Observe(value float64) { + h.h.Add(value) +} + +// Quantile returns the value of the quantile q, 0.0 < q < 1.0. +func (h *Histogram) Quantile(q float64) float64 { + return h.h.Quantile(q) +} + +// LabelValues returns the set of label values attached to the histogram. +func (h *Histogram) LabelValues() []string { + return h.lvs +} + +// SimpleHistogram is an in-memory implementation of a Histogram. It only tracks +// an approximate moving average, so is likely too naïve for many use cases. +type SimpleHistogram struct { + mtx sync.RWMutex + lvs []string + avg float64 + n uint64 +} + +// NewSimpleHistogram returns a SimpleHistogram, ready for observations. +func NewSimpleHistogram() *SimpleHistogram { + return &SimpleHistogram{} +} + +// With implements Histogram. +func (h *SimpleHistogram) With(labelValues ...string) metrics.Histogram { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, LabelValueUnknown) + } + return &SimpleHistogram{ + lvs: append(h.lvs, labelValues...), + avg: h.avg, + n: h.n, + } +} + +// Observe implements Histogram. +func (h *SimpleHistogram) Observe(value float64) { + h.mtx.Lock() + defer h.mtx.Unlock() + h.n++ + h.avg -= h.avg / float64(h.n) + h.avg += value / float64(h.n) +} + +// ApproximateMovingAverage returns the approximate moving average of observations. +func (h *SimpleHistogram) ApproximateMovingAverage() float64 { + h.mtx.RLock() + h.mtx.RUnlock() + return h.avg +} + +// LabelValues returns the set of label values attached to the histogram. +func (h *SimpleHistogram) LabelValues() []string { + return h.lvs +} diff --git a/metrics2/graphite/graphite.go b/metrics2/graphite/graphite.go new file mode 100644 index 0000000..8ba555b --- /dev/null +++ b/metrics2/graphite/graphite.go @@ -0,0 +1,137 @@ +// Package graphite provides a Graphite backend for metrics. Metrics are emitted +// with each observation in the plaintext protocol. See +// http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol +// for more information. +// +// Graphite does not have a native understanding of metric parameterization, so +// label values are aggregated but not reported. Use distinct metrics for each +// unique combination of label values. +package graphite + +import ( + "fmt" + "io" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics2/generic" + "github.com/go-kit/kit/util/conn" +) + +// Graphite is a store for metrics that will be reported to a Graphite server. +// Create a Graphite object, use it to create metrics objects, and pass those +// objects as dependencies to the components that will use them. +type Graphite struct { + mtx sync.RWMutex + prefix string + counters map[string]*generic.Counter + gauges map[string]*generic.Gauge + histograms map[string]*generic.Histogram + logger log.Logger +} + +// New creates a Statsd object that flushes all metrics in the Graphite +// plaintext format every flushInterval to the network and address. Use the +// returned stop function to terminate the flushing goroutine. +func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Graphite, stop func()) { + s := NewRaw(prefix, logger) + manager := conn.NewDefaultManager(network, address, logger) + ticker := time.NewTicker(flushInterval) + go s.FlushTo(manager, ticker) + return s, ticker.Stop +} + +// NewRaw returns a Graphite object capable of allocating individual metrics. +// All metrics will share the given prefix in their path. All metrics can be +// snapshotted, and their values and statistical summaries written to a writer, +// via the WriteTo method. +func NewRaw(prefix string, logger log.Logger) *Graphite { + return &Graphite{ + prefix: prefix, + counters: map[string]*generic.Counter{}, + gauges: map[string]*generic.Gauge{}, + histograms: map[string]*generic.Histogram{}, + logger: logger, + } +} + +// NewCounter allocates and returns a counter with the given name. +func (g *Graphite) NewCounter(name string) *generic.Counter { + g.mtx.Lock() + defer g.mtx.Unlock() + c := generic.NewCounter() + g.counters[g.prefix+name] = c + return c +} + +// NewGauge allocates and returns a gauge with the given name. +func (g *Graphite) NewGauge(name string) *generic.Gauge { + g.mtx.Lock() + defer g.mtx.Unlock() + ga := generic.NewGauge() + g.gauges[g.prefix+name] = ga + return ga +} + +// NewHistogram allocates and returns a histogram with the given name and bucket +// count. 50 is a good default number of buckets. Histograms report their 50th, +// 90th, 95th, and 99th quantiles in distinct metrics with the .p50, .p90, .p95, +// and .p99 suffixes, respectively. +func (g *Graphite) NewHistogram(name string, buckets int) *generic.Histogram { + g.mtx.Lock() + defer g.mtx.Unlock() + h := generic.NewHistogram(buckets) + g.histograms[g.prefix+name] = h + return h +} + +// FlushTo invokes WriteTo to the writer every time the ticker fires. FlushTo +// blocks until the ticker is stopped. Most users won't need to call this method +// directly, and should prefer to use the New constructor. +func (g *Graphite) FlushTo(w io.Writer, ticker *time.Ticker) { + for range ticker.C { + if _, err := g.WriteTo(w); err != nil { + g.logger.Log("during", "flush", "err", err) + } + } +} + +// WriteTo writes a snapshot of all of the allocated metrics to the writer in +// the Graphite plaintext format. Clients probably shouldn't invoke this method +// directly, and should prefer using FlushTo, or the New constructor. +func (g *Graphite) WriteTo(w io.Writer) (int64, error) { + g.mtx.RLock() + defer g.mtx.RUnlock() + var ( + n int + err error + count int64 + now = time.Now().Unix() + ) + for path, c := range g.counters { + n, err = fmt.Fprintf(w, "%s.count %f %d\n", path, c.Value(), now) + if err != nil { + return count, err + } + count += int64(n) + } + for path, ga := range g.gauges { + n, err = fmt.Fprintf(w, "%s %f %d\n", path, ga.Value(), now) + if err != nil { + return count, err + } + count += int64(n) + } + for path, h := range g.histograms { + n, err = fmt.Fprintf(w, "%s.p50 %f %d\n", path, h.Quantile(0.50), now) + n, err = fmt.Fprintf(w, "%s.p90 %f %d\n", path, h.Quantile(0.90), now) + n, err = fmt.Fprintf(w, "%s.p95 %f %d\n", path, h.Quantile(0.95), now) + n, err = fmt.Fprintf(w, "%s.p99 %f %d\n", path, h.Quantile(0.99), now) + if err != nil { + return count, err + } + count += int64(n) + } + return count, nil +} diff --git a/metrics2/influx/influxdb.go b/metrics2/influx/influxdb.go new file mode 100644 index 0000000..2912af3 --- /dev/null +++ b/metrics2/influx/influxdb.go @@ -0,0 +1,216 @@ +// Package influx provides an InfluxDB implementation for metrics. The model is +// similar to other push-based instrumentation systems. Observations are +// aggregated locally and emitted to the Influx server on regular intervals. +package influx + +import ( + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics2/generic" + influxdb "github.com/influxdata/influxdb/client/v2" +) + +// Influx is a store for metrics that will be emitted to an Influx database. +// +// Influx is a general purpose time-series database, and has no native concepts +// of counters, gauges, or histograms. Counters are modeled as a timeseries with +// one data point per flush, with a "count" field that reflects all adds since +// the last flush. Gauges are modeled as a timeseries with one data point per +// flush, with a "value" field that reflects the current state of the gauge. +// Histograms are modeled as 4 gauge timeseries, one each for the 50th, 90th, +// 95th, and 99th quantiles. +// +// Influx tags are assigned to each Go kit metric at construction, and are +// immutable for the life of the metric. Influx fields are mapped to Go kit +// label values, and may be mutated via With functions. Actual metric values are +// provided as fields with specific names depending on the metric. +// +// All observations are batched in memory locally, and flushed on demand. +type Influx struct { + mtx sync.RWMutex + counters map[string]*Counter + gauges map[string]*Gauge + histograms map[string]*Histogram + tags map[string]string + conf influxdb.BatchPointsConfig + logger log.Logger +} + +// New returns an Influx object, ready to create metrics and aggregate +// observations, and automatically flushing to the passed Influx client every +// flushInterval. Use the returned stop function to terminate the flushing +// goroutine. +func New( + tags map[string]string, + conf influxdb.BatchPointsConfig, + client influxdb.Client, + flushInterval time.Duration, + logger log.Logger, +) (res *Influx, stop func()) { + i := NewRaw(tags, conf, logger) + ticker := time.NewTicker(flushInterval) + go i.FlushTo(client, ticker) + return i, ticker.Stop +} + +// NewRaw returns an Influx object, ready to create metrics and aggregate +// observations, but without automatically flushing anywhere. Users should +// probably prefer the New constructor. +// +// Tags are applied to all metrics created from this object. A BatchPoints +// structure is created from the provided BatchPointsConfig; any error will +// cause a panic. Observations are aggregated into the BatchPoints. +func NewRaw(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx { + return &Influx{ + counters: map[string]*Counter{}, + gauges: map[string]*Gauge{}, + histograms: map[string]*Histogram{}, + tags: tags, + conf: conf, + logger: logger, + } +} + +// NewCounter returns a generic counter with static tags. +func (i *Influx) NewCounter(name string, tags map[string]string) *Counter { + i.mtx.Lock() + defer i.mtx.Unlock() + c := newCounter(tags) + i.counters[name] = c + return c +} + +// NewGauge returns a generic gauge with static tags. +func (i *Influx) NewGauge(name string, tags map[string]string) *Gauge { + i.mtx.Lock() + defer i.mtx.Unlock() + g := newGauge(tags) + i.gauges[name] = g + return g +} + +// NewHistogram returns a generic histogram with static tags. 50 is a good +// default number of buckets. +func (i *Influx) NewHistogram(name string, tags map[string]string, buckets int) *Histogram { + i.mtx.Lock() + defer i.mtx.Unlock() + h := newHistogram(tags, buckets) + i.histograms[name] = h + return h +} + +// FlushTo invokes WriteTo to the client every time the ticker fires. FlushTo +// blocks until the ticker is stopped. Most users won't need to call this method +// directly, and should prefer to use the New constructor. +func (i *Influx) FlushTo(client influxdb.Client, ticker *time.Ticker) { + for range ticker.C { + if err := i.WriteTo(client); err != nil { + i.logger.Log("during", "flush", "err", err) + } + } +} + +// WriteTo converts the current set of metrics to Influx BatchPoints, and writes +// the BatchPoints to the client. Clients probably shouldn't invoke this method +// directly, and should prefer using FlushTo, or the New constructor. +func (i *Influx) WriteTo(client influxdb.Client) error { + i.mtx.Lock() + defer i.mtx.Unlock() + + bp, err := influxdb.NewBatchPoints(i.conf) + if err != nil { + return err + } + now := time.Now() + + for name, c := range i.counters { + fields := fieldsFrom(c.LabelValues()) + fields["count"] = c.ValueReset() + p, err := influxdb.NewPoint(name, c.tags, fields, now) + if err != nil { + return err + } + bp.AddPoint(p) + } + + for name, g := range i.gauges { + fields := fieldsFrom(g.LabelValues()) + fields["value"] = g.Value() + p, err := influxdb.NewPoint(name, g.tags, fields, now) + if err != nil { + return err + } + bp.AddPoint(p) + } + + for name, h := range i.histograms { + fields := fieldsFrom(h.LabelValues()) + for suffix, quantile := range map[string]float64{ + ".p50": 0.50, + ".p90": 0.90, + ".p95": 0.95, + ".p99": 0.99, + } { + fields["value"] = h.Quantile(quantile) + p, err := influxdb.NewPoint(name+suffix, h.tags, fields, now) + if err != nil { + return err + } + bp.AddPoint(p) + } + } + + return client.Write(bp) +} + +func fieldsFrom(labelValues []string) map[string]interface{} { + if len(labelValues)%2 != 0 { + panic("fieldsFrom received a labelValues with an odd number of strings") + } + fields := make(map[string]interface{}, len(labelValues)/2) + for i := 0; i < len(labelValues); i += 2 { + fields[labelValues[i]] = labelValues[i+1] + } + return fields +} + +// Counter is a generic counter, with static tags. +type Counter struct { + *generic.Counter + tags map[string]string +} + +func newCounter(tags map[string]string) *Counter { + return &Counter{ + Counter: generic.NewCounter(), + tags: tags, + } +} + +// Gauge is a generic gauge, with static tags. +type Gauge struct { + *generic.Gauge + tags map[string]string +} + +func newGauge(tags map[string]string) *Gauge { + return &Gauge{ + Gauge: generic.NewGauge(), + tags: tags, + } +} + +// Histogram is a generic histogram, with static tags. +type Histogram struct { + *generic.Histogram + tags map[string]string +} + +func newHistogram(tags map[string]string, buckets int) *Histogram { + return &Histogram{ + Histogram: generic.NewHistogram(buckets), + tags: tags, + } +} diff --git a/metrics2/metrics.go b/metrics2/metrics.go new file mode 100644 index 0000000..f2eb6ea --- /dev/null +++ b/metrics2/metrics.go @@ -0,0 +1,24 @@ +package metrics + +// Counter describes a metric that accumulates values monotonically. +// An example of a counter is the number of received HTTP requests. +type Counter interface { + With(labelValues ...string) Counter + Add(delta float64) +} + +// Gauge describes a metric that takes a specific value over time. +// An example of a gauge is the current depth of a job queue. +type Gauge interface { + With(labelValues ...string) Gauge + Set(value float64) +} + +// Histogram describes a metric that takes repeated observations of the same +// kind of thing, and produces a statistical summary of those observations, +// typically expressed as quantile buckets. An example of a histogram is HTTP +// request latencies. +type Histogram interface { + With(labelValues ...string) Histogram + Observe(value float64) +} diff --git a/metrics2/prometheus/prometheus.go b/metrics2/prometheus/prometheus.go new file mode 100644 index 0000000..2d1f01f --- /dev/null +++ b/metrics2/prometheus/prometheus.go @@ -0,0 +1,173 @@ +// Package prometheus provides Prometheus implementations for metrics. +// Individual metrics are mapped to their Prometheus counterparts, and +// (depending on the constructor used) may be automatically registered in the +// global Prometheus metrics registry. +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/generic" +) + +// Counter implements Counter, via a Prometheus CounterVec. +type Counter struct { + cv *prometheus.CounterVec + lv []string +} + +// NewCounterFrom constructs and registers a Prometheus CounterVec, +// and returns a usable Counter object. +func NewCounterFrom(opts prometheus.CounterOpts, labelNames []string) *Counter { + cv := prometheus.NewCounterVec(opts, labelNames) + prometheus.MustRegister(cv) + return NewCounter(cv) +} + +// NewCounter wraps the CounterVec and returns a usable Counter object. +func NewCounter(cv *prometheus.CounterVec) *Counter { + return &Counter{ + cv: cv, + lv: []string{}, + } +} + +// With implements Counter. +func (c *Counter) With(labelValues ...string) metrics.Counter { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &Counter{ + cv: c.cv, + lv: append(c.lv, labelValues...), + } +} + +// Add implements Counter. +func (c *Counter) Add(delta float64) { + c.cv.WithLabelValues(c.lv...).Add(delta) +} + +// Gauge implements Gauge, via a Prometheus GaugeVec. +type Gauge struct { + gv *prometheus.GaugeVec + lv []string +} + +// NewGaugeFrom construts and registers a Prometheus GaugeVec, +// and returns a usable Gauge object. +func NewGaugeFrom(opts prometheus.GaugeOpts, labelNames []string) *Gauge { + gv := prometheus.NewGaugeVec(opts, labelNames) + prometheus.MustRegister(gv) + return NewGauge(gv) +} + +// NewGauge wraps the GaugeVec and returns a usable Gauge object. +func NewGauge(gv *prometheus.GaugeVec) *Gauge { + return &Gauge{ + gv: gv, + lv: []string{}, + } +} + +// With implements Gauge. +func (g *Gauge) With(labelValues ...string) metrics.Gauge { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &Gauge{ + gv: g.gv, + lv: append(g.lv, labelValues...), + } +} + +// Set implements Gauge. +func (g *Gauge) Set(value float64) { + g.gv.WithLabelValues(g.lv...).Set(value) +} + +// Add is supported by Prometheus GaugeVecs. +func (g *Gauge) Add(delta float64) { + g.gv.WithLabelValues(g.lv...).Add(delta) +} + +// Summary implements Histogram, via a Prometheus SummaryVec. The difference +// between a Summary and a Histogram is that Summaries don't require predefined +// quantile buckets, but cannot be statistically aggregated. +type Summary struct { + sv *prometheus.SummaryVec + lv []string +} + +// NewSummaryFrom constructs and registers a Prometheus SummaryVec, +// and returns a usable Summary object. +func NewSummaryFrom(opts prometheus.SummaryOpts, labelNames []string) *Summary { + sv := prometheus.NewSummaryVec(opts, labelNames) + prometheus.MustRegister(sv) + return NewSummary(sv) +} + +// NewSummary wraps the SummaryVec and returns a usable Summary object. +func NewSummary(sv *prometheus.SummaryVec) *Summary { + return &Summary{ + sv: sv, + lv: []string{}, + } +} + +// With implements Histogram. +func (s *Summary) With(labelValues ...string) metrics.Histogram { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &Summary{ + sv: s.sv, + lv: append(s.lv, labelValues...), + } +} + +// Observe implements Histogram. +func (s *Summary) Observe(value float64) { + s.sv.WithLabelValues(s.lv...).Observe(value) +} + +// Histogram implements Histogram via a Prometheus HistogramVec. The difference +// between a Histogram and a Summary is that Histograms require predefined +// quantile buckets, and can be statistically aggregated. +type Histogram struct { + hv *prometheus.HistogramVec + lv []string +} + +// NewHistogramFrom constructs and registers a Prometheus HistogramVec, +// and returns a usable Histogram object. +func NewHistogramFrom(opts prometheus.HistogramOpts, labelNames []string) *Histogram { + hv := prometheus.NewHistogramVec(opts, labelNames) + prometheus.MustRegister(hv) + return NewHistogram(hv) +} + +// NewHistogram wraps the HistogramVec and returns a usable Histogram object. +func NewHistogram(hv *prometheus.HistogramVec) *Histogram { + return &Histogram{ + hv: hv, + lv: []string{}, + } +} + +// With implements Histogram. +func (h *Histogram) With(labelValues ...string) metrics.Histogram { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &Histogram{ + hv: h.hv, + lv: append(h.lv, labelValues...), + } +} + +// Observe implements Histogram. +func (h *Histogram) Observe(value float64) { + h.hv.WithLabelValues(h.lv...).Observe(value) +} diff --git a/metrics2/provider/circonus.go b/metrics2/provider/circonus.go new file mode 100644 index 0000000..87a04d7 --- /dev/null +++ b/metrics2/provider/circonus.go @@ -0,0 +1,36 @@ +package provider + +import ( + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/circonus" +) + +type circonusProvider struct { + c *circonus.Circonus +} + +// NewCirconusProvider takes the given Circonnus object and returns a Provider +// that produces Circonus metrics. +func NewCirconusProvider(c *circonus.Circonus) Provider { + return &circonusProvider{ + c: c, + } +} + +// NewCounter implements Provider. +func (p *circonusProvider) NewCounter(name string) metrics.Counter { + return p.c.NewCounter(name) +} + +// NewGauge implements Provider. +func (p *circonusProvider) NewGauge(name string) metrics.Gauge { + return p.c.NewGauge(name) +} + +// NewHistogram implements Provider. The buckets parameter is ignored. +func (p *circonusProvider) NewHistogram(name string, _ int) metrics.Histogram { + return p.c.NewHistogram(name) +} + +// Stop implements Provider, but is a no-op. +func (p *circonusProvider) Stop() {} diff --git a/metrics2/provider/discard.go b/metrics2/provider/discard.go new file mode 100644 index 0000000..67f0ec2 --- /dev/null +++ b/metrics2/provider/discard.go @@ -0,0 +1,24 @@ +package provider + +import ( + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/discard" +) + +type discardProvider struct{} + +// NewDiscardProvider returns a provider that produces no-op metrics via the +// discarding backend. +func NewDiscardProvider() Provider { return discardProvider{} } + +// NewCounter implements Provider. +func (discardProvider) NewCounter(string) metrics.Counter { return discard.NewCounter() } + +// NewGauge implements Provider. +func (discardProvider) NewGauge(string) metrics.Gauge { return discard.NewGauge() } + +// NewHistogram implements Provider. +func (discardProvider) NewHistogram(string, int) metrics.Histogram { return discard.NewHistogram() } + +// Stop implements Provider. +func (discardProvider) Stop() {} diff --git a/metrics2/provider/dogstatsd.go b/metrics2/provider/dogstatsd.go new file mode 100644 index 0000000..db18940 --- /dev/null +++ b/metrics2/provider/dogstatsd.go @@ -0,0 +1,42 @@ +package provider + +import ( + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/dogstatsd" +) + +// Dogstatsd +type dogstatsdProvider struct { + d *dogstatsd.Dogstatsd + stop func() +} + +// NewDogstatsdProvider wraps the given Dogstatsd object and stop func and +// returns a Provider that produces Dogstatsd metrics. +func NewDogstatsdProvider(d *dogstatsd.Dogstatsd, stop func()) Provider { + return &dogstatsdProvider{ + d: d, + stop: stop, + } +} + +// NewCounter implements Provider. +func (p *dogstatsdProvider) NewCounter(name string) metrics.Counter { + return p.d.NewCounter(name) +} + +// NewGauge implements Provider. +func (p *dogstatsdProvider) NewGauge(name string) metrics.Gauge { + return p.d.NewGauge(name) +} + +// NewHistogram implements Provider, returning a new Dogstatsd Histogram with a +// sample rate of 1.0. Buckets are ignored. +func (p *dogstatsdProvider) NewHistogram(name string, _ int) metrics.Histogram { + return p.d.NewHistogram(name, 1.0) +} + +// Stop implements Provider, invoking the stop function passed at construction. +func (p *dogstatsdProvider) Stop() { + p.stop() +} diff --git a/metrics2/provider/expvar.go b/metrics2/provider/expvar.go new file mode 100644 index 0000000..909e1b2 --- /dev/null +++ b/metrics2/provider/expvar.go @@ -0,0 +1,29 @@ +package provider + +import "github.com/go-kit/kit/metrics2" +import "github.com/go-kit/kit/metrics2/expvar" + +type expvarProvider struct{} + +// NewExpvarProvider returns a Provider that produces expvar metrics. +func NewExpvarProvider() Provider { + return expvarProvider{} +} + +// NewCounter implements Provider. +func (p expvarProvider) NewCounter(name string) metrics.Counter { + return expvar.NewCounter(name) +} + +// NewGauge implements Provider. +func (p expvarProvider) NewGauge(name string) metrics.Gauge { + return expvar.NewGauge(name) +} + +// NewHistogram implements Provider. +func (p expvarProvider) NewHistogram(name string, buckets int) metrics.Histogram { + return expvar.NewHistogram(name, buckets) +} + +// Stop implements Provider, but is a no-op. +func (p expvarProvider) Stop() {} diff --git a/metrics2/provider/graphite.go b/metrics2/provider/graphite.go new file mode 100644 index 0000000..20bf62f --- /dev/null +++ b/metrics2/provider/graphite.go @@ -0,0 +1,40 @@ +package provider + +import ( + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/graphite" +) + +type graphiteProvider struct { + g *graphite.Graphite + stop func() +} + +// NewGraphiteProvider wraps the given Graphite object and stop func and +// returns a Provider that produces Graphite metrics. +func NewGraphiteProvider(g *graphite.Graphite, stop func()) Provider { + return &graphiteProvider{ + g: g, + stop: stop, + } +} + +// NewCounter implements Provider. +func (p *graphiteProvider) NewCounter(name string) metrics.Counter { + return p.g.NewCounter(name) +} + +// NewGauge implements Provider. +func (p *graphiteProvider) NewGauge(name string) metrics.Gauge { + return p.g.NewGauge(name) +} + +// NewHistogram implements Provider. +func (p *graphiteProvider) NewHistogram(name string, buckets int) metrics.Histogram { + return p.g.NewHistogram(name, buckets) +} + +// Stop implements Provider, invoking the stop function passed at construction. +func (p *graphiteProvider) Stop() { + p.stop() +} diff --git a/metrics2/provider/influx.go b/metrics2/provider/influx.go new file mode 100644 index 0000000..3aceeea --- /dev/null +++ b/metrics2/provider/influx.go @@ -0,0 +1,40 @@ +package provider + +import ( + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/influx" +) + +type influxProvider struct { + i *influx.Influx + stop func() +} + +// NewInfluxProvider takes the given Influx object and stop func, and returns +// a Provider that produces Influx metrics. +func NewInfluxProvider(i *influx.Influx, stop func()) Provider { + return &influxProvider{ + i: i, + stop: stop, + } +} + +// NewCounter implements Provider. Per-metric tags are not supported. +func (p *influxProvider) NewCounter(name string) metrics.Counter { + return p.i.NewCounter(name, map[string]string{}) +} + +// NewGauge implements Provider. Per-metric tags are not supported. +func (p *influxProvider) NewGauge(name string) metrics.Gauge { + return p.i.NewGauge(name, map[string]string{}) +} + +// NewHistogram implements Provider. Per-metric tags are not supported. +func (p *influxProvider) NewHistogram(name string, buckets int) metrics.Histogram { + return p.i.NewHistogram(name, map[string]string{}, buckets) +} + +// Stop implements Provider, invoking the stop function passed at construction. +func (p *influxProvider) Stop() { + p.stop() +} diff --git a/metrics2/provider/prometheus.go b/metrics2/provider/prometheus.go new file mode 100644 index 0000000..e85dd40 --- /dev/null +++ b/metrics2/provider/prometheus.go @@ -0,0 +1,58 @@ +package provider + +import ( + stdprometheus "github.com/prometheus/client_golang/prometheus" + + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/prometheus" +) + +type prometheusProvider struct { + namespace string + subsystem string +} + +// NewPrometheusProvider returns a Provider that produces Prometheus metrics. +// Namespace and subsystem are applied to all produced metrics. +func NewPrometheusProvider(namespace, subsystem string) Provider { + return &prometheusProvider{ + namespace: namespace, + subsystem: subsystem, + } +} + +// NewCounter implements Provider via prometheus.NewCounterFrom, i.e. the +// counter is registered. The metric's namespace and subsystem are taken from +// the Provider. Help is not set, and no const label names are set. +func (p *prometheusProvider) NewCounter(name string) metrics.Counter { + return prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: p.namespace, + Subsystem: p.subsystem, + Name: name, + }, []string{}) +} + +// NewGauge implements Provider via prometheus.NewGaugeFrom, i.e. the gauge is +// registered. The metric's namespace and subsystem are taken from the Provider. +// Help is not set, and no const label names are set. +func (p *prometheusProvider) NewGauge(name string) metrics.Gauge { + return prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: p.namespace, + Subsystem: p.subsystem, + Name: name, + }, []string{}) +} + +// NewGauge implements Provider via prometheus.NewSummaryFrom, i.e. the summary +// is registered. The metric's namespace and subsystem are taken from the +// Provider. Help is not set, and no const label names are set. Buckets are ignored. +func (p *prometheusProvider) NewHistogram(name string, _ int) metrics.Histogram { + return prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: p.namespace, + Subsystem: p.subsystem, + Name: name, + }, []string{}) +} + +// Stop implements Provider, but is a no-op. +func (p *prometheusProvider) Stop() {} diff --git a/metrics2/provider/provider.go b/metrics2/provider/provider.go new file mode 100644 index 0000000..481648d --- /dev/null +++ b/metrics2/provider/provider.go @@ -0,0 +1,42 @@ +// Package provider provides a factory-like abstraction for metrics backends. +// This package is provided specifically for the needs of the NY Times framework +// Gizmo. Most normal Go kit users shouldn't need to use it. +// +// Normally, if your microservice needs to support different metrics backends, +// you can simply do different construction based on a flag. For example, +// +// var latency metrics.Histogram +// var requests metrics.Counter +// switch *metricsBackend { +// case "prometheus": +// latency = prometheus.NewSummaryVec(...) +// requests = prometheus.NewCounterVec(...) +// case "statsd": +// statsd, stop := statsd.New(...) +// defer stop() +// latency = statsd.NewHistogram(...) +// requests = statsd.NewCounter(...) +// default: +// log.Fatal("unsupported metrics backend %q", *metricsBackend) +// } +// +package provider + +import ( + "github.com/go-kit/kit/metrics2" +) + +// Provider abstracts over constructors and lifecycle management functions for +// each supported metrics backend. It should only be used by those who need to +// swap out implementations, e.g. dynamically, or at a single point in an +// intermediating framework. +// +// This type is primarily useful for intermediating frameworks, and is likely +// unnecessary for most Go kit services. See the package-level doc comment for +// more typical usage instructions. +type Provider interface { + NewCounter(name string) metrics.Counter + NewGauge(name string) metrics.Gauge + NewHistogram(name string, buckets int) metrics.Histogram + Stop() +} diff --git a/metrics2/provider/statsd.go b/metrics2/provider/statsd.go new file mode 100644 index 0000000..731baa5 --- /dev/null +++ b/metrics2/provider/statsd.go @@ -0,0 +1,44 @@ +package provider + +import ( + "time" + + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/statsd" +) + +type statsdProvider struct { + s *statsd.Statsd + stop func() +} + +// NewStatsdProvider wraps the given Statsd object and stop func and returns a +// Provider that produces Statsd metrics. +func NewStatsdProvider(s *statsd.Statsd, stop func()) Provider { + return &statsdProvider{ + s: s, + stop: stop, + } +} + +// NewCounter implements Provider. +func (p *statsdProvider) NewCounter(name string) metrics.Counter { + return p.s.NewCounter(name) +} + +// NewGauge implements Provider. +func (p *statsdProvider) NewGauge(name string) metrics.Gauge { + return p.s.NewGauge(name) +} + +// NewHistogram implements Provider, returning a Histogram that accepts +// observations in seconds, and reports observations to Statsd in milliseconds. +// The sample rate is fixed at 1.0. The bucket parameter is ignored. +func (p *statsdProvider) NewHistogram(name string, _ int) metrics.Histogram { + return p.s.MustNewHistogram(name, time.Second, time.Millisecond, 1.0) +} + +// Stop implements Provider, invoking the stop function passed at construction. +func (p *statsdProvider) Stop() { + p.stop() +} diff --git a/metrics2/statsd/statsd.go b/metrics2/statsd/statsd.go new file mode 100644 index 0000000..cfadf14 --- /dev/null +++ b/metrics2/statsd/statsd.go @@ -0,0 +1,322 @@ +// Package statsd implements a statsd backend for package metrics. Metrics are +// aggregated and reported in batches, in the StatsD plaintext format. Sampling +// is not supported for counters because we aggregate counter updates and send +// in batches. Sampling is, however, supported for Timings. +// +// Batching observations and emitting every few seconds is useful even if you +// connect to your StatsD server over UDP. Emitting one network packet per +// observation can quickly overwhelm even the fastest internal network. Batching +// allows you to more linearly scale with growth. +// +// Typically you'll create a Statsd object in your main function. +// +// s, stop := New("myprefix.", "udp", "statsd:8126", time.Second, log.NewNopLogger()) +// defer stop() +// +// Then, create the metrics that your application will track from that object. +// Pass them as dependencies to the component that needs them; don't place them +// in the global scope. +// +// requests := s.NewCounter("requests") +// depth := s.NewGauge("queue_depth") +// fooLatency := s.NewTiming("foo_duration", "ms", 1.0) +// barLatency := s.MustNewHistogram("bar_duration", time.Second, time.Millisecond, 1.0) +// +// Invoke them in your components when you have something to instrument. +// +// requests.Add(1) +// depth.Set(123) +// fooLatency.Observe(16) // 16 ms +// barLatency.Observe(0.032) // 0.032 sec = 32 ms +// +package statsd + +import ( + "errors" + "fmt" + "io" + "math/rand" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics2" + "github.com/go-kit/kit/metrics2/generic" + "github.com/go-kit/kit/util/conn" +) + +// Statsd is a store for metrics that will be reported to a StatsD server. +// Create a Statsd object, use it to create metrics objects, and pass those +// objects as dependencies to the components that will use them. +// +// StatsD has a concept of Timings rather than Histograms. You can create Timing +// objects, or create Histograms that wrap Timings under the hood. +type Statsd struct { + mtx sync.RWMutex + prefix string + counters map[string]*generic.Counter + gauges map[string]*generic.Gauge + timings map[string]*Timing + logger log.Logger +} + +// New creates a Statsd object that flushes all metrics in the statsd format +// every flushInterval to the network and address. Internally it utilizes a +// util/conn.Manager and time.Ticker. Use the returned stop function to stop the +// ticker and terminate the flushing goroutine. +func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Statsd, stop func()) { + s := NewRaw(prefix, logger) + manager := conn.NewDefaultManager(network, address, logger) + ticker := time.NewTicker(flushInterval) + go s.FlushTo(manager, ticker) + return s, ticker.Stop +} + +// NewRaw creates a Statsd object. By default the metrics will not be emitted +// anywhere. Use WriteTo to flush the metrics once, or FlushTo (in a separate +// goroutine) to flush them on a regular schedule, or use the New constructor to +// set up the object and flushing at the same time. +func NewRaw(prefix string, logger log.Logger) *Statsd { + return &Statsd{ + prefix: prefix, + counters: map[string]*generic.Counter{}, + gauges: map[string]*generic.Gauge{}, + timings: map[string]*Timing{}, + logger: logger, + } +} + +// NewCounter returns a counter metric with the given name. Adds are buffered +// until the underlying Statsd object is flushed. +func (s *Statsd) NewCounter(name string) *generic.Counter { + s.mtx.Lock() + defer s.mtx.Unlock() + c := generic.NewCounter() + s.counters[s.prefix+name] = c + return c +} + +// NewGauge returns a gauge metric with the given name. +func (s *Statsd) NewGauge(name string) *generic.Gauge { + s.mtx.Lock() + defer s.mtx.Unlock() + g := generic.NewGauge() + s.gauges[s.prefix+name] = g + return g +} + +// NewTiming returns a timing metric with the given name, unit (e.g. "ms") and +// sample rate. Pass a sample rate of 1.0 or greater to disable sampling. +// Sampling is done at observation time. Observations are buffered until the +// underlying statsd object is flushed. +func (s *Statsd) NewTiming(name, unit string, sampleRate float64) *Timing { + s.mtx.Lock() + defer s.mtx.Unlock() + t := NewTiming(unit, sampleRate) + s.timings[s.prefix+name] = t + return t +} + +// NewHistogram returns a histogram metric with the given name. Observations are +// assumed to be taken in units of observeIn, e.g. time.Second. The histogram +// wraps a timing which reports in units of reportIn, e.g. time.Millisecond. +// Only nanoseconds, microseconds, milliseconds, and seconds are supported +// reportIn values. The underlying timing is sampled according to sampleRate. +func (s *Statsd) NewHistogram(name string, observeIn, reportIn time.Duration, sampleRate float64) (metrics.Histogram, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + + var unit string + switch reportIn { + case time.Nanosecond: + unit = "ns" + case time.Microsecond: + unit = "us" + case time.Millisecond: + unit = "ms" + case time.Second: + unit = "s" + default: + return nil, errors.New("unsupported reporting duration") + } + + t := NewTiming(unit, sampleRate) + s.timings[s.prefix+name] = t + return newHistogram(observeIn, reportIn, t), nil +} + +// MustNewHistogram is a convenience constructor for NewHistogram, which panics +// if there is an error. +func (s *Statsd) MustNewHistogram(name string, observeIn, reportIn time.Duration, sampleRate float64) metrics.Histogram { + h, err := s.NewHistogram(name, observeIn, reportIn, sampleRate) + if err != nil { + panic(err) + } + return h +} + +// FlushTo invokes WriteTo to the writer every time the ticker fires. FlushTo +// blocks until the ticker is stopped. Most users won't need to call this method +// directly, and should prefer to use the New constructor. +func (s *Statsd) FlushTo(w io.Writer, ticker *time.Ticker) { + for range ticker.C { + if _, err := s.WriteTo(w); err != nil { + s.logger.Log("during", "flush", "err", err) + } + } +} + +// WriteTo dumps the current state of all of the metrics to the given writer in +// the statsd format. Each metric has its current value(s) written in sequential +// calls to Write. Counters and gauges are dumped with their current values; +// counters are reset. Timings write each retained observation in sequence, and +// are reset. Clients probably shouldn't invoke this method directly, and should +// prefer using FlushTo, or the New constructor. +func (s *Statsd) WriteTo(w io.Writer) (int64, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + var ( + n int + err error + count int64 + ) + for name, c := range s.counters { + n, err = fmt.Fprintf(w, "%s:%f|c\n", name, c.ValueReset()) + count += int64(n) + if err != nil { + return count, err + } + } + for name, g := range s.gauges { + n, err = fmt.Fprintf(w, "%s:%f|g\n", name, g.Value()) + count += int64(n) + if err != nil { + return count, err + } + } + for name, t := range s.timings { + var sampling string + if r := t.SampleRate(); r < 1.0 { + sampling = fmt.Sprintf("|@%f", r) + } + for _, value := range t.Values() { + n, err = fmt.Fprintf(w, "%s:%d|%s%s\n", name, value, t.Unit(), sampling) + count += int64(n) + if err != nil { + return count, err + } + } + } + return count, nil +} + +// Timing is like a histogram that's always assumed to represent time. It also +// has a different implementation to typical histograms in this package. StatsD +// expects you to emit each observation to the aggregation server, and they do +// statistical processing there. This is easier to understand, but much (much) +// less efficient. So, we batch observations and emit the batch every interval. +// And we support sampling, at observation time. +type Timing struct { + mtx sync.Mutex + unit string + sampleRate float64 + values []int64 + lvs []string // immutable +} + +// NewTiming returns a new Timing object with the given units (e.g. "ms") and +// sample rate. If sample rate >= 1.0, no sampling will be performed. This +// function is exported only so that it can be used by package dogstatsd. As a +// user, if you want a timing, you probably want to create it through the Statsd +// object. +func NewTiming(unit string, sampleRate float64) *Timing { + return &Timing{ + unit: unit, + sampleRate: sampleRate, + } +} + +// With returns a new timing with the label values applies. Label values aren't +// supported in Statsd, but they are supported in DogStatsD, which uses the same +// Timing type. +func (t *Timing) With(labelValues ...string) *Timing { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &Timing{ + unit: t.unit, + sampleRate: t.sampleRate, + values: t.values, + lvs: append(t.lvs, labelValues...), + } +} + +// LabelValues returns the current set of label values. Label values aren't +// supported in Statsd, but they are supported in DogStatsD, which uses the same +// Timing type. +func (t *Timing) LabelValues() []string { + return t.lvs +} + +// Observe collects the value into the timing. If sample rate is less than 1.0, +// sampling is performed, and the value may be dropped. +func (t *Timing) Observe(value int64) { + // Here we sample at observation time. This burns not-insignificant CPU in + // the rand.Float64 call. It may be preferable to aggregate all observations + // and sample at emission time. But that is a bit tricker to do correctly. + if t.sampleRate < 1.0 && rand.Float64() > t.sampleRate { + return + } + + t.mtx.Lock() + defer t.mtx.Unlock() + t.values = append(t.values, value) +} + +// Values returns the observed values since the last call to Values. This method +// clears the internal state of the Timing; better get those values somewhere +// safe! +func (t *Timing) Values() []int64 { + t.mtx.Lock() + defer t.mtx.Unlock() + res := t.values + t.values = []int64{} // TODO(pb): if GC is a problem, this can be improved + return res +} + +// Unit returns the unit, e.g. "ms". +func (t *Timing) Unit() string { return t.unit } + +// SampleRate returns the sample rate, e.g. 0.01 or 1.0. +func (t *Timing) SampleRate() float64 { return t.sampleRate } + +// histogram wraps a Timing and implements Histogram. Namely, it takes float64 +// observations and converts them to int64 according to a defined ratio, likely +// with a loss of precision. +type histogram struct { + m float64 + t *Timing + lvs []string +} + +func newHistogram(observeIn, reportIn time.Duration, t *Timing) *histogram { + return &histogram{ + m: float64(observeIn) / float64(reportIn), + t: t, + } +} + +func (h *histogram) With(labelValues ...string) metrics.Histogram { + if len(labelValues)%2 != 0 { + labelValues = append(labelValues, generic.LabelValueUnknown) + } + return &histogram{ + m: h.m, + t: h.t, + lvs: append(h.lvs, labelValues...), + } +} + +func (h *histogram) Observe(value float64) { + h.t.Observe(int64(h.m * value)) +} diff --git a/metrics2/statsd/statsd_test.go b/metrics2/statsd/statsd_test.go new file mode 100644 index 0000000..c28e6dd --- /dev/null +++ b/metrics2/statsd/statsd_test.go @@ -0,0 +1,28 @@ +package statsd + +import ( + "testing" + "time" +) + +func TestHistogramAdapter(t *testing.T) { + for _, testcase := range []struct { + observeIn time.Duration + reportIn time.Duration + unit string + input float64 + want int64 + }{ + {time.Second, time.Second, "s", 0.10, 0}, + {time.Second, time.Second, "s", 1.01, 1}, + {time.Second, time.Millisecond, "ms", 1.23, 1230}, + {time.Millisecond, time.Microsecond, "us", 123, 123000}, + } { + tm := NewTiming(testcase.unit, 1.0) + h := newHistogram(testcase.observeIn, testcase.reportIn, tm) + h.Observe(testcase.input) + if want, have := testcase.want, tm.Values()[0]; want != have { + t.Errorf("Observe(%.2f %s): want %d, have %d", testcase.input, testcase.unit, want, have) + } + } +} diff --git a/util/conn/manager.go b/util/conn/manager.go index 75c997f..0b7db62 100644 --- a/util/conn/manager.go +++ b/util/conn/manager.go @@ -1,6 +1,7 @@ package conn import ( + "errors" "net" "time" @@ -34,9 +35,8 @@ // NewManager returns a connection manager using the passed Dialer, network, and // address. The AfterFunc is used to control exponential backoff and retries. -// For normal use, pass net.Dial and time.After as the Dialer and AfterFunc -// respectively. The logger is used to log errors; pass a log.NopLogger if you -// don't care to receive them. +// The logger is used to log errors; pass a log.NopLogger if you don't care to +// receive them. For normal use, prefer NewDefaultManager. func NewManager(d Dialer, network, address string, after AfterFunc, logger log.Logger) *Manager { m := &Manager{ dialer: d, @@ -52,6 +52,12 @@ return m } +// NewDefaultManager is a helper constructor, suitable for most normal use in +// real (non-test) code. It uses the real net.Dial and time.After functions. +func NewDefaultManager(network, address string, logger log.Logger) *Manager { + return NewManager(net.Dial, network, address, time.After, logger) +} + // Take yields the current connection. It may be nil. func (m *Manager) Take() net.Conn { return <-m.takec @@ -62,6 +68,17 @@ // to reconnect, with exponential backoff. Putting a nil error is a no-op. func (m *Manager) Put(err error) { m.putc <- err +} + +// Write writes the passed data to the connection in a single Take/Put cycle. +func (m *Manager) Write(b []byte) (int, error) { + conn := m.Take() + if conn == nil { + return 0, ErrConnectionUnavailable + } + n, err := conn.Write(b) + defer m.Put(err) + return n, err } func (m *Manager) loop() { @@ -122,3 +139,7 @@ } return d } + +// ErrConnectionUnavailable is returned by the Manager's Write method when the +// manager cannot yield a good connection. +var ErrConnectionUnavailable = errors.New("connection unavailable")