diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 728634b..855dee2 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -20,13 +20,14 @@ // // To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { - mtx sync.RWMutex - namespace string - svc cloudwatchiface.CloudWatchAPI - counters map[string]*counter - gauges map[string]*gauge - histograms map[string]*histogram - logger log.Logger + mtx sync.RWMutex + namespace string + numConcurrentRequests int + svc cloudwatchiface.CloudWatchAPI + counters map[string]*counter + gauges map[string]*gauge + histograms map[string]*histogram + logger log.Logger } // New returns a CloudWatch object that may be used to create metrics. @@ -35,13 +36,23 @@ // manually or with one of the helper methods. func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { return &CloudWatch{ - namespace: namespace, + namespace: namespace, + numConcurrentRequests: 10, svc: svc, counters: map[string]*counter{}, gauges: map[string]*gauge{}, histograms: map[string]*histogram{}, logger: logger, } +} + +// SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch. +// CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called, +// we partition the metrics and concurrently call their API. This parameter sets maximum number +// of concurrent requests. +func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch { + cw.numConcurrentRequests = numConcurrentRequests + return cw } // NewCounter returns a counter. Observations are aggregated and emitted once @@ -133,11 +144,41 @@ } } - _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ - Namespace: aws.String(cw.namespace), - MetricData: datums, - }) - return err + var tokens = make(chan struct{}, cw.numConcurrentRequests) + var errors = make(chan error) + var n int + + for len(datums) > 0 { + var batch []*cloudwatch.MetricDatum + lim := min(len(datums), cw.numConcurrentRequests) + batch, datums = datums[:lim], datums[lim:] + n++ + go func(batch []*cloudwatch.MetricDatum) { + tokens <- struct{}{} + _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ + Namespace: aws.String(cw.namespace), + MetricData: batch, + }) + <-tokens + errors <- err + }(batch) + } + + var firstErr error + for ; n > 0; n-- { + if err := <-errors; err != nil && firstErr != nil { + firstErr = err + } + } + + return firstErr +} + +func min(a, b int) int { + if a < b { + return a + } + return b } // counter is a CloudWatch counter metric. diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index c0cf1d0..fa6cca3 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -10,6 +10,7 @@ "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/teststat" ) @@ -83,6 +84,41 @@ } } +func TestCounterLowSendConcurrency(t *testing.T) { + namespace := "abc" + names := []string{"name1", "name2", "name3", "name4", "name5", "name6"} + label, value := "label", "value" + svc := newMockCloudWatch() + cw := New(namespace, svc, log.NewNopLogger()) + cw = cw.SetConcurrency(2) + + counters := make(map[string]metrics.Counter) + for _, name := range names { + counters[name] = cw.NewCounter(name).With(label, value) + } + + valuef := func(name string) func() float64 { + return func() float64 { + err := cw.Send() + if err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + return svc.valuesReceived[name] + } + } + + for _, name := range names { + if err := teststat.TestCounter(counters[name], valuef(name)); err != nil { + t.Fatal(err) + } + if err := testDimensions(svc, name, label, value); err != nil { + t.Fatal(err) + } + } +} + func TestGauge(t *testing.T) { namespace, name := "abc", "def" label, value := "label", "value"