
// Package statsd implements a statsd backend for package metrics. Metrics are
// aggregated and reported in batches, in the StatsD plaintext format. Sampling
// is not supported for counters because we aggregate counter updates and send
// in batches. Sampling is, however, supported for Timings.
//
// Batching observations and emitting every few seconds is useful even if you
// connect to your StatsD server over UDP. Emitting one network packet per
// observation can quickly overwhelm even the fastest internal network. Batching
// allows you to more linearly scale with growth.
//
// Typically you'll create a Statsd object in your main function.
//
//    s, stop := New("myprefix.", "udp", "statsd:8126", time.Second, log.NewNopLogger())
//    defer stop()
//
// Then, create the metrics that your application will track from that object.
// Pass them as dependencies to the component that needs them; don't place them
// in the global scope.
//
//    requests := s.NewCounter("requests")
//    depth := s.NewGauge("queue_depth")
//    fooLatency := s.NewTiming("foo_duration", "ms", 1.0)
//    barLatency := s.MustNewHistogram("bar_duration", time.Second, time.Millisecond, 1.0)
//
// Invoke them in your components when you have something to instrument.
//
//    requests.Add(1)
//    depth.Set(123)
//    fooLatency.Observe(16)    // 16 ms
//    barLatency.Observe(0.032) // 0.032 sec = 32 ms
//
package statsd

import (
	"errors"
	"fmt"
	"io"
	"math/rand"
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/metrics2"
	"github.com/go-kit/kit/metrics2/generic"
	"github.com/go-kit/kit/util/conn"
)

// Statsd is a store for metrics that will be reported to a StatsD server.
// Create a Statsd object, use it to create metrics objects, and pass those
// objects as dependencies to the components that will use them.
//
// StatsD has a concept of Timings rather than Histograms. You can create Timing
// objects, or create Histograms that wrap Timings under the hood.
type Statsd struct {
	mtx      sync.RWMutex               // guards the three metric maps below; individual metrics synchronize themselves
	prefix   string                     // prepended to every metric name at registration time
	counters map[string]*generic.Counter // prefixed name -> counter; values are flushed and reset by WriteTo
	gauges   map[string]*generic.Gauge   // prefixed name -> gauge; current value is flushed by WriteTo
	timings  map[string]*Timing          // prefixed name -> timing; buffered observations are drained by WriteTo
	logger   log.Logger                  // used to report flush errors
}

// New creates a Statsd object that flushes all metrics in the statsd format
// every flushInterval to the network and address. Internally it utilizes a
// util/conn.Manager and a time.Ticker. Use the returned stop function to stop
// the ticker and terminate the flushing goroutine.
func New(prefix string, network, address string, flushInterval time.Duration, logger log.Logger) (res *Statsd, stop func()) {
	s := NewRaw(prefix, logger)
	manager := conn.NewDefaultManager(network, address, logger)
	ticker := time.NewTicker(flushInterval)
	quit := make(chan struct{})
	go func() {
		for {
			select {
			case <-ticker.C:
				if _, err := s.WriteTo(manager); err != nil {
					s.logger.Log("during", "flush", "err", err)
				}
			case <-quit:
				return
			}
		}
	}()
	return s, func() {
		// Ticker.Stop does not close the ticker channel, so a goroutine
		// ranging over ticker.C would block forever after Stop. We signal
		// termination explicitly via quit to avoid leaking the goroutine.
		ticker.Stop()
		close(quit)
	}
}

// NewRaw creates a Statsd object. By default the metrics will not be emitted
// anywhere. Use WriteTo to flush the metrics once, or FlushTo (in a separate
// goroutine) to flush them on a regular schedule, or use the New constructor to
// set up the object and flushing at the same time.
func NewRaw(prefix string, logger log.Logger) *Statsd {
	s := &Statsd{
		prefix: prefix,
		logger: logger,
	}
	s.counters = make(map[string]*generic.Counter)
	s.gauges = make(map[string]*generic.Gauge)
	s.timings = make(map[string]*Timing)
	return s
}

// NewCounter returns a counter metric with the given name. Adds are buffered
// until the underlying Statsd object is flushed.
func (s *Statsd) NewCounter(name string) *generic.Counter {
	counter := generic.NewCounter()
	s.mtx.Lock()
	s.counters[s.prefix+name] = counter
	s.mtx.Unlock()
	return counter
}

// NewGauge returns a gauge metric with the given name.
func (s *Statsd) NewGauge(name string) *generic.Gauge {
	gauge := generic.NewGauge()
	s.mtx.Lock()
	s.gauges[s.prefix+name] = gauge
	s.mtx.Unlock()
	return gauge
}

// NewTiming returns a timing metric with the given name, unit (e.g. "ms") and
// sample rate. Pass a sample rate of 1.0 or greater to disable sampling.
// Sampling is done at observation time. Observations are buffered until the
// underlying statsd object is flushed.
func (s *Statsd) NewTiming(name, unit string, sampleRate float64) *Timing {
	timing := NewTiming(unit, sampleRate)
	s.mtx.Lock()
	s.timings[s.prefix+name] = timing
	s.mtx.Unlock()
	return timing
}

// NewHistogram returns a histogram metric with the given name. Observations are
// assumed to be taken in units of observeIn, e.g. time.Second. The histogram
// wraps a timing which reports in units of reportIn, e.g. time.Millisecond.
// Only nanoseconds, microseconds, milliseconds, and seconds are supported
// reportIn values. The underlying timing is sampled according to sampleRate.
func (s *Statsd) NewHistogram(name string, observeIn, reportIn time.Duration, sampleRate float64) (metrics.Histogram, error) {
	// Map the reporting duration to its statsd unit suffix; anything else
	// is rejected before any state is modified.
	unit, ok := map[time.Duration]string{
		time.Nanosecond:  "ns",
		time.Microsecond: "us",
		time.Millisecond: "ms",
		time.Second:      "s",
	}[reportIn]
	if !ok {
		return nil, errors.New("unsupported reporting duration")
	}

	t := NewTiming(unit, sampleRate)

	s.mtx.Lock()
	s.timings[s.prefix+name] = t
	s.mtx.Unlock()

	return newHistogram(observeIn, reportIn, t), nil
}

// MustNewHistogram is a convenience constructor for NewHistogram, which panics
// if there is an error.
func (s *Statsd) MustNewHistogram(name string, observeIn, reportIn time.Duration, sampleRate float64) metrics.Histogram {
	histogram, err := s.NewHistogram(name, observeIn, reportIn, sampleRate)
	if err != nil {
		panic(err) // by convention, Must* helpers panic instead of returning an error
	}
	return histogram
}

// FlushTo invokes WriteTo on the writer every time the ticker fires. FlushTo
// blocks until the ticker is stopped. Most users won't need to call this method
// directly, and should prefer to use the New constructor.
//
// Note that Ticker.Stop does not close the ticker channel, so after Stop this
// loop remains blocked on the next receive rather than returning.
func (s *Statsd) FlushTo(w io.Writer, ticker *time.Ticker) {
	for range ticker.C {
		_, err := s.WriteTo(w)
		if err != nil {
			s.logger.Log("during", "flush", "err", err)
		}
	}
}

// WriteTo dumps the current state of all of the metrics to the given writer in
// the statsd format. Each metric has its current value(s) written in sequential
// calls to Write. Counters and gauges are dumped with their current values;
// counters are reset. Timings write each retained observation in sequence, and
// are reset. Clients probably shouldn't invoke this method directly, and should
// prefer using FlushTo, or the New constructor.
func (s *Statsd) WriteTo(w io.Writer) (int64, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	var written int64

	// emit writes one formatted line, accumulating the byte count.
	emit := func(format string, args ...interface{}) error {
		n, err := fmt.Fprintf(w, format, args...)
		written += int64(n)
		return err
	}

	for name, c := range s.counters {
		// ValueReset drains the counter as a side effect.
		if err := emit("%s:%f|c\n", name, c.ValueReset()); err != nil {
			return written, err
		}
	}
	for name, g := range s.gauges {
		if err := emit("%s:%f|g\n", name, g.Value()); err != nil {
			return written, err
		}
	}
	for name, t := range s.timings {
		var sampling string
		if rate := t.SampleRate(); rate < 1.0 {
			sampling = fmt.Sprintf("|@%f", rate)
		}
		// Values drains the timing's buffered observations as a side effect.
		for _, v := range t.Values() {
			if err := emit("%s:%d|%s%s\n", name, v, t.Unit(), sampling); err != nil {
				return written, err
			}
		}
	}
	return written, nil
}

// Timing is like a histogram that's always assumed to represent time. It also
// has a different implementation to typical histograms in this package. StatsD
// expects you to emit each observation to the aggregation server, and they do
// statistical processing there. This is easier to understand, but much (much)
// less efficient. So, we batch observations and emit the batch every interval.
// And we support sampling, at observation time.
type Timing struct {
	mtx        sync.Mutex // guards values
	unit       string     // statsd unit suffix, e.g. "ms"; set at construction and never changed
	sampleRate float64    // observations are kept with this probability; >= 1.0 disables sampling
	values     []int64    // buffered observations, drained by Values
	lvs        []string // immutable
}

// NewTiming returns a new Timing object with the given units (e.g. "ms") and
// sample rate. If sample rate >= 1.0, no sampling will be performed. This
// function is exported only so that it can be used by package dogstatsd. As a
// user, if you want a timing, you probably want to create it through the Statsd
// object.
func NewTiming(unit string, sampleRate float64) *Timing {
	t := &Timing{}
	t.unit = unit
	t.sampleRate = sampleRate
	return t
}

// With returns a new timing with the label values applied. Label values aren't
// supported in Statsd, but they are supported in DogStatsD, which uses the same
// Timing type.
//
// The returned Timing receives its own copy of the combined label values:
// appending directly to t.lvs could share a backing array between siblings
// derived from the same parent, letting one With call clobber another's labels.
func (t *Timing) With(labelValues ...string) *Timing {
	if len(labelValues)%2 != 0 {
		labelValues = append(labelValues, generic.LabelValueUnknown)
	}
	lvs := make([]string, 0, len(t.lvs)+len(labelValues))
	lvs = append(lvs, t.lvs...)
	lvs = append(lvs, labelValues...)
	return &Timing{
		unit:       t.unit,
		sampleRate: t.sampleRate,
		values:     t.values, // NOTE(review): shares the parent's current observation slice header — confirm intended
		lvs:        lvs,
	}
}

// LabelValues returns the current set of label values. Label values aren't
// supported in Statsd, but they are supported in DogStatsD, which uses the same
// Timing type.
//
// The returned slice is the Timing's internal one; callers must treat it as
// read-only.
func (t *Timing) LabelValues() []string {
	return t.lvs
}

// Observe collects the value into the timing. If sample rate is less than 1.0,
// sampling is performed, and the value may be dropped.
func (t *Timing) Observe(value int64) {
	// Sampling is decided at observation time, which burns not-insignificant
	// CPU in the rand.Float64 call. Aggregating all observations and sampling
	// at emission time may be preferable, but is trickier to do correctly.
	if t.sampleRate < 1.0 {
		if rand.Float64() > t.sampleRate {
			return
		}
	}

	t.mtx.Lock()
	t.values = append(t.values, value)
	t.mtx.Unlock()
}

// Values returns the observed values since the last call to Values. This method
// clears the internal state of the Timing; better get those values somewhere
// safe!
func (t *Timing) Values() []int64 {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	drained := t.values
	// TODO(pb): if GC is a problem, this can be improved
	t.values = []int64{}
	return drained
}

// Unit returns the unit, e.g. "ms", as fixed at construction time.
func (t *Timing) Unit() string { return t.unit }

// SampleRate returns the sample rate, e.g. 0.01 or 1.0, as fixed at
// construction time.
func (t *Timing) SampleRate() float64 { return t.sampleRate }

// histogram wraps a Timing and implements Histogram. Namely, it takes float64
// observations and converts them to int64 according to a defined ratio, likely
// with a loss of precision.
type histogram struct {
	m   float64  // multiplier: float64(observeIn) / float64(reportIn)
	t   *Timing  // underlying timing that buffers the converted observations
	lvs []string // label values accumulated via With; unused by statsd proper
}

// newHistogram returns a histogram wrapping t. Observations are taken in units
// of observeIn and converted to units of reportIn before being recorded.
func newHistogram(observeIn, reportIn time.Duration, t *Timing) *histogram {
	ratio := float64(observeIn) / float64(reportIn)
	return &histogram{m: ratio, t: t}
}

// With returns a copy of the histogram with the given label values appended.
// The combined label values are copied into a fresh slice: appending directly
// to h.lvs could share a backing array between siblings derived from the same
// parent, letting one With call clobber another's labels.
func (h *histogram) With(labelValues ...string) metrics.Histogram {
	if len(labelValues)%2 != 0 {
		labelValues = append(labelValues, generic.LabelValueUnknown)
	}
	lvs := make([]string, 0, len(h.lvs)+len(labelValues))
	lvs = append(lvs, h.lvs...)
	lvs = append(lvs, labelValues...)
	return &histogram{
		m:   h.m,
		t:   h.t,
		lvs: lvs,
	}
}

// Observe converts the observation from observe units to report units and
// records it in the underlying Timing (truncating toward zero).
func (h *histogram) Observe(value float64) {
	scaled := value * h.m
	h.t.Observe(int64(scaled))
}