Codebase list golang-github-go-kit-kit / c377c2ee-57ee-4295-ba5c-f8a0b0e527d1/main metrics / cloudwatch / cloudwatch.go
c377c2ee-57ee-4295-ba5c-f8a0b0e527d1/main

Tree @c377c2ee-57ee-4295-ba5c-f8a0b0e527d1/main (Download .tar.gz)

cloudwatch.go @c377c2ee-57ee-4295-ba5c-f8a0b0e527d1/main

5f24949
 
 
ead1f1c
2a3c64e
5f24949
3aad148
 
 
5f24949
f99615e
ead1f1c
5f24949
 
 
2a3c64e
f583a20
5f24949
 
196891a
 
 
 
2a3c64e
 
 
 
 
5616015
 
 
 
 
5f24949
002c376
 
 
 
 
 
 
f583a20
24eddfe
2a3c64e
 
 
002c376
2a3c64e
002c376
2a3c64e
 
 
 
 
 
002c376
 
2a3c64e
 
 
f583a20
 
0be9fba
 
f583a20
002c376
0be9fba
f583a20
 
 
2a3c64e
f583a20
2a3c64e
 
 
 
 
002c376
2a3c64e
 
 
002c376
2a3c64e
5f24949
 
0933e61
 
 
 
2a3c64e
002c376
 
 
 
 
 
 
2a3c64e
 
f583a20
2a3c64e
 
002c376
 
196891a
 
002c376
 
 
5f24949
 
5616015
 
ead1f1c
2a3c64e
 
 
 
5f24949
 
2a3c64e
ead1f1c
2a3c64e
 
 
 
 
f7bcf6a
 
2a3c64e
 
 
 
 
 
3a00cf8
 
0933e61
 
 
 
3aad148
 
5616015
 
 
 
 
3aad148
ead1f1c
 
5616015
 
 
 
f7bcf6a
ead1f1c
5616015
2a3c64e
 
5616015
 
2a3c64e
 
5616015
 
2a3c64e
 
3a00cf8
2a3c64e
 
5616015
 
2a3c64e
 
5616015
 
2a3c64e
 
5616015
f583a20
 
 
 
 
 
 
 
2a3c64e
 
 
 
 
 
 
f583a20
 
f7bcf6a
4d9f525
2a3c64e
 
f7bcf6a
3a00cf8
f7bcf6a
2a3c64e
 
5616015
196891a
24eddfe
 
196891a
24eddfe
196891a
 
 
 
 
24eddfe
196891a
 
 
 
24eddfe
 
 
 
 
 
 
 
196891a
24eddfe
 
 
 
 
 
 
 
2a3c64e
 
 
 
 
 
 
 
 
 
 
 
24eddfe
 
 
 
 
3aad148
 
2a3c64e
 
 
 
 
 
 
 
5f24949
 
2a3c64e
 
 
 
 
 
 
5f24949
 
2a3c64e
 
 
5f24949
3aad148
2a3c64e
 
 
 
 
 
 
3a00cf8
 
2a3c64e
 
 
 
 
 
 
 
3a00cf8
 
2a3c64e
 
 
3a00cf8
 
2a3c64e
 
 
3a00cf8
 
2a3c64e
 
 
 
 
 
f7bcf6a
 
2a3c64e
 
 
 
 
 
 
f7bcf6a
 
2a3c64e
 
 
f7bcf6a
 
3aad148
 
1223f9f
3aad148
 
 
 
 
 
 
package cloudwatch

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/cloudwatch"
	"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/metrics"
	"github.com/go-kit/kit/metrics/generic"
	"github.com/go-kit/kit/metrics/internal/lv"
	"strconv"
)

const (
	maxConcurrentRequests = 20
)

type Percentiles []struct {
	s string
	f float64
}

// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
	mtx                   sync.RWMutex
	sem                   chan struct{}
	namespace             string
	svc                   cloudwatchiface.CloudWatchAPI
	counters              *lv.Space
	gauges                *lv.Space
	histograms            *lv.Space
	percentiles           []float64 // percentiles to track
	logger                log.Logger
	numConcurrentRequests int
}

type option func(*CloudWatch)

func (s *CloudWatch) apply(opt option) {
	if opt != nil {
		opt(s)
	}
}

func WithLogger(logger log.Logger) option {
	return func(c *CloudWatch) {
		c.logger = logger
	}
}

// WithPercentiles registers the percentiles to track, overriding the
// existing/default values.
// Reason is that Cloudwatch makes you pay per metric, so you can save half the money
// by only using 2 metrics instead of the default 4.
func WithPercentiles(percentiles ...float64) option {
	return func(c *CloudWatch) {
		c.percentiles = make([]float64, 0, len(percentiles))
		for _, p := range percentiles {
			if p < 0 || p > 1 {
				continue // illegal entry; ignore
			}
			c.percentiles = append(c.percentiles, p)
		}
	}
}

func WithConcurrentRequests(n int) option {
	return func(c *CloudWatch) {
		if n > maxConcurrentRequests {
			n = maxConcurrentRequests
		}
		c.numConcurrentRequests = n
	}
}

// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
	cw := &CloudWatch{
		sem:                   nil, // set below
		namespace:             namespace,
		svc:                   svc,
		counters:              lv.NewSpace(),
		gauges:                lv.NewSpace(),
		histograms:            lv.NewSpace(),
		numConcurrentRequests: 10,
		logger:                log.NewLogfmtLogger(os.Stderr),
		percentiles:           []float64{0.50, 0.90, 0.95, 0.99},
	}

	for _, optFunc := range options {
		optFunc(cw)
	}

	cw.sem = make(chan struct{}, cw.numConcurrentRequests)

	return cw
}

// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
	return &Counter{
		name: name,
		obs:  cw.counters.Observe,
	}
}

// NewGauge returns an gauge.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
	return &Gauge{
		name: name,
		obs:  cw.gauges.Observe,
		add:  cw.gauges.Add,
	}
}

// NewHistogram returns a histogram.
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
	return &Histogram{
		name: name,
		obs:  cw.histograms.Observe,
	}
}

// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
	for range c {
		if err := cw.Send(); err != nil {
			cw.logger.Log("during", "Send", "err", err)
		}
	}
}

// Send will fire an API request to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
func (cw *CloudWatch) Send() error {
	cw.mtx.RLock()
	defer cw.mtx.RUnlock()
	now := time.Now()

	var datums []*cloudwatch.MetricDatum

	cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
		value := sum(values)
		datums = append(datums, &cloudwatch.MetricDatum{
			MetricName: aws.String(name),
			Dimensions: makeDimensions(lvs...),
			Value:      aws.Float64(value),
			Timestamp:  aws.Time(now),
		})
		return true
	})

	cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
		value := last(values)
		datums = append(datums, &cloudwatch.MetricDatum{
			MetricName: aws.String(name),
			Dimensions: makeDimensions(lvs...),
			Value:      aws.Float64(value),
			Timestamp:  aws.Time(now),
		})
		return true
	})

	// format a [0,1]-float value to a percentile value, with minimum nr of decimals
	// 0.90 -> "90"
	// 0.95 -> "95"
	// 0.999 -> "99.9"
	formatPerc := func(p float64) string {
		return strconv.FormatFloat(p*100, 'f', -1, 64)
	}

	cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
		histogram := generic.NewHistogram(name, 50)

		for _, v := range values {
			histogram.Observe(v)
		}

		for _, perc := range cw.percentiles {
			value := histogram.Quantile(perc)
			datums = append(datums, &cloudwatch.MetricDatum{
				MetricName: aws.String(fmt.Sprintf("%s_%s", name, formatPerc(perc))),
				Dimensions: makeDimensions(lvs...),
				Value:      aws.Float64(value),
				Timestamp:  aws.Time(now),
			})
		}
		return true
	})

	var batches [][]*cloudwatch.MetricDatum
	for len(datums) > 0 {
		var batch []*cloudwatch.MetricDatum
		lim := min(len(datums), maxConcurrentRequests)
		batch, datums = datums[:lim], datums[lim:]
		batches = append(batches, batch)
	}

	var errors = make(chan error, len(batches))
	for _, batch := range batches {
		go func(batch []*cloudwatch.MetricDatum) {
			cw.sem <- struct{}{}
			defer func() {
				<-cw.sem
			}()
			_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
				Namespace:  aws.String(cw.namespace),
				MetricData: batch,
			})
			errors <- err
		}(batch)
	}
	var firstErr error
	for i := 0; i < cap(errors); i++ {
		if err := <-errors; err != nil && firstErr != nil {
			firstErr = err
		}
	}

	return firstErr
}

func sum(a []float64) float64 {
	var v float64
	for _, f := range a {
		v += f
	}
	return v
}

func last(a []float64) float64 {
	return a[len(a)-1]
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

type observeFunc func(name string, lvs lv.LabelValues, value float64)

// Counter is a counter. Observations are forwarded to a node
// object, and aggregated (summed) per timeseries.
type Counter struct {
	name string
	lvs  lv.LabelValues
	obs  observeFunc
}

// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
	return &Counter{
		name: c.name,
		lvs:  c.lvs.With(labelValues...),
		obs:  c.obs,
	}
}

// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
	c.obs(c.name, c.lvs, delta)
}

// Gauge is a gauge. Observations are forwarded to a node
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
	name string
	lvs  lv.LabelValues
	obs  observeFunc
	add  observeFunc
}

// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
	return &Gauge{
		name: g.name,
		lvs:  g.lvs.With(labelValues...),
		obs:  g.obs,
		add:  g.add,
	}
}

// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
	g.obs(g.name, g.lvs, value)
}

// Add implements metrics.Gauge.
func (g *Gauge) Add(delta float64) {
	g.add(g.name, g.lvs, delta)
}

// Histogram is an Influx histrogram. Observations are aggregated into a
// generic.Histogram and emitted as per-quantile gauges to the Influx server.
type Histogram struct {
	name string
	lvs  lv.LabelValues
	obs  observeFunc
}

// With implements metrics.Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
	return &Histogram{
		name: h.name,
		lvs:  h.lvs.With(labelValues...),
		obs:  h.obs,
	}
}

// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
	h.obs(h.name, h.lvs, value)
}

func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
	dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
	for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
		dimensions[j] = &cloudwatch.Dimension{
			Name:  aws.String(labelValues[i]),
			Value: aws.String(labelValues[i+1]),
		}
	}
	return dimensions
}