package cloudwatch
import (
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
)
// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
mtx sync.RWMutex
namespace string
numConcurrentRequests int
svc cloudwatchiface.CloudWatchAPI
counters map[string]*counter
gauges map[string]*gauge
histograms map[string]*histogram
logger log.Logger
}
// New returns a CloudWatch object that may be used to create metrics.
// Namespace is applied to all created metrics and maps to the CloudWatch namespace.
// Callers must ensure that regular calls to Send are performed, either
// manually or with one of the helper methods.
func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch {
return &CloudWatch{
namespace: namespace,
numConcurrentRequests: 10,
svc: svc,
counters: map[string]*counter{},
gauges: map[string]*gauge{},
histograms: map[string]*histogram{},
logger: logger,
}
}
// SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch.
// CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called,
// we partition the metrics and concurrently call their API. This parameter sets maximum number
// of concurrent requests.
func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch {
cw.numConcurrentRequests = numConcurrentRequests
return cw
}
// NewCounter returns a counter. Observations are aggregated and emitted once
// per write invocation.
func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
cw.mtx.Lock()
defer cw.mtx.Unlock()
c := &counter{c: generic.NewCounter(name)}
cw.counters[name] = c
return c
}
// NewGauge returns a gauge. Observations are aggregated and emitted once per
// write invocation.
func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
cw.mtx.Lock()
defer cw.mtx.Unlock()
g := &gauge{g: generic.NewGauge(name)}
cw.gauges[name] = g
return g
}
// NewHistogram returns a histogram. Observations are aggregated and emitted as
// per-quantile gauges, once per write invocation. 50 is a good default value
// for buckets.
func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram {
cw.mtx.Lock()
defer cw.mtx.Unlock()
h := &histogram{h: generic.NewHistogram(name, buckets)}
cw.histograms[name] = h
return h
}
// WriteLoop is a helper method that invokes Send every time the passed
// channel fires. This method blocks until the channel is closed, so clients
// probably want to run it in its own goroutine. For typical usage, create a
// time.Ticker and pass its C channel to this method.
func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
for range c {
if err := cw.Send(); err != nil {
cw.logger.Log("during", "Send", "err", err)
}
}
}
// Send will fire an API request to CloudWatch with the latest stats for
// all metrics. It is preferred that the WriteLoop method is used.
func (cw *CloudWatch) Send() error {
cw.mtx.RLock()
defer cw.mtx.RUnlock()
now := time.Now()
var datums []*cloudwatch.MetricDatum
for name, c := range cw.counters {
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(c.c.LabelValues()...),
Value: aws.Float64(c.c.Value()),
Timestamp: aws.Time(now),
})
}
for name, g := range cw.gauges {
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(name),
Dimensions: makeDimensions(g.g.LabelValues()...),
Value: aws.Float64(g.g.Value()),
Timestamp: aws.Time(now),
})
}
for name, h := range cw.histograms {
for _, p := range []struct {
s string
f float64
}{
{"50", 0.50},
{"90", 0.90},
{"95", 0.95},
{"99", 0.99},
} {
datums = append(datums, &cloudwatch.MetricDatum{
MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)),
Dimensions: makeDimensions(h.h.LabelValues()...),
Value: aws.Float64(h.h.Quantile(p.f)),
Timestamp: aws.Time(now),
})
}
}
var tokens = make(chan struct{}, cw.numConcurrentRequests)
var errors = make(chan error)
var n int
for len(datums) > 0 {
var batch []*cloudwatch.MetricDatum
lim := min(len(datums), cw.numConcurrentRequests)
batch, datums = datums[:lim], datums[lim:]
n++
go func(batch []*cloudwatch.MetricDatum) {
tokens <- struct{}{}
_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
Namespace: aws.String(cw.namespace),
MetricData: batch,
})
<-tokens
errors <- err
}(batch)
}
var firstErr error
for ; n > 0; n-- {
if err := <-errors; err != nil && firstErr != nil {
firstErr = err
}
}
return firstErr
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
// counter is a CloudWatch counter metric.
type counter struct {
c *generic.Counter
}
// With implements counter
func (c *counter) With(labelValues ...string) metrics.Counter {
c.c = c.c.With(labelValues...).(*generic.Counter)
return c
}
// Add implements counter.
func (c *counter) Add(delta float64) {
c.c.Add(delta)
}
// gauge is a CloudWatch gauge metric.
type gauge struct {
g *generic.Gauge
}
// With implements gauge
func (g *gauge) With(labelValues ...string) metrics.Gauge {
g.g = g.g.With(labelValues...).(*generic.Gauge)
return g
}
// Set implements gauge
func (g *gauge) Set(value float64) {
g.g.Set(value)
}
// Add implements gauge
func (g *gauge) Add(delta float64) {
g.g.Add(delta)
}
// histogram is a CloudWatch histogram metric
type histogram struct {
h *generic.Histogram
}
// With implements histogram
func (h *histogram) With(labelValues ...string) metrics.Histogram {
h.h = h.h.With(labelValues...).(*generic.Histogram)
return h
}
// Observe implements histogram
func (h *histogram) Observe(value float64) {
h.h.Observe(value)
}
func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
dimensions[j] = &cloudwatch.Dimension{
Name: aws.String(labelValues[i]),
Value: aws.String(labelValues[i+1]),
}
}
return dimensions
}