package cloudwatch
import (
"fmt"
"os"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
"strconv"
)
const (
maxConcurrentRequests = 20
)
type Percentiles []struct {
s string
f float64
}
// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
sem chan struct{}
namespace string
svc cloudwatchiface.CloudWatchAPI
counters *lv.Space
gauges *lv.Space
histograms *lv.Space
percentiles []float64 // percentiles to track
logger log.Logger
numConcurrentRequests int
}
type option func(*CloudWatch)
func (s *CloudWatch) apply(opt option) {
if opt != nil {
opt(s)
}
}
func WithLogger(logger log.Logger) option {
return func(c *CloudWatch) {
c.logger = logger
}
}
// WithPercentiles registers the percentiles to track, overriding the
// existing/default values.
// Reason is that Cloudwatch makes you pay per metric, so you can save half the money
// by only using 2 metrics instead of the default 4.
func WithPercentiles(percentiles ...float64) option {
return func(c *CloudWatch) {
c.percentiles = make([]float64, 0, len(percentiles))
for _, p := range percentiles {
if p < 0 || p > 1 {
continue // illegal entry; ignore
}
c.percentiles = append(c.percentiles, p)
}
}
}
func WithConcurrentRequests(n int) option {
return func(c *CloudWatch) {
if n > maxConcurrentRequests {
n = maxConcurrentRequests
}
c.numConcurrentRequests = n
}
}
// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
cw := &CloudWatch{
sem: nil, // set below
namespace: namespace,
svc: svc,
counters: lv.NewSpace(),
gauges: lv.NewSpace(),
histograms: lv.NewSpace(),
numConcurrentRequests: 10,
logger: log.NewLogfmtLogger(os.Stderr),
percentiles: []float64{0.50, 0.90, 0.95, 0.99},
}
for _, optFunc := range options {
optFunc(cw)
}
cw.sem = make(chan struct{}, cw.numConcurrentRequests)
return cw
}
// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
return &Counter{
name: name,
obs: cw.counters.Observe,
}
}
// NewGauge returns an gauge.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
return &Gauge{
name: name,
obs: cw.gauges.Observe,
add: cw.gauges.Add,
}
}
// NewHistogram returns a histogram.
func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
return &Histogram{
name: name,
obs: cw.histograms.Observe,
}
}
// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
}
}
// Send will fire an API request to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
func (cw *CloudWatch) Send() error {
cw.mtx.RLock()
defer cw.mtx.RUnlock()
now := time.Now()
var datums []*cloudwatch.MetricDatum
cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := sum(values)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
return true
})
cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
value := last(values)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
return true
})
// format a [0,1]-float value to a percentile value, with minimum nr of decimals
// 0.90 -> "90"
// 0.95 -> "95"
// 0.999 -> "99.9"
formatPerc := func(p float64) string {
return strconv.FormatFloat(p*100, 'f', -1, 64)
}
cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
histogram := generic.NewHistogram(name, 50)
for _, v := range values {
histogram.Observe(v)
}
for _, perc := range cw.percentiles {
value := histogram.Quantile(perc)
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(fmt.Sprintf("%s_%s", name, formatPerc(perc))),
Dimensions: makeDimensions(lvs...),
Value: aws.Float64(value),
Timestamp: aws.Time(now),
})
}
return true
})
var batches [][]*cloudwatch.MetricDatum
for len(datums) > 0 {
var batch []*cloudwatch.MetricDatum
lim := min(len(datums), maxConcurrentRequests)
batch, datums = datums[:lim], datums[lim:]
batches = append(batches, batch)
}
var errors = make(chan error, len(batches))
for _, batch := range batches {
go func(batch []*cloudwatch.MetricDatum) {
cw.sem <- struct{}{}
defer func() {
<-cw.sem
}()
_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: batch,
})
errors <- err
}(batch)
}
var firstErr error
for i := 0; i < cap(errors); i++ {
if err := <-errors; err != nil && firstErr != nil {
firstErr = err
}
}
return firstErr
}
func sum(a []float64) float64 {
var v float64
for _, f := range a {
v += f
}
return v
}
func last(a []float64) float64 {
return a[len(a)-1]
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
type observeFunc func(name string, lvs lv.LabelValues, value float64)
// Counter is a counter. Observations are forwarded to a node
// object, and aggregated (summed) per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}
// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}
// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}
// Gauge is a gauge. Observations are forwarded to a node
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
name string
lvs lv.LabelValues
obs observeFunc
add observeFunc
}
// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{
name: g.name,
lvs: g.lvs.With(labelValues...),
obs: g.obs,
add: g.add,
}
}
// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
g.obs(g.name, g.lvs, value)
}
// Add implements metrics.Gauge.
func (g *Gauge) Add(delta float64) {
g.add(g.name, g.lvs, delta)
}
// Histogram is an Influx histrogram. Observations are aggregated into a
// generic.Histogram and emitted as per-quantile gauges to the Influx server.
type Histogram struct {
name string
lvs lv.LabelValues
obs observeFunc
}
// With implements metrics.Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{
name: h.name,
lvs: h.lvs.With(labelValues...),
obs: h.obs,
}
}
// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
h.obs(h.name, h.lvs, value)
}
func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
dimensions[j] = &cloudwatch.Dimension{
Name: aws.String(labelValues[i]),
Value: aws.String(labelValues[i+1]),
}
}
return dimensions
}