Codebase list golang-github-go-kit-kit / debian/0.6.0-2_bpo9+1 metrics / statsd / statsd.go
debian/0.6.0-2_bpo9+1

Tree @debian/0.6.0-2_bpo9+1 (Download .tar.gz)

statsd.go @debian/0.6.0-2_bpo9+1

8cce994
 
 
4928fe6
8cce994
 
 
 
4928fe6
 
 
 
 
 
 
8cce994
8114e4a
 
 
8cce994
4928fe6
 
8cce994
 
 
 
 
 
 
4928fe6
8cce994
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
414eee2
 
8cce994
 
 
 
 
 
49d9075
4928fe6
 
8cce994
 
 
 
 
32d7a0b
8cce994
49d9075
4928fe6
8cce994
 
 
 
 
 
 
 
414eee2
 
8cce994
 
 
 
 
 
 
 
 
49d9075
4928fe6
 
8cce994
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4928fe6
8cce994
 
 
 
 
 
 
 
 
 
 
49d9075
8cce994
 
 
 
 
 
 
 
 
 
4928fe6
8cce994
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4928fe6
 
8cce994
 
 
 
 
 
4928fe6
 
8cce994
 
8aab2ed
 
8cce994
 
 
 
 
 
a1d8b3f
 
8cce994
 
 
 
 
 
 
414eee2
 
8cce994
 
a1d8b3f
 
 
8cce994
 
 
49d9075
4928fe6
8cce994
 
 
 
 
32d7a0b
414eee2
 
8cce994
 
 
4928fe6
 
8cce994
 
 
49d9075
4928fe6
32d7a0b
 
 
 
 
8cce994
 
 
 
 
 
4928fe6
 
8cce994
 
 
4928fe6
 
8cce994
 
 
4928fe6
// Package statsd provides a StatsD backend for package metrics. StatsD has no
// concept of arbitrary key-value tagging, so label values are not supported,
// and With is a no-op on all metrics.
//
// This package batches observations and emits them on some schedule to the
// remote server. This is useful even if you connect to your StatsD server over
// UDP. Emitting one network packet per observation can quickly overwhelm even
// the fastest internal network.
package statsd

import (
	"fmt"
	"io"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/metrics"
	"github.com/go-kit/kit/metrics/internal/lv"
	"github.com/go-kit/kit/metrics/internal/ratemap"
	"github.com/go-kit/kit/util/conn"
)

// Statsd receives metrics observations and forwards them to a StatsD server.
// Create a Statsd object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// All metrics are buffered until WriteTo is called. Counters and gauges are
// aggregated into a single observation per timeseries per write. Timings are
// buffered but not aggregated.
//
// To regularly report metrics to an io.Writer, use the WriteLoop helper method.
// To send to a StatsD server, use the SendLoop helper method.
type Statsd struct {
	// prefix is prepended to the name of every metric created through this
	// object. rates stores the sample rate registered under each prefixed
	// name by NewCounter and NewTiming; WriteTo reads it back when it formats
	// counter and timing lines.
	prefix string
	rates  *ratemap.RateMap

	// The observations are collected in an N-dimensional vector space, even
	// though they only take advantage of a single dimension (name). This is an
	// implementation detail born purely from convenience. It would be more
	// accurate to collect them in a map[string][]float64, but we already have
	// this nice data structure and helper methods.
	counters *lv.Space
	gauges   *lv.Space
	timings  *lv.Space

	// logger receives write errors from WriteLoop and is also handed to the
	// connection manager created in SendLoop.
	logger log.Logger
}

// New builds a Statsd object with an empty rate map and empty observation
// spaces for counters, gauges and timings. The prefix is prepended to the
// name of every metric created from the returned object, and logger is kept
// for use by the loop helpers. Nothing is emitted until WriteTo is invoked,
// so callers must arrange for that to happen regularly, either by hand or
// through WriteLoop / SendLoop.
func New(prefix string, logger log.Logger) *Statsd {
	s := new(Statsd)
	s.prefix = prefix
	s.rates = ratemap.New()
	s.counters = lv.NewSpace()
	s.gauges = lv.NewSpace()
	s.timings = lv.NewSpace()
	s.logger = logger
	return s
}

// NewCounter creates a counter whose observations are recorded in this Statsd
// object. The sample rate is registered under the prefixed metric name so
// that WriteTo can look it up when the counter is flushed.
func (s *Statsd) NewCounter(name string, sampleRate float64) *Counter {
	fullName := s.prefix + name
	s.rates.Set(fullName, sampleRate)

	counter := &Counter{name: fullName}
	counter.obs = s.counters.Observe
	return counter
}

// NewGauge creates a gauge whose observations are recorded in this Statsd
// object under the prefixed metric name. Set operations go through the gauge
// space's Observe method and relative changes through its Add method.
func (s *Statsd) NewGauge(name string) *Gauge {
	gauge := &Gauge{name: s.prefix + name}
	gauge.obs = s.gauges.Observe
	gauge.add = s.gauges.Add
	return gauge
}

// NewTiming creates a histogram whose observed values are treated as
// durations in milliseconds and recorded in this Statsd object. The sample
// rate is registered under the prefixed metric name so that WriteTo can look
// it up when the timing is flushed.
func (s *Statsd) NewTiming(name string, sampleRate float64) *Timing {
	fullName := s.prefix + name
	s.rates.Set(fullName, sampleRate)

	timing := &Timing{name: fullName}
	timing.obs = s.timings.Observe
	return timing
}

// WriteLoop calls WriteTo on w once for every value received from c and
// returns when c is closed. A failed write is reported through the logger
// and does not stop the loop. Because the call blocks for the lifetime of the
// channel, it is normally run in its own goroutine and fed from the C channel
// of a time.Ticker.
func (s *Statsd) WriteLoop(c <-chan time.Time, w io.Writer) {
	for {
		// A closed channel ends the loop, exactly like ranging over it.
		if _, open := <-c; !open {
			return
		}
		_, err := s.WriteTo(w)
		if err == nil {
			continue
		}
		s.logger.Log("during", "WriteTo", "err", err)
	}
}

// SendLoop runs WriteLoop against a connection manager created by
// conn.NewDefaultManager for the given network and address, using this
// object's logger. It blocks until c is closed, just as WriteLoop does, so it
// is normally started in its own goroutine and fed from the C channel of a
// time.Ticker.
func (s *Statsd) SendLoop(c <-chan time.Time, network, address string) {
	manager := conn.NewDefaultManager(network, address, s.logger)
	s.WriteLoop(c, manager)
}

// WriteTo flushes the buffered metrics to w as StatsD lines and returns the
// number of bytes written together with the first write error, if any.
//
// Counters are written as "name:sum|c", gauges as "name:last|g" and timings
// as one "name:value|ms" line per observation. Counter and timing lines carry
// an "|@rate" suffix when the registered sample rate is below 1.
//
// Semantics are best-effort: each space is reset before it is walked, and the
// flush stops at the first failed write, so observations are lost if there is
// a problem with the write. Spaces that were not yet reached are left alone.
// Clients should call WriteTo regularly, ideally through WriteLoop or
// SendLoop.
func (s *Statsd) WriteTo(w io.Writer) (count int64, err error) {
	// emit writes one formatted line to w. It stores the outcome in the named
	// results and reports whether the surrounding walk should continue.
	emit := func(format string, args ...interface{}) bool {
		var written int
		if written, err = fmt.Fprintf(w, format, args...); err != nil {
			return false
		}
		count += int64(written)
		return true
	}

	s.counters.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
		return emit("%s:%f|c%s\n", name, sum(values), sampling(s.rates.Get(name)))
	})
	if err != nil {
		return count, err
	}

	s.gauges.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
		return emit("%s:%f|g\n", name, last(values))
	})
	if err != nil {
		return count, err
	}

	s.timings.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
		// The suffix depends only on the metric name, so build it once.
		suffix := sampling(s.rates.Get(name))
		for _, observed := range values {
			if !emit("%s:%f|ms%s\n", name, observed, suffix) {
				return false
			}
		}
		return true
	})

	return count, err
}

// sum adds up the elements of a from first to last and returns the total.
// An empty or nil slice yields 0.
func sum(a []float64) float64 {
	total := 0.0
	for i := 0; i < len(a); i++ {
		total += a[i]
	}
	return total
}

// last returns the final element of a. For an empty or nil slice it returns 0
// rather than panicking on an out-of-range index, so a flush can never crash
// on a timeseries that carries no observations.
func last(a []float64) float64 {
	if len(a) == 0 {
		return 0
	}
	return a[len(a)-1]
}

// sampling renders the StatsD sample-rate suffix for rate r. Rates below 1
// produce "|@<rate>" with the rate in %f notation; any other value (including
// NaN, which is not below 1) produces the empty string.
func sampling(r float64) string {
	if r < 1.0 {
		return fmt.Sprintf("|@%f", r)
	}
	return ""
}

// observeFunc is the callback signature through which a metric records a
// value under a name and a set of label values. The constructors in this file
// fill it with the Observe and Add methods of an lv.Space.
type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a StatsD counter. Observations are forwarded to a Statsd object,
// and aggregated (summed) per timeseries.
type Counter struct {
	// name is the metric name passed to obs on every Add; NewCounter fills it
	// with the Statsd prefix already applied. obs is the callback that
	// receives each delta.
	name string
	obs  observeFunc
}

// With is part of the metrics.Counter interface. StatsD has no concept of
// label values, so the arguments are discarded and the receiver is returned
// unchanged.
func (c *Counter) With(_ ...string) metrics.Counter {
	return c
}

// Add implements metrics.Counter by forwarding delta, under the counter's
// name and with an empty label set, to the observe callback.
func (c *Counter) Add(delta float64) {
	noLabels := lv.LabelValues{}
	c.obs(c.name, noLabels, delta)
}

// Gauge is a StatsD gauge. Observations are forwarded to a Statsd object, and
// aggregated (the last observation selected) per timeseries.
type Gauge struct {
	// name is the metric name passed to both callbacks; NewGauge fills it
	// with the Statsd prefix already applied. obs is invoked by Set with the
	// new value, and add is invoked by Add with the delta.
	name string
	obs  observeFunc
	add  observeFunc
}

// With is part of the metrics.Gauge interface. StatsD has no concept of label
// values, so the arguments are discarded and the receiver is returned
// unchanged.
func (g *Gauge) With(_ ...string) metrics.Gauge {
	return g
}

// Set implements metrics.Gauge by forwarding value, under the gauge's name
// and with an empty label set, to the observe callback.
func (g *Gauge) Set(value float64) {
	noLabels := lv.LabelValues{}
	g.obs(g.name, noLabels, value)
}

// Add implements metrics.Gauge by forwarding delta, under the gauge's name
// and with an empty label set, to the add callback.
func (g *Gauge) Add(delta float64) {
	noLabels := lv.LabelValues{}
	g.add(g.name, noLabels, delta)
}

// Timing is a StatsD timing, or metrics.Histogram. Observations are
// forwarded to a Statsd object, and collected (but not aggregated) per
// timeseries.
type Timing struct {
	// name is the metric name passed to obs on every Observe; NewTiming fills
	// it with the Statsd prefix already applied. obs is the callback that
	// receives each observed value.
	name string
	obs  observeFunc
}

// With is part of the metrics.Histogram interface. StatsD has no concept of
// label values, so the arguments are discarded and the receiver is returned
// unchanged.
func (t *Timing) With(_ ...string) metrics.Histogram {
	return t
}

// Observe implements metrics.Histogram by forwarding value, under the
// timing's name and with an empty label set, to the observe callback. The
// value is interpreted as a duration in milliseconds.
func (t *Timing) Observe(value float64) {
	noLabels := lv.LabelValues{}
	t.obs(t.name, noLabels, value)
}