Codebase list golang-github-go-kit-kit / 24eddfe
Issue #529: Partition API requests to CloudWatch into separate concurrent batches to circumvent the 20 data per request limit that they have Alejandro Pedraza 6 years ago
2 changed file(s) with 90 addition(s) and 13 deletion(s). Raw diff Collapse all Expand all
1919 //
2020 // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
2121 type CloudWatch struct {
22 mtx sync.RWMutex
23 namespace string
24 svc cloudwatchiface.CloudWatchAPI
25 counters map[string]*counter
26 gauges map[string]*gauge
27 histograms map[string]*histogram
28 logger log.Logger
22 mtx sync.RWMutex
23 namespace string
24 numConcurrentRequests int
25 svc cloudwatchiface.CloudWatchAPI
26 counters map[string]*counter
27 gauges map[string]*gauge
28 histograms map[string]*histogram
29 logger log.Logger
2930 }
3031
3132 // New returns a CloudWatch object that may be used to create metrics.
3435 // manually or with one of the helper methods.
3536 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch {
3637 return &CloudWatch{
37 namespace: namespace,
38 namespace: namespace,
39 numConcurrentRequests: 10,
3840 svc: svc,
3941 counters: map[string]*counter{},
4042 gauges: map[string]*gauge{},
4143 histograms: map[string]*histogram{},
4244 logger: logger,
4345 }
46 }
47
48 // SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch.
49 // CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called,
50 // we partition the metrics and concurrently call their API. This parameter sets maximum number
51 // of concurrent requests.
52 func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch {
53 cw.numConcurrentRequests = numConcurrentRequests
54 return cw
4455 }
4556
4657 // NewCounter returns a counter. Observations are aggregated and emitted once
132143 }
133144 }
134145
135 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
136 Namespace: aws.String(cw.namespace),
137 MetricData: datums,
138 })
139 return err
146 var tokens = make(chan struct{}, cw.numConcurrentRequests)
147 var errors = make(chan error)
148 var n int
149
150 for len(datums) > 0 {
151 var batch []*cloudwatch.MetricDatum
152 lim := min(len(datums), cw.numConcurrentRequests)
153 batch, datums = datums[:lim], datums[lim:]
154 n++
155 go func(batch []*cloudwatch.MetricDatum) {
156 tokens <- struct{}{}
157 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
158 Namespace: aws.String(cw.namespace),
159 MetricData: batch,
160 })
161 <-tokens
162 errors <- err
163 }(batch)
164 }
165
166 var firstErr error
167 for ; n > 0; n-- {
168 if err := <-errors; err != nil && firstErr != nil {
169 firstErr = err
170 }
171 }
172
173 return firstErr
174 }
175
176 func min(a, b int) int {
177 if a < b {
178 return a
179 }
180 return b
140181 }
141182
142183 // counter is a CloudWatch counter metric.
99 "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
1010
1111 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
1213 "github.com/go-kit/kit/metrics/teststat"
1314 )
1415
8283 }
8384 }
8485
86 func TestCounterLowSendConcurrency(t *testing.T) {
87 namespace := "abc"
88 names := []string{"name1", "name2", "name3", "name4", "name5", "name6"}
89 label, value := "label", "value"
90 svc := newMockCloudWatch()
91 cw := New(namespace, svc, log.NewNopLogger())
92 cw = cw.SetConcurrency(2)
93
94 counters := make(map[string]metrics.Counter)
95 for _, name := range names {
96 counters[name] = cw.NewCounter(name).With(label, value)
97 }
98
99 valuef := func(name string) func() float64 {
100 return func() float64 {
101 err := cw.Send()
102 if err != nil {
103 t.Fatal(err)
104 }
105 svc.mtx.RLock()
106 defer svc.mtx.RUnlock()
107 return svc.valuesReceived[name]
108 }
109 }
110
111 for _, name := range names {
112 if err := teststat.TestCounter(counters[name], valuef(name)); err != nil {
113 t.Fatal(err)
114 }
115 if err := testDimensions(svc, name, label, value); err != nil {
116 t.Fatal(err)
117 }
118 }
119 }
120
85121 func TestGauge(t *testing.T) {
86122 namespace, name := "abc", "def"
87123 label, value := "label", "value"