Codebase list golang-github-go-kit-kit / e9da08e metrics / graphite / graphite.go
e9da08e

Tree @e9da08e (Download .tar.gz)

graphite.go @e9da08eraw · history · blame

// Package graphite implements a graphite backend for package metrics.
//
// The current implementation ignores fields.
package graphite

import (
	"bufio"
	"fmt"
	"io"
	"log"
	"math"
	"net"
	"sort"
	"sync"
	"time"

	"sync/atomic"

	"github.com/codahale/hdrhistogram"
	"github.com/go-kit/kit/metrics"
)

// Emitter will keep track of all metrics and, once started,
// will emit the metrics via the Flush method to the given io.Writer.
type Emitter interface {
	NewCounter(string) metrics.Counter
	NewHistogram(string, int64, int64, int, ...int) metrics.Histogram
	NewTimeHistogram(string, time.Duration, int64, int64, int, ...int) metrics.TimeHistogram
	NewGauge(string) metrics.Gauge

	Start(time.Duration)
	Flush() error
}

type emitter struct {
	addr   *net.TCPAddr
	prefix string

	metricMu   *sync.Mutex
	counters   []*counter
	histograms []*windowedHistogram
	gauges     []*gauge
}

// NewEmitter will return an Emitter that will prefix all
// metrics names with the given prefix. Once started, it will attempt to create
// a TCP connection with the given address and most metrics to the connection
// in a Graphite-compatible format.
func NewEmitter(addr *net.TCPAddr, prefix string) Emitter {
	e := &emitter{
		addr, prefix, &sync.Mutex{},
		[]*counter{}, []*windowedHistogram{}, []*gauge{},
	}

	return e
}

// NewCounter returns a Counter whose value will be periodically emitted in
// a Graphite-compatible format once the Emitter is started. Fields are ignored.
func (e *emitter) NewCounter(name string) metrics.Counter {
	// only one flush at a time
	e.metricMu.Lock()
	defer e.metricMu.Unlock()
	c := &counter{name, 0}
	e.counters = append(e.counters, c)
	return c
}

// NewHistogram is taken from http://github.com/codahale/metrics. It returns a
// windowed HDR histogram which drops data older than five minutes.
//
// The histogram exposes metrics for each passed quantile as gauges. Quantiles
// should be integers in the range 1..99. The gauge names are assigned by
// using the passed name as a prefix and appending "_pNN" e.g. "_p50".
//
// The values of this histogram will be periodically emitted in a Graphite-compatible
// format once the Emitter is started. Fields are ignored.
func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
	// only one flush at a time
	e.metricMu.Lock()
	defer e.metricMu.Unlock()

	gauges := map[int]metrics.Gauge{}
	for _, quantile := range quantiles {
		if quantile <= 0 || quantile >= 100 {
			panic(fmt.Sprintf("invalid quantile %d", quantile))
		}
		gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
	}
	h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges)
	e.histograms = append(e.histograms, h)
	return h
}

// NewTimeHistogram returns a TimeHistogram wrapper around the windowed
// HDR histrogram provided by this package.
func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.TimeHistogram {
	h := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
	return metrics.NewTimeHistogram(unit, h)
}

// NewGauge returns a Gauge whose value will be periodically emitted in
// a Graphite-compatible format once the Emitter is started. Fields are ignored.
func (e *emitter) NewGauge(name string) metrics.Gauge {
	// only one flush at a time
	e.metricMu.Lock()
	defer e.metricMu.Unlock()
	return e.gauge(name)
}

func (e *emitter) gauge(name string) metrics.Gauge {
	g := &gauge{name, 0}
	e.gauges = append(e.gauges, g)
	return g
}

// Start will kick off a background goroutine to
// call Flush once every interval.
func (e *emitter) Start(interval time.Duration) {
	go func() {
		t := time.Tick(interval)
		for range t {
			err := e.Flush()
			if err != nil {
				log.Print("error: could not dial graphite host: ", err)
				continue
			}
		}
	}()
}

// Flush will attempt to create a connection with the given address
// and write the current metrics to it in a Graphite-compatible format.
//
// Users can call this method on process shutdown to ensure
// the current metrics are pushed to Graphite.
func (e *emitter) Flush() error {
	// open connection
	conn, err := net.DialTCP("tcp", nil, e.addr)
	if err != nil {
		return err
	}

	// flush stats to connection
	e.flush(conn)

	// close connection
	conn.Close()
	return nil
}

func (e *emitter) flush(conn io.Writer) {
	// only one flush at a time
	e.metricMu.Lock()
	defer e.metricMu.Unlock()

	// buffer the writer and make sure to flush it
	w := bufio.NewWriter(conn)
	defer w.Flush()

	now := time.Now().Unix()

	// emit counter stats
	for _, c := range e.counters {
		fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, now)
	}

	// emit histogram specific stats
	for _, h := range e.histograms {
		hist := h.hist.Merge()
		fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, h.Name(), hist.TotalCount(), now)
		fmt.Fprintf(w, "%s.%s.min %d %d\n", e.prefix, h.Name(), hist.Min(), now)
		fmt.Fprintf(w, "%s.%s.max %d %d\n", e.prefix, h.Name(), hist.Max(), now)
		fmt.Fprintf(w, "%s.%s.mean %.2f %d\n", e.prefix, h.Name(), hist.Mean(), now)
		fmt.Fprintf(w, "%s.%s.std-dev %.2f %d\n", e.prefix, h.Name(), hist.StdDev(), now)
	}

	// emit gauge stats (which can include some histogram quantiles)
	for _, g := range e.gauges {
		fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), now)
	}
}

type counter struct {
	key   string
	count uint64
}

func (c *counter) Name() string { return c.key }

func (c *counter) With(metrics.Field) metrics.Counter { return c }

func (c *counter) Add(delta uint64) { atomic.AddUint64(&c.count, delta) }

type gauge struct {
	key   string
	value uint64 // math.Float64bits
}

func (g *gauge) Name() string { return g.key }

func (g *gauge) With(metrics.Field) metrics.Gauge { return g }

func (g *gauge) Add(delta float64) {
	for {
		old := atomic.LoadUint64(&g.value)
		new := math.Float64bits(math.Float64frombits(old) + delta)
		if atomic.CompareAndSwapUint64(&g.value, old, new) {
			return
		}
	}
}

func (g *gauge) Set(value float64) {
	atomic.StoreUint64(&g.value, math.Float64bits(value))
}

func (g *gauge) Get() float64 {
	return math.Float64frombits(atomic.LoadUint64(&g.value))
}

type windowedHistogram struct {
	mu   sync.Mutex
	hist *hdrhistogram.WindowedHistogram

	name   string
	gauges map[int]metrics.Gauge
}

// NewWindowedHistogram is taken from http://github.com/codahale/metrics. It returns a
// windowed HDR histogram which drops data older than five minutes.
//
// The histogram exposes metrics for each passed quantile as gauges. Users are expected
// to provide their own set of Gauges for quantiles to make this Histogram work across multiple
// metrics providers.
func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge) *windowedHistogram {
	h := &windowedHistogram{
		hist:   hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
		name:   name,
		gauges: quantiles,
	}
	go h.rotateLoop(1 * time.Minute)
	return h
}

func (h *windowedHistogram) Name() string                         { return h.name }
func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }

func (h *windowedHistogram) Observe(value int64) {
	h.mu.Lock()
	err := h.hist.Current.RecordValue(value)
	h.mu.Unlock()

	if err != nil {
		panic(err.Error())
	}

	for q, gauge := range h.gauges {
		gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
	}
}

func (h *windowedHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
	bars := h.hist.Merge().Distribution()
	buckets := make([]metrics.Bucket, len(bars))
	for i, bar := range bars {
		buckets[i] = metrics.Bucket{
			From:  bar.From,
			To:    bar.To,
			Count: bar.Count,
		}
	}
	quantiles := make([]metrics.Quantile, 0, len(h.gauges))
	for quantile, gauge := range h.gauges {
		quantiles = append(quantiles, metrics.Quantile{
			Quantile: quantile,
			Value:    int64(gauge.Get()),
		})
	}
	sort.Sort(quantileSlice(quantiles))
	return buckets, quantiles
}

func (h *windowedHistogram) rotateLoop(d time.Duration) {
	for range time.Tick(d) {
		h.mu.Lock()
		h.hist.Rotate()
		h.mu.Unlock()
	}
}

type quantileSlice []metrics.Quantile

func (a quantileSlice) Len() int           { return len(a) }
func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
func (a quantileSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }