diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 4bda287..0aad8a3 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -20,6 +20,7 @@ const ( maxConcurrentRequests = 20 + maxValuesInABatch = 150 ) type Percentiles []struct { @@ -174,13 +175,32 @@ }) cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { - value := last(values) - datums = append(datums, &cloudwatch.MetricDatum{ + datum := &cloudwatch.MetricDatum{ MetricName: aws.String(name), Dimensions: makeDimensions(lvs...), - Value: aws.Float64(value), Timestamp: aws.Time(now), - }) + } + + if len(values) == 0 { + return true + } + + // CloudWatch Put Metrics API (https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html) + // expects batch of unique values including the array of corresponding counts + valuesCounter := make(map[float64]int) + for _, v := range values { + valuesCounter[v]++ + } + + for value, count := range valuesCounter { + if len(datum.Values) == maxValuesInABatch { + break + } + datum.Values = append(datum.Values, aws.Float64(value)) + datum.Counts = append(datum.Counts, aws.Float64(float64(count))) + } + + datums = append(datums, datum) return true }) diff --git a/metrics/cloudwatch/cloudwatch_test.go b/metrics/cloudwatch/cloudwatch_test.go index 5c0c3c1..7496525 100644 --- a/metrics/cloudwatch/cloudwatch_test.go +++ b/metrics/cloudwatch/cloudwatch_test.go @@ -18,13 +18,13 @@ type mockCloudWatch struct { cloudwatchiface.CloudWatchAPI mtx sync.RWMutex - valuesReceived map[string]float64 + valuesReceived map[string][]float64 dimensionsReceived map[string][]*cloudwatch.Dimension } func newMockCloudWatch() *mockCloudWatch { return &mockCloudWatch{ - valuesReceived: map[string]float64{}, + valuesReceived: map[string][]float64{}, dimensionsReceived: map[string][]*cloudwatch.Dimension{}, } } @@ -33,7 +33,13 @@ mcw.mtx.Lock() defer mcw.mtx.Unlock() for _, datum := range input.MetricData { - mcw.valuesReceived[*datum.MetricName] = *datum.Value + if len(datum.Values) > 0 { + for _, v := range datum.Values { + mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *v) + } + } else { + mcw.valuesReceived[*datum.MetricName] = append(mcw.valuesReceived[*datum.MetricName], *datum.Value) + } mcw.dimensionsReceived[*datum.MetricName] = datum.Dimensions } return nil, nil @@ -76,13 +82,15 @@ cw := New(namespace, svc, WithLogger(log.NewNopLogger())) counter := cw.NewCounter(name).With(label, value) valuef := func() float64 { - err := cw.Send() - if err != nil { - t.Fatal(err) - } - svc.mtx.RLock() - defer svc.mtx.RUnlock() - return svc.valuesReceived[name] + if err := cw.Send(); err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + defer svc.mtx.RUnlock() + value := svc.valuesReceived[name][len(svc.valuesReceived[name])-1] + delete(svc.valuesReceived, name) + + return value } if err := teststat.TestCounter(counter, valuef); err != nil { t.Fatal(err) @@ -123,7 +131,13 @@ } for i, name := range names { - if svc.valuesReceived[name] != wants[i] { + if l := len(svc.valuesReceived[name]); l == 0 && wants[i] == 0 { + continue + } else if l != 1 { + t.Fatalf("one value expected, got %d", l) + } + + if svc.valuesReceived[name][0] != wants[i] { t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name]) } if err := svc.testDimensions(name, labels[i], values[i]); err != nil { @@ -138,15 +152,17 @@ svc := newMockCloudWatch() cw := New(namespace, svc, WithLogger(log.NewNopLogger())) gauge := cw.NewGauge(name).With(label, value) - valuef := func() float64 { - err := cw.Send() - if err != nil { - t.Fatal(err) - } - svc.mtx.RLock() - defer svc.mtx.RUnlock() - return svc.valuesReceived[name] - } + valuef := func() []float64 { + if err := cw.Send(); err != nil { + t.Fatal(err) + } + svc.mtx.RLock() + res := svc.valuesReceived[name] + delete(svc.valuesReceived, name) + defer svc.mtx.RUnlock() + return res + } + if err := teststat.TestGauge(gauge, valuef); err != nil { t.Fatal(err) } @@ -170,12 +186,28 @@ if err != nil { t.Fatal(err) } - svc.mtx.RLock() - defer svc.mtx.RUnlock() - p50 = svc.valuesReceived[n50] - p90 = svc.valuesReceived[n90] - p95 = svc.valuesReceived[n95] - p99 = svc.valuesReceived[n99] + + svc.mtx.RLock() + defer svc.mtx.RUnlock() + if len(svc.valuesReceived[n50]) > 0 { + p50 = svc.valuesReceived[n50][0] + delete(svc.valuesReceived, n50) + } + + if len(svc.valuesReceived[n90]) > 0 { + p90 = svc.valuesReceived[n90][0] + delete(svc.valuesReceived, n90) + } + + if len(svc.valuesReceived[n95]) > 0 { + p95 = svc.valuesReceived[n95][0] + delete(svc.valuesReceived, n95) + } + + if len(svc.valuesReceived[n99]) > 0 { + p99 = svc.valuesReceived[n99][0] + delete(svc.valuesReceived, n99) + } return } if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil { @@ -207,8 +239,14 @@ } svc.mtx.RLock() defer svc.mtx.RUnlock() - p50 = svc.valuesReceived[n50] - p90 = svc.valuesReceived[n90] + if len(svc.valuesReceived[n50]) > 0 { + p50 = svc.valuesReceived[n50][0] + delete(svc.valuesReceived, n50) + } + if len(svc.valuesReceived[n90]) > 0 { + p90 = svc.valuesReceived[n90][0] + delete(svc.valuesReceived, n90) + } // our teststat.TestHistogram wants us to give p95 and p99, // but with custom percentiles we don't have those. diff --git a/metrics/expvar/expvar_test.go b/metrics/expvar/expvar_test.go index 9af4ae0..838cde5 100644 --- a/metrics/expvar/expvar_test.go +++ b/metrics/expvar/expvar_test.go @@ -17,7 +17,7 @@ func TestGauge(t *testing.T) { gauge := NewGauge("expvar_gauge").With("label values", "not supported").(*Gauge) - value := func() float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return f } + value := func() []float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return []float64{f} } if err := teststat.TestGauge(gauge, value); err != nil { t.Fatal(err) } diff --git a/metrics/generic/generic_test.go b/metrics/generic/generic_test.go index de30f39..cb725e0 100644 --- a/metrics/generic/generic_test.go +++ b/metrics/generic/generic_test.go @@ -45,7 +45,7 @@ if want, have := name, gauge.Name; want != have { t.Errorf("Name: want %q, have %q", want, have) } - value := gauge.Value + value := func() []float64 { return []float64{gauge.Value()} } if err := teststat.TestGauge(gauge, value); err != nil { t.Fatal(err) } diff --git a/metrics/influx/influx_test.go b/metrics/influx/influx_test.go index 39462e3..62f867a 100644 --- a/metrics/influx/influx_test.go +++ b/metrics/influx/influx_test.go @@ -34,12 +34,12 @@ in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger()) re := regexp.MustCompile(`influx_gauge,foo=alpha value=([0-9\.]+) [0-9]+`) gauge := in.NewGauge("influx_gauge") - value := func() float64 { + value := func() []float64 { client := &bufWriter{} in.WriteTo(client) match := re.FindStringSubmatch(client.buf.String()) f, _ := strconv.ParseFloat(match[1], 64) - return f + return []float64{f} } if err := teststat.TestGauge(gauge, value); err != nil { t.Fatal(err) diff --git a/metrics/pcp/pcp_test.go b/metrics/pcp/pcp_test.go index ce847a1..eb5f543 100644 --- a/metrics/pcp/pcp_test.go +++ b/metrics/pcp/pcp_test.go @@ -40,7 +40,7 @@ gauge = gauge.With("label values", "not supported").(*Gauge) - value := func() float64 { f := gauge.g.Val(); return f } + value := func() []float64 { f := gauge.g.Val(); return []float64{f} } if err := teststat.TestGauge(gauge, value); err != nil { t.Fatal(err) } diff --git a/metrics/prometheus/prometheus_test.go b/metrics/prometheus/prometheus_test.go index b252ac6..deb1586 100644 --- a/metrics/prometheus/prometheus_test.go +++ b/metrics/prometheus/prometheus_test.go @@ -68,10 +68,10 @@ Help: "This is a different help string.", }, []string{"foo"}).With("foo", "bar") - value := func() float64 { + value := func() []float64 { matches := re.FindStringSubmatch(scrape()) f, _ := strconv.ParseFloat(matches[1], 64) - return f + return []float64{f} } if err := teststat.TestGauge(gauge, value); err != nil { diff --git a/metrics/teststat/buffers.go b/metrics/teststat/buffers.go index d04c52c..c738d79 100644 --- a/metrics/teststat/buffers.go +++ b/metrics/teststat/buffers.go @@ -23,10 +23,10 @@ // LastLine expects a regex whose first capture group can be parsed as a // float64. It will dump the WriterTo and parse each line, expecting to find a // match. It returns the final captured float. -func LastLine(w io.WriterTo, regex string) func() float64 { - return func() float64 { +func LastLine(w io.WriterTo, regex string) func() []float64 { + return func() []float64 { _, final := stats(w, regex, nil) - return final + return []float64{final} } } diff --git a/metrics/teststat/teststat.go b/metrics/teststat/teststat.go index 74019f5..be1d489 100644 --- a/metrics/teststat/teststat.go +++ b/metrics/teststat/teststat.go @@ -6,6 +6,7 @@ "fmt" "math" "math/rand" + "reflect" "strings" "github.com/go-kit/kit/metrics" @@ -38,24 +39,24 @@ // TestGauge puts some values through the gauge, and then calls the value func // to check that the gauge has the correct final value. -func TestGauge(gauge metrics.Gauge, value func() float64) error { +func TestGauge(gauge metrics.Gauge, value func() []float64) error { a := rand.Perm(100) n := rand.Intn(len(a)) - var want float64 + var want []float64 for i := 0; i < n; i++ { f := float64(a[i]) gauge.Set(f) - want = f + want = append(want, f) } for i := 0; i < n; i++ { f := float64(a[i]) gauge.Add(f) - want += f + want[len(want)-1] += f } - if have := value(); want != have { + if have := value(); reflect.DeepEqual(want, have) { return fmt.Errorf("want %f, have %f", want, have) }