use batch values API for CloudWatch PutMetric data call (#960)
* use batch values API for CloudWatch PutMetric data call
which was introduced at https://github.com/aws/aws-sdk-go/blob/master/CHANGELOG.md#release-v11536-2018-09-17
* fix test, so they can accept the list of received values from the gauge
* use batch api always
Taras authored 3 years ago
GitHub committed 3 years ago
19 | 19 | |
20 | 20 | const ( |
21 | 21 | maxConcurrentRequests = 20 |
22 | maxValuesInABatch = 150 | |
22 | 23 | ) |
23 | 24 | |
24 | 25 | type Percentiles []struct { |
173 | 174 | }) |
174 | 175 | |
175 | 176 | cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { |
176 | value := last(values) | |
177 | datums = append(datums, &cloudwatch.MetricDatum{ | |
177 | datum := &cloudwatch.MetricDatum{ | |
178 | 178 | MetricName: aws.String(name), |
179 | 179 | Dimensions: makeDimensions(lvs...), |
180 | Value: aws.Float64(value), | |
181 | 180 | Timestamp: aws.Time(now), |
182 | }) | |
181 | } | |
182 | ||
183 | if len(values) == 0 { | |
184 | return true | |
185 | } | |
186 | ||
187 | // CloudWatch Put Metrics API (https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html) | |
188 | // expects batch of unique values including the array of corresponding counts | |
189 | valuesCounter := make(map[float64]int) | |
190 | for _, v := range values { | |
191 | valuesCounter[v]++ | |
192 | } | |
193 | ||
194 | for value, count := range valuesCounter { | |
195 | if len(datum.Values) == maxValuesInABatch { | |
196 | break | |
197 | } | |
198 | datum.Values = append(datum.Values, aws.Float64(value)) | |
199 | datum.Counts = append(datum.Counts, aws.Float64(float64(count))) | |
200 | } | |
201 | ||
202 | datums = append(datums, datum) | |
183 | 203 | return true |
184 | 204 | }) |
185 | 205 |
17 | 17 | type mockCloudWatch struct { |
18 | 18 | cloudwatchiface.CloudWatchAPI |
19 | 19 | mtx sync.RWMutex |
20 | valuesReceived map[string]float64 | |
20 | valuesReceived map[string][]float64 | |
21 | 21 | dimensionsReceived map[string][]*cloudwatch.Dimension |
22 | 22 | } |
23 | 23 | |
24 | 24 | func newMockCloudWatch() *mockCloudWatch { |
25 | 25 | return &mockCloudWatch{ |
26 | valuesReceived: map[string]float64{}, | |
26 | valuesReceived: map[string][]float64{}, | |
27 | 27 | dimensionsReceived: map[string][]*cloudwatch.Dimension{}, |
28 | 28 | } |
29 | 29 | } |
32 | 32 | mcw.mtx.Lock() |
33 | 33 | defer mcw.mtx.Unlock() |
34 | 34 | for _, datum := range input.MetricData { |
35 | mcw.valuesReceived[*datum.MetricName] = *datum.Value | |
35 | if len(datum.Values) > 0 { | |
36 | for _, v := range datum.Values { | |
37 | mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *v) | |
38 | } | |
39 | } else { | |
40 | mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *datum.Value) | |
41 | } | |
36 | 42 | mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions |
37 | 43 | } |
38 | 44 | return nil, nil |
75 | 81 | cw := New(namespace, svc, WithLogger(log.NewNopLogger())) |
76 | 82 | counter := cw.NewCounter(name).With(label, value) |
77 | 83 | valuef := func() float64 { |
78 | err := cw.Send() | |
79 | if err != nil { | |
80 | t.Fatal(err) | |
81 | } | |
82 | svc.mtx.RLock() | |
83 | defer svc.mtx.RUnlock() | |
84 | return svc.valuesReceived[name] | |
84 | if err := cw.Send(); err != nil { | |
85 | t.Fatal(err) | |
86 | } | |
87 | svc.mtx.RLock() | |
88 | defer svc.mtx.RUnlock() | |
89 | value := svc.valuesReceived[name][len(svc.valuesReceived[name])-1] | |
90 | delete(svc.valuesReceived, name) | |
91 | ||
92 | return value | |
85 | 93 | } |
86 | 94 | if err := teststat.TestCounter(counter, valuef); err != nil { |
87 | 95 | t.Fatal(err) |
122 | 130 | } |
123 | 131 | |
124 | 132 | for i, name := range names { |
125 | if svc.valuesReceived[name] != wants[i] { | |
133 | if l := len(svc.valuesReceived[name]); l == 0 && wants[i] == 0 { | |
134 | continue | |
135 | } else if l != 1 { | |
136 | t.Fatalf("one value expected, got %d", l) | |
137 | } | |
138 | ||
139 | if svc.valuesReceived[name][0] != wants[i] { | |
126 | 140 | t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name]) |
127 | 141 | } |
128 | 142 | if err := svc.testDimensions(name, labels[i], values[i]); err != nil { |
137 | 151 | svc := newMockCloudWatch() |
138 | 152 | cw := New(namespace, svc, WithLogger(log.NewNopLogger())) |
139 | 153 | gauge := cw.NewGauge(name).With(label, value) |
140 | valuef := func() float64 { | |
141 | err := cw.Send() | |
142 | if err != nil { | |
143 | t.Fatal(err) | |
144 | } | |
145 | svc.mtx.RLock() | |
146 | defer svc.mtx.RUnlock() | |
147 | return svc.valuesReceived[name] | |
148 | } | |
154 | valuef := func() []float64 { | |
155 | if err := cw.Send(); err != nil { | |
156 | t.Fatal(err) | |
157 | } | |
158 | svc.mtx.RLock() | |
159 | res := svc.valuesReceived[name] | |
160 | delete(svc.valuesReceived, name) | |
161 | defer svc.mtx.RUnlock() | |
162 | return res | |
163 | } | |
164 | ||
149 | 165 | if err := teststat.TestGauge(gauge, valuef); err != nil { |
150 | 166 | t.Fatal(err) |
151 | 167 | } |
169 | 185 | if err != nil { |
170 | 186 | t.Fatal(err) |
171 | 187 | } |
172 | svc.mtx.RLock() | |
173 | defer svc.mtx.RUnlock() | |
174 | p50 = svc.valuesReceived[n50] | |
175 | p90 = svc.valuesReceived[n90] | |
176 | p95 = svc.valuesReceived[n95] | |
177 | p99 = svc.valuesReceived[n99] | |
188 | ||
189 | svc.mtx.RLock() | |
190 | defer svc.mtx.RUnlock() | |
191 | if len(svc.valuesReceived[n50]) > 0 { | |
192 | p50 = svc.valuesReceived[n50][0] | |
193 | delete(svc.valuesReceived, n50) | |
194 | } | |
195 | ||
196 | if len(svc.valuesReceived[n90]) > 0 { | |
197 | p90 = svc.valuesReceived[n90][0] | |
198 | delete(svc.valuesReceived, n90) | |
199 | } | |
200 | ||
201 | if len(svc.valuesReceived[n95]) > 0 { | |
202 | p95 = svc.valuesReceived[n95][0] | |
203 | delete(svc.valuesReceived, n95) | |
204 | } | |
205 | ||
206 | if len(svc.valuesReceived[n99]) > 0 { | |
207 | p99 = svc.valuesReceived[n99][0] | |
208 | delete(svc.valuesReceived, n99) | |
209 | } | |
178 | 210 | return |
179 | 211 | } |
180 | 212 | if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { |
206 | 238 | } |
207 | 239 | svc.mtx.RLock() |
208 | 240 | defer svc.mtx.RUnlock() |
209 | p50 = svc.valuesReceived[n50] | |
210 | p90 = svc.valuesReceived[n90] | |
241 | if len(svc.valuesReceived[n50]) > 0 { | |
242 | p50 = svc.valuesReceived[n50][0] | |
243 | delete(svc.valuesReceived, n50) | |
244 | } | |
245 | if len(svc.valuesReceived[n90]) > 0 { | |
246 | p90 = svc.valuesReceived[n90][0] | |
247 | delete(svc.valuesReceived, n90) | |
248 | } | |
211 | 249 | |
212 | 250 | // our teststat.TestHistogram wants us to give p95 and p99, |
213 | 251 | // but with custom percentiles we don't have those. |
16 | 16 | |
17 | 17 | func TestGauge(t *testing.T) { |
18 | 18 | gauge := NewGauge("expvar_gauge").With("label values", "not supported").(*Gauge) |
19 | value := func() float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return f } | |
19 | value := func() []float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return []float64{f} } | |
20 | 20 | if err := teststat.TestGauge(gauge, value); err != nil { |
21 | 21 | t.Fatal(err) |
22 | 22 | } |
44 | 44 | if want, have := name, gauge.Name; want != have { |
45 | 45 | t.Errorf("Name: want %q, have %q", want, have) |
46 | 46 | } |
47 | value := gauge.Value | |
47 | value := func() []float64 { return []float64{gauge.Value()} } | |
48 | 48 | if err := teststat.TestGauge(gauge, value); err != nil { |
49 | 49 | t.Fatal(err) |
50 | 50 | } |
33 | 33 | in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger()) |
34 | 34 | re := regexp.MustCompile(`influx_gauge,foo=alpha value=([0-9\.]+) [0-9]+`) |
35 | 35 | gauge := in.NewGauge("influx_gauge") |
36 | value := func() float64 { | |
36 | value := func() []float64 { | |
37 | 37 | client := &bufWriter{} |
38 | 38 | in.WriteTo(client) |
39 | 39 | match := re.FindStringSubmatch(client.buf.String()) |
40 | 40 | f, _ := strconv.ParseFloat(match[1], 64) |
41 | return f | |
41 | return []float64{f} | |
42 | 42 | } |
43 | 43 | if err := teststat.TestGauge(gauge, value); err != nil { |
44 | 44 | t.Fatal(err) |
39 | 39 | |
40 | 40 | gauge = gauge.With("label values", "not supported").(*Gauge) |
41 | 41 | |
42 | value := func() float64 { f := gauge.g.Val(); return f } | |
42 | value := func() []float64 { f := gauge.g.Val(); return []float64{f} } | |
43 | 43 | if err := teststat.TestGauge(gauge, value); err != nil { |
44 | 44 | t.Fatal(err) |
45 | 45 | } |
67 | 67 | Help: "This is a different help string.", |
68 | 68 | }, []string{"foo"}).With("foo", "bar") |
69 | 69 | |
70 | value := func() float64 { | |
70 | value := func() []float64 { | |
71 | 71 | matches := re.FindStringSubmatch(scrape()) |
72 | 72 | f, _ := strconv.ParseFloat(matches[1], 64) |
73 | return f | |
73 | return []float64{f} | |
74 | 74 | } |
75 | 75 | |
76 | 76 | if err := teststat.TestGauge(gauge, value); err != nil { |
22 | 22 | // LastLine expects a regex whose first capture group can be parsed as a |
23 | 23 | // float64. It will dump the WriterTo and parse each line, expecting to find a |
24 | 24 | // match. It returns the final captured float. |
25 | func LastLine(w io.WriterTo, regex string) func() float64 { | |
26 | return func() float64 { | |
25 | func LastLine(w io.WriterTo, regex string) func() []float64 { | |
26 | return func() []float64 { | |
27 | 27 | _, final := stats(w, regex, nil) |
28 | return final | |
28 | return []float64{final} | |
29 | 29 | } |
30 | 30 | } |
31 | 31 |
5 | 5 | "fmt" |
6 | 6 | "math" |
7 | 7 | "math/rand" |
8 | "reflect" | |
8 | 9 | "strings" |
9 | 10 | |
10 | 11 | "github.com/go-kit/kit/metrics" |
37 | 38 | |
38 | 39 | // TestGauge puts some values through the gauge, and then calls the value func |
39 | 40 | // to check that the gauge has the correct final value. |
40 | func TestGauge(gauge metrics.Gauge, value func() float64) error { | |
41 | func TestGauge(gauge metrics.Gauge, value func() []float64) error { | |
41 | 42 | a := rand.Perm(100) |
42 | 43 | n := rand.Intn(len(a)) |
43 | 44 | |
44 | var want float64 | |
45 | var want []float64 | |
45 | 46 | for i := 0; i < n; i++ { |
46 | 47 | f := float64(a[i]) |
47 | 48 | gauge.Set(f) |
48 | want = f | |
49 | want = append(want, f) | |
49 | 50 | } |
50 | 51 | |
51 | 52 | for i := 0; i < n; i++ { |
52 | 53 | f := float64(a[i]) |
53 | 54 | gauge.Add(f) |
54 | want += f | |
55 | want[len(want)-1] += f | |
55 | 56 | } |
56 | 57 | |
57 | if have := value(); want != have { | |
58 | if have := value(); reflect.DeepEqual(want, have) { | |
58 | 59 | return fmt.Errorf("want %f, have %f", want, have) |
59 | 60 | } |
60 | 61 |