diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 855dee2..e267e03 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -12,6 +12,10 @@ "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" +) + +const ( + maxConcurrentRequests = 20 ) // CloudWatch receives metrics observations and forwards them to CloudWatch. @@ -21,6 +25,7 @@ // To regularly report metrics to CloudWatch, use the WriteLoop helper method. type CloudWatch struct { mtx sync.RWMutex + sem chan struct{} namespace string numConcurrentRequests int svc cloudwatchiface.CloudWatchAPI @@ -32,27 +37,25 @@ // New returns a CloudWatch object that may be used to create metrics. // Namespace is applied to all created metrics and maps to the CloudWatch namespace. +// NumConcurrent sets the number of simultaneous requests to Amazon. +// A good default value is 10 and the maximum is 20. // Callers must ensure that regular calls to Send are performed, either // manually or with one of the helper methods. -func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch { +func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch { + if numConcurrent > maxConcurrentRequests { + numConcurrent = maxConcurrentRequests + } + return &CloudWatch{ + sem: make(chan struct{}, numConcurrent), namespace: namespace, - numConcurrentRequests: 10, + numConcurrentRequests: numConcurrent, svc: svc, counters: map[string]*counter{}, gauges: map[string]*gauge{}, histograms: map[string]*histogram{}, logger: logger, } -} - -// SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch. -// CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called, -// we partition the metrics and concurrently call their API. This parameter sets maximum number -// of concurrent requests. -func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch { - cw.numConcurrentRequests = numConcurrentRequests - return cw } // NewCounter returns a counter. Observations are aggregated and emitted once @@ -144,28 +147,30 @@ } } - var tokens = make(chan struct{}, cw.numConcurrentRequests) - var errors = make(chan error) - var n int - + var batches [][]*cloudwatch.MetricDatum for len(datums) > 0 { var batch []*cloudwatch.MetricDatum - lim := min(len(datums), cw.numConcurrentRequests) + lim := min(len(datums), maxConcurrentRequests) batch, datums = datums[:lim], datums[lim:] - n++ + batches = append(batches, batch) + } + + var errors = make(chan error, len(batches)) + for _, batch := range batches { go func(batch []*cloudwatch.MetricDatum) { - tokens <- struct{}{} + cw.sem <- struct{}{} + defer func() { + <-cw.sem + }() _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{ Namespace: aws.String(cw.namespace), MetricData: batch, }) - <-tokens errors <- err }(batch) } - var firstErr error - for ; n > 0; n-- { + for i := 0; i < cap(errors); i++ { if err := <-errors; err != nil && firstErr != nil { firstErr = err } diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index fa6cca3..c5f11db 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -3,6 +3,7 @@ import ( "errors" "fmt" + "strconv" "sync" "testing" @@ -65,7 +66,7 @@ namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -86,34 +87,33 @@ func TestCounterLowSendConcurrency(t *testing.T) { namespace := "abc" - names := []string{"name1", "name2", "name3", "name4", "name5", "name6"} - label, value := "label", "value" + var names, labels, values []string + for i := 1; i <= 45; i++ { + num := strconv.Itoa(i) + names = append(names, "name"+num) + labels = append(labels, "label"+num) + values = append(values, "value"+num) + } svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) - cw = cw.SetConcurrency(2) + cw := New(namespace, svc, 2, log.NewNopLogger()) counters := make(map[string]metrics.Counter) - for _, name := range names { - counters[name] = cw.NewCounter(name).With(label, value) + var wants []float64 + for i, name := range names { + counters[name] = cw.NewCounter(name).With(labels[i], values[i]) + wants = append(wants, teststat.FillCounter(counters[name])) } - valuef := func(name string) func() float64 { - return func() float64 { - err := cw.Send() - if err != nil { - t.Fatal(err) - } - svc.mtx.RLock() - defer svc.mtx.RUnlock() - return svc.valuesReceived[name] - } + err := cw.Send() + if err != nil { + t.Fatal(err) } - for _, name := range names { - if err := teststat.TestCounter(counters[name], valuef(name)); err != nil { - t.Fatal(err) + for i, name := range names { + if svc.valuesReceived[name] != wants[i] { + t.Fatal("want %f, have %f", wants[i], svc.valuesReceived[name]) } - if err := testDimensions(svc, name, label, value); err != nil { + if err := testDimensions(svc, name, labels[i], values[i]); err != nil { t.Fatal(err) } } @@ -123,7 +123,7 @@ namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) gauge := cw.NewGauge(name).With(label, value) valuef := func() float64 { err := cw.Send() @@ -146,7 +146,7 @@ namespace, name := "abc", "def" label, value := "label", "value" svc := newMockCloudWatch() - cw := New(namespace, svc, log.NewNopLogger()) + cw := New(namespace, svc, 10, log.NewNopLogger()) histogram := cw.NewHistogram(name, 50).With(label, value) n50 := fmt.Sprintf("%s_50", name) n90 := fmt.Sprintf("%s_90", name)