Codebase list golang-github-go-kit-kit / d8e4488 metrics2 / influx / influxdb.go
d8e4488

Tree @d8e4488 (Download .tar.gz)

influxdb.go @d8e4488raw · history · blame

// Package influx provides an InfluxDB implementation for metrics. The model is
// similar to other push-based instrumentation systems. Observations are
// aggregated locally and emitted to the Influx server on regular intervals.
package influx

import (
	"sync"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/metrics2/generic"
	influxdb "github.com/influxdata/influxdb/client/v2"
)

// Influx is a store for metrics that will be emitted to an Influx database.
//
// Influx is a general purpose time-series database, and has no native concepts
// of counters, gauges, or histograms. Counters are modeled as a timeseries with
// one data point per flush, with a "count" field that reflects all adds since
// the last flush. Gauges are modeled as a timeseries with one data point per
// flush, with a "value" field that reflects the current state of the gauge.
// Histograms are modeled as 4 gauge timeseries, one each for the 50th, 90th,
// 95th, and 99th quantiles.
//
// Influx tags are assigned to each Go kit metric at construction, and are
// immutable for the life of the metric. Influx fields are mapped to Go kit
// label values, and may be mutated via With functions. Actual metric values are
// provided as fields with specific names depending on the metric.
//
// All observations are batched in memory locally, and flushed on demand.
type Influx struct {
	mtx        sync.RWMutex
	counters   map[string]*Counter
	gauges     map[string]*Gauge
	histograms map[string]*Histogram
	tags       map[string]string
	conf       influxdb.BatchPointsConfig
	logger     log.Logger
}

// New returns an Influx object, ready to create metrics and aggregate
// observations, and automatically flushing to the passed Influx client every
// flushInterval. Use the returned stop function to terminate the flushing
// goroutine.
func New(
	tags map[string]string,
	conf influxdb.BatchPointsConfig,
	client influxdb.Client,
	flushInterval time.Duration,
	logger log.Logger,
) (res *Influx, stop func()) {
	i := NewRaw(tags, conf, logger)
	ticker := time.NewTicker(flushInterval)
	go i.FlushTo(client, ticker)
	return i, ticker.Stop
}

// NewRaw returns an Influx object, ready to create metrics and aggregate
// observations, but without automatically flushing anywhere. Users should
// probably prefer the New constructor.
//
// Tags are applied to all metrics created from this object. A BatchPoints
// structure is created from the provided BatchPointsConfig; any error will
// cause a panic. Observations are aggregated into the BatchPoints.
func NewRaw(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
	return &Influx{
		counters:   map[string]*Counter{},
		gauges:     map[string]*Gauge{},
		histograms: map[string]*Histogram{},
		tags:       tags,
		conf:       conf,
		logger:     logger,
	}
}

// NewCounter returns a generic counter with static tags.
func (i *Influx) NewCounter(name string, tags map[string]string) *Counter {
	i.mtx.Lock()
	defer i.mtx.Unlock()
	c := newCounter(tags)
	i.counters[name] = c
	return c
}

// NewGauge returns a generic gauge with static tags.
func (i *Influx) NewGauge(name string, tags map[string]string) *Gauge {
	i.mtx.Lock()
	defer i.mtx.Unlock()
	g := newGauge(tags)
	i.gauges[name] = g
	return g
}

// NewHistogram returns a generic histogram with static tags. 50 is a good
// default number of buckets.
func (i *Influx) NewHistogram(name string, tags map[string]string, buckets int) *Histogram {
	i.mtx.Lock()
	defer i.mtx.Unlock()
	h := newHistogram(tags, buckets)
	i.histograms[name] = h
	return h
}

// FlushTo invokes WriteTo to the client every time the ticker fires. FlushTo
// blocks until the ticker is stopped. Most users won't need to call this method
// directly, and should prefer to use the New constructor.
func (i *Influx) FlushTo(client influxdb.Client, ticker *time.Ticker) {
	for range ticker.C {
		if err := i.WriteTo(client); err != nil {
			i.logger.Log("during", "flush", "err", err)
		}
	}
}

// WriteTo converts the current set of metrics to Influx BatchPoints, and writes
// the BatchPoints to the client. Clients probably shouldn't invoke this method
// directly, and should prefer using FlushTo, or the New constructor.
func (i *Influx) WriteTo(client influxdb.Client) error {
	i.mtx.Lock()
	defer i.mtx.Unlock()

	bp, err := influxdb.NewBatchPoints(i.conf)
	if err != nil {
		return err
	}
	now := time.Now()

	for name, c := range i.counters {
		fields := fieldsFrom(c.LabelValues())
		fields["count"] = c.ValueReset()
		p, err := influxdb.NewPoint(name, c.tags, fields, now)
		if err != nil {
			return err
		}
		bp.AddPoint(p)
	}

	for name, g := range i.gauges {
		fields := fieldsFrom(g.LabelValues())
		fields["value"] = g.Value()
		p, err := influxdb.NewPoint(name, g.tags, fields, now)
		if err != nil {
			return err
		}
		bp.AddPoint(p)
	}

	for name, h := range i.histograms {
		fields := fieldsFrom(h.LabelValues())
		for suffix, quantile := range map[string]float64{
			".p50": 0.50,
			".p90": 0.90,
			".p95": 0.95,
			".p99": 0.99,
		} {
			fields["value"] = h.Quantile(quantile)
			p, err := influxdb.NewPoint(name+suffix, h.tags, fields, now)
			if err != nil {
				return err
			}
			bp.AddPoint(p)
		}
	}

	return client.Write(bp)
}

func fieldsFrom(labelValues []string) map[string]interface{} {
	if len(labelValues)%2 != 0 {
		panic("fieldsFrom received a labelValues with an odd number of strings")
	}
	fields := make(map[string]interface{}, len(labelValues)/2)
	for i := 0; i < len(labelValues); i += 2 {
		fields[labelValues[i]] = labelValues[i+1]
	}
	return fields
}

// Counter is a generic counter, with static tags.
type Counter struct {
	*generic.Counter
	tags map[string]string
}

func newCounter(tags map[string]string) *Counter {
	return &Counter{
		Counter: generic.NewCounter(),
		tags:    tags,
	}
}

// Gauge is a generic gauge, with static tags.
type Gauge struct {
	*generic.Gauge
	tags map[string]string
}

func newGauge(tags map[string]string) *Gauge {
	return &Gauge{
		Gauge: generic.NewGauge(),
		tags:  tags,
	}
}

// Histogram is a generic histogram, with static tags.
type Histogram struct {
	*generic.Histogram
	tags map[string]string
}

func newHistogram(tags map[string]string, buckets int) *Histogram {
	return &Histogram{
		Histogram: generic.NewHistogram(buckets),
		tags:      tags,
	}
}