Metrics - Cloudwatch v2 (#668)
* Enable using counters as the other types
* For completeness - gauge<->histogram
* Create metrics/cloudwatch2 pkg
* Address PR comments
* Comment around prebuilt zeros StatisticsSet
* Address PR comments
Nelz authored 6 years ago
Peter Bourgon committed 6 years ago
0 | // Package cloudwatch2 emits all data as a StatisticsSet (rather than | |
1 | // a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK. | |
2 | package cloudwatch2 | |
3 | ||
4 | import ( | |
5 | "math" | |
6 | "sync" | |
7 | "time" | |
8 | ||
9 | "github.com/aws/aws-sdk-go-v2/aws" | |
10 | "github.com/aws/aws-sdk-go-v2/service/cloudwatch" | |
11 | "github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" | |
12 | "golang.org/x/sync/errgroup" | |
13 | ||
14 | "github.com/go-kit/kit/log" | |
15 | "github.com/go-kit/kit/metrics" | |
16 | "github.com/go-kit/kit/metrics/internal/convert" | |
17 | "github.com/go-kit/kit/metrics/internal/lv" | |
18 | ) | |
19 | ||
20 | const ( | |
21 | maxConcurrentRequests = 20 | |
22 | ) | |
23 | ||
24 | // CloudWatch receives metrics observations and forwards them to CloudWatch. | |
25 | // Create a CloudWatch object, use it to create metrics, and pass those metrics as | |
26 | // dependencies to the components that will use them. | |
27 | // | |
28 | // To regularly report metrics to CloudWatch, use the WriteLoop helper method. | |
29 | type CloudWatch struct { | |
30 | mtx sync.RWMutex | |
31 | sem chan struct{} | |
32 | namespace string | |
33 | svc cloudwatchiface.CloudWatchAPI | |
34 | counters *lv.Space | |
35 | logger log.Logger | |
36 | numConcurrentRequests int | |
37 | } | |
38 | ||
39 | // Option is a function adapter to change config of the CloudWatch struct | |
40 | type Option func(*CloudWatch) | |
41 | ||
42 | // WithLogger sets the Logger that will recieve error messages generated | |
43 | // during the WriteLoop. By default, no logger is used. | |
44 | func WithLogger(logger log.Logger) Option { | |
45 | return func(cw *CloudWatch) { | |
46 | cw.logger = logger | |
47 | } | |
48 | } | |
49 | ||
50 | // WithConcurrentRequests sets the upper limit on how many | |
51 | // cloudwatch.PutMetricDataRequest may be under way at any | |
52 | // given time. If n is greater than 20, 20 is used. By default, | |
53 | // the max is set at 10 concurrent requests. | |
54 | func WithConcurrentRequests(n int) Option { | |
55 | return func(cw *CloudWatch) { | |
56 | if n > maxConcurrentRequests { | |
57 | n = maxConcurrentRequests | |
58 | } | |
59 | cw.numConcurrentRequests = n | |
60 | } | |
61 | } | |
62 | ||
63 | // New returns a CloudWatch object that may be used to create metrics. | |
64 | // Namespace is applied to all created metrics and maps to the CloudWatch namespace. | |
65 | // Callers must ensure that regular calls to Send are performed, either | |
66 | // manually or with one of the helper methods. | |
67 | func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...Option) *CloudWatch { | |
68 | cw := &CloudWatch{ | |
69 | namespace: namespace, | |
70 | svc: svc, | |
71 | counters: lv.NewSpace(), | |
72 | numConcurrentRequests: 10, | |
73 | logger: log.NewNopLogger(), | |
74 | } | |
75 | ||
76 | for _, optFunc := range options { | |
77 | optFunc(cw) | |
78 | } | |
79 | ||
80 | cw.sem = make(chan struct{}, cw.numConcurrentRequests) | |
81 | ||
82 | return cw | |
83 | } | |
84 | ||
85 | // NewCounter returns a counter. Observations are aggregated and emitted once | |
86 | // per write invocation. | |
87 | func (cw *CloudWatch) NewCounter(name string) metrics.Counter { | |
88 | return &Counter{ | |
89 | name: name, | |
90 | obs: cw.counters.Observe, | |
91 | } | |
92 | } | |
93 | ||
94 | // NewGauge returns an gauge. Under the covers, there is no distinctions | |
95 | // in CloudWatch for how Counters/Histograms/Gauges are reported, so this | |
96 | // just wraps a cloudwatch2.Counter. | |
97 | func (cw *CloudWatch) NewGauge(name string) metrics.Gauge { | |
98 | return convert.NewCounterAsGauge(cw.NewCounter(name)) | |
99 | } | |
100 | ||
101 | // NewHistogram returns a histogram. Under the covers, there is no distinctions | |
102 | // in CloudWatch for how Counters/Histograms/Gauges are reported, so this | |
103 | // just wraps a cloudwatch2.Counter. | |
104 | func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram { | |
105 | return convert.NewCounterAsHistogram(cw.NewCounter(name)) | |
106 | } | |
107 | ||
108 | // WriteLoop is a helper method that invokes Send every time the passed | |
109 | // channel fires. This method blocks until the channel is closed, so clients | |
110 | // probably want to run it in its own goroutine. For typical usage, create a | |
111 | // time.Ticker and pass its C channel to this method. | |
112 | func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { | |
113 | for range c { | |
114 | if err := cw.Send(); err != nil { | |
115 | cw.logger.Log("during", "Send", "err", err) | |
116 | } | |
117 | } | |
118 | } | |
119 | ||
120 | // Send will fire an API request to CloudWatch with the latest stats for | |
121 | // all metrics. It is preferred that the WriteLoop method is used. | |
122 | func (cw *CloudWatch) Send() error { | |
123 | cw.mtx.RLock() | |
124 | defer cw.mtx.RUnlock() | |
125 | now := time.Now() | |
126 | ||
127 | var datums []cloudwatch.MetricDatum | |
128 | ||
129 | cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { | |
130 | datums = append(datums, cloudwatch.MetricDatum{ | |
131 | MetricName: aws.String(name), | |
132 | Dimensions: makeDimensions(lvs...), | |
133 | StatisticValues: stats(values), | |
134 | Timestamp: aws.Time(now), | |
135 | }) | |
136 | return true | |
137 | }) | |
138 | ||
139 | var batches [][]cloudwatch.MetricDatum | |
140 | for len(datums) > 0 { | |
141 | var batch []cloudwatch.MetricDatum | |
142 | lim := len(datums) | |
143 | if lim > maxConcurrentRequests { | |
144 | lim = maxConcurrentRequests | |
145 | } | |
146 | batch, datums = datums[:lim], datums[lim:] | |
147 | batches = append(batches, batch) | |
148 | } | |
149 | ||
150 | var g errgroup.Group | |
151 | for _, batch := range batches { | |
152 | batch := batch | |
153 | g.Go(func() error { | |
154 | cw.sem <- struct{}{} | |
155 | defer func() { | |
156 | <-cw.sem | |
157 | }() | |
158 | req := cw.svc.PutMetricDataRequest(&cloudwatch.PutMetricDataInput{ | |
159 | Namespace: aws.String(cw.namespace), | |
160 | MetricData: batch, | |
161 | }) | |
162 | _, err := req.Send() | |
163 | return err | |
164 | }) | |
165 | } | |
166 | return g.Wait() | |
167 | } | |
168 | ||
169 | var zero = float64(0.0) | |
170 | ||
171 | // Just build this once to reduce construction costs whenever | |
172 | // someone does a Send with no aggregated values. | |
173 | var zeros = cloudwatch.StatisticSet{ | |
174 | Maximum: &zero, | |
175 | Minimum: &zero, | |
176 | Sum: &zero, | |
177 | SampleCount: &zero, | |
178 | } | |
179 | ||
180 | func stats(a []float64) *cloudwatch.StatisticSet { | |
181 | count := float64(len(a)) | |
182 | if count == 0 { | |
183 | return &zeros | |
184 | } | |
185 | ||
186 | var sum float64 | |
187 | var min = math.MaxFloat64 | |
188 | var max = math.MaxFloat64 * -1 | |
189 | for _, f := range a { | |
190 | sum += f | |
191 | if f < min { | |
192 | min = f | |
193 | } | |
194 | if f > max { | |
195 | max = f | |
196 | } | |
197 | } | |
198 | ||
199 | return &cloudwatch.StatisticSet{ | |
200 | Maximum: &max, | |
201 | Minimum: &min, | |
202 | Sum: &sum, | |
203 | SampleCount: &count, | |
204 | } | |
205 | } | |
206 | ||
207 | func makeDimensions(labelValues ...string) []cloudwatch.Dimension { | |
208 | dimensions := make([]cloudwatch.Dimension, len(labelValues)/2) | |
209 | for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 { | |
210 | dimensions[j] = cloudwatch.Dimension{ | |
211 | Name: aws.String(labelValues[i]), | |
212 | Value: aws.String(labelValues[i+1]), | |
213 | } | |
214 | } | |
215 | return dimensions | |
216 | } | |
217 | ||
218 | type observeFunc func(name string, lvs lv.LabelValues, value float64) | |
219 | ||
220 | // Counter is a counter. Observations are forwarded to a node | |
221 | // object, and aggregated per timeseries. | |
222 | type Counter struct { | |
223 | name string | |
224 | lvs lv.LabelValues | |
225 | obs observeFunc | |
226 | } | |
227 | ||
228 | // With implements metrics.Counter. | |
229 | func (c *Counter) With(labelValues ...string) metrics.Counter { | |
230 | return &Counter{ | |
231 | name: c.name, | |
232 | lvs: c.lvs.With(labelValues...), | |
233 | obs: c.obs, | |
234 | } | |
235 | } | |
236 | ||
237 | // Add implements metrics.Counter. | |
238 | func (c *Counter) Add(delta float64) { | |
239 | c.obs(c.name, c.lvs, delta) | |
240 | } |
0 | package cloudwatch2 | |
1 | ||
2 | import ( | |
3 | "strings" | |
4 | "testing" | |
5 | ||
6 | "github.com/aws/aws-sdk-go-v2/aws" | |
7 | "github.com/aws/aws-sdk-go-v2/service/cloudwatch" | |
8 | "github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" | |
9 | ) | |
10 | ||
11 | func TestStats(t *testing.T) { | |
12 | testCases := []struct { | |
13 | name string | |
14 | vals []float64 | |
15 | xMin float64 | |
16 | xMax float64 | |
17 | xSum float64 | |
18 | xCt float64 | |
19 | }{ | |
20 | { | |
21 | "empty", | |
22 | []float64{}, | |
23 | 0.0, | |
24 | 0.0, | |
25 | 0.0, | |
26 | 0.0, | |
27 | }, | |
28 | { | |
29 | "single", | |
30 | []float64{3.1416}, | |
31 | 3.1416, | |
32 | 3.1416, | |
33 | 3.1416, | |
34 | 1.0, | |
35 | }, | |
36 | { | |
37 | "double", | |
38 | []float64{1.0, 9.0}, | |
39 | 1.0, | |
40 | 9.0, | |
41 | 10.0, | |
42 | 2.0, | |
43 | }, | |
44 | { | |
45 | "multiple", | |
46 | []float64{5.0, 1.0, 9.0, 5.0}, | |
47 | 1.0, | |
48 | 9.0, | |
49 | 20.0, | |
50 | 4.0, | |
51 | }, | |
52 | } | |
53 | ||
54 | for _, tc := range testCases { | |
55 | t.Run(tc.name, func(t *testing.T) { | |
56 | s := stats(tc.vals) | |
57 | if tc.xMin != *s.Minimum { | |
58 | t.Errorf("expected [%f]: %f\n", tc.xMin, *s.Minimum) | |
59 | } | |
60 | if tc.xMax != *s.Maximum { | |
61 | t.Errorf("expected [%f]: %f\n", tc.xMax, *s.Maximum) | |
62 | } | |
63 | if tc.xSum != *s.Sum { | |
64 | t.Errorf("expected [%f]: %f\n", tc.xSum, *s.Sum) | |
65 | } | |
66 | if tc.xCt != *s.SampleCount { | |
67 | t.Errorf("expected [%f]: %f\n", tc.xCt, *s.SampleCount) | |
68 | } | |
69 | }) | |
70 | } | |
71 | } | |
72 | ||
73 | type mockCloudWatch struct { | |
74 | cloudwatchiface.CloudWatchAPI | |
75 | latestName string | |
76 | latestData []cloudwatch.MetricDatum | |
77 | } | |
78 | ||
79 | func (mcw *mockCloudWatch) PutMetricDataRequest(in *cloudwatch.PutMetricDataInput) cloudwatch.PutMetricDataRequest { | |
80 | mcw.latestName = *in.Namespace | |
81 | mcw.latestData = in.MetricData | |
82 | return cloudwatch.PutMetricDataRequest{ | |
83 | // To mock the V2 API, most of the functions spit | |
84 | // out structs that you need to call Send() on. | |
85 | // The non-intuitive thing is that to get the Send() to avoid actually | |
86 | // going across the wire, you just create a dumb aws.Request with either | |
87 | // aws.Request.Data defined (for succes) or with aws.Request.Error | |
88 | // to simulate an Error. | |
89 | Request: &aws.Request{Data: &cloudwatch.PutMetricDataOutput{}}, | |
90 | Input: in, | |
91 | } | |
92 | } | |
93 | ||
94 | func TestSend(t *testing.T) { | |
95 | ns := "example-namespace" | |
96 | svc := &mockCloudWatch{} | |
97 | cw := New(ns, svc) | |
98 | ||
99 | c := cw.NewCounter("c").With("charlie", "cat") | |
100 | h := cw.NewHistogram("h").With("hotel", "horse") | |
101 | g := cw.NewGauge("g").With("golf", "giraffe") | |
102 | ||
103 | c.Add(4.0) | |
104 | c.Add(5.0) | |
105 | c.Add(6.0) | |
106 | h.Observe(3.0) | |
107 | h.Observe(5.0) | |
108 | h.Observe(7.0) | |
109 | g.Set(2.0) | |
110 | g.Set(5.0) | |
111 | g.Set(8.0) | |
112 | ||
113 | err := cw.Send() | |
114 | if err != nil { | |
115 | t.Fatalf("unexpected: %v\n", err) | |
116 | } | |
117 | ||
118 | if ns != svc.latestName { | |
119 | t.Errorf("expected namespace %q; not %q\n", ns, svc.latestName) | |
120 | } | |
121 | ||
122 | if len(svc.latestData) != 3 { | |
123 | t.Errorf("expected 3 datums: %v\n", svc.latestData) | |
124 | } | |
125 | for _, datum := range svc.latestData { | |
126 | initial := *datum.MetricName | |
127 | if len(datum.Dimensions) != 1 { | |
128 | t.Errorf("expected 1 dimension: %v\n", datum) | |
129 | } | |
130 | if !strings.HasPrefix(*datum.Dimensions[0].Name, initial) { | |
131 | t.Errorf("expected %q in Name of %v\n", initial, datum.Dimensions) | |
132 | } | |
133 | if !strings.HasPrefix(*datum.Dimensions[0].Value, initial) { | |
134 | t.Errorf("expected %q in Value of %v\n", initial, datum.Dimensions) | |
135 | } | |
136 | if datum.StatisticValues == nil { | |
137 | t.Errorf("expected StatisticValues in %v\n", datum) | |
138 | } | |
139 | if *datum.StatisticValues.Sum != 15.0 { | |
140 | t.Errorf("expected 15.0 for Sum in %v\n", datum) | |
141 | } | |
142 | if *datum.StatisticValues.SampleCount != 3.0 { | |
143 | t.Errorf("expected 3.0 for SampleCount in %v\n", datum) | |
144 | } | |
145 | } | |
146 | } |
0 | // Package convert provides a way to use Counters, Histograms, or Gauges | |
1 | // as one of the other types | |
2 | package convert | |
3 | ||
4 | import "github.com/go-kit/kit/metrics" | |
5 | ||
6 | type counterHistogram struct { | |
7 | c metrics.Counter | |
8 | } | |
9 | ||
10 | // NewCounterAsHistogram returns a Histogram that actually writes the | |
11 | // value on an underlying Counter | |
12 | func NewCounterAsHistogram(c metrics.Counter) metrics.Histogram { | |
13 | return counterHistogram{c} | |
14 | } | |
15 | ||
16 | // With implements Histogram. | |
17 | func (ch counterHistogram) With(labelValues ...string) metrics.Histogram { | |
18 | return counterHistogram{ch.c.With(labelValues...)} | |
19 | } | |
20 | ||
21 | // Observe implements histogram. | |
22 | func (ch counterHistogram) Observe(value float64) { | |
23 | ch.c.Add(value) | |
24 | } | |
25 | ||
26 | type histogramCounter struct { | |
27 | h metrics.Histogram | |
28 | } | |
29 | ||
30 | // NewHistogramAsCounter returns a Counter that actually writes the | |
31 | // value on an underlying Histogram | |
32 | func NewHistogramAsCounter(h metrics.Histogram) metrics.Counter { | |
33 | return histogramCounter{h} | |
34 | } | |
35 | ||
36 | // With implements Counter. | |
37 | func (hc histogramCounter) With(labelValues ...string) metrics.Counter { | |
38 | return histogramCounter{hc.h.With(labelValues...)} | |
39 | } | |
40 | ||
41 | // Add implements Counter. | |
42 | func (hc histogramCounter) Add(delta float64) { | |
43 | hc.h.Observe(delta) | |
44 | } | |
45 | ||
46 | type counterGauge struct { | |
47 | c metrics.Counter | |
48 | } | |
49 | ||
50 | // NewCounterAsGauge returns a Gauge that actually writes the | |
51 | // value on an underlying Counter | |
52 | func NewCounterAsGauge(c metrics.Counter) metrics.Gauge { | |
53 | return counterGauge{c} | |
54 | } | |
55 | ||
56 | // With implements Gauge. | |
57 | func (cg counterGauge) With(labelValues ...string) metrics.Gauge { | |
58 | return counterGauge{cg.c.With(labelValues...)} | |
59 | } | |
60 | ||
61 | // Set implements Gauge. | |
62 | func (cg counterGauge) Set(value float64) { | |
63 | cg.c.Add(value) | |
64 | } | |
65 | ||
66 | // Add implements metrics.Gauge. | |
67 | func (cg counterGauge) Add(delta float64) { | |
68 | cg.c.Add(delta) | |
69 | } | |
70 | ||
71 | type gaugeCounter struct { | |
72 | g metrics.Gauge | |
73 | } | |
74 | ||
75 | // NewGaugeAsCounter returns a Counter that actually writes the | |
76 | // value on an underlying Gauge | |
77 | func NewGaugeAsCounter(g metrics.Gauge) metrics.Counter { | |
78 | return gaugeCounter{g} | |
79 | } | |
80 | ||
81 | // With implements Counter. | |
82 | func (gc gaugeCounter) With(labelValues ...string) metrics.Counter { | |
83 | return gaugeCounter{gc.g.With(labelValues...)} | |
84 | } | |
85 | ||
86 | // Add implements Counter. | |
87 | func (gc gaugeCounter) Add(delta float64) { | |
88 | gc.g.Set(delta) | |
89 | } | |
90 | ||
91 | type histogramGauge struct { | |
92 | h metrics.Histogram | |
93 | } | |
94 | ||
95 | // NewHistogramAsGauge returns a Gauge that actually writes the | |
96 | // value on an underlying Histogram | |
97 | func NewHistogramAsGauge(h metrics.Histogram) metrics.Gauge { | |
98 | return histogramGauge{h} | |
99 | } | |
100 | ||
101 | // With implements Gauge. | |
102 | func (hg histogramGauge) With(labelValues ...string) metrics.Gauge { | |
103 | return histogramGauge{hg.h.With(labelValues...)} | |
104 | } | |
105 | ||
106 | // Set implements Gauge. | |
107 | func (hg histogramGauge) Set(value float64) { | |
108 | hg.h.Observe(value) | |
109 | } | |
110 | ||
111 | // Add implements metrics.Gauge. | |
112 | func (hg histogramGauge) Add(delta float64) { | |
113 | hg.h.Observe(delta) | |
114 | } | |
115 | ||
116 | type gaugeHistogram struct { | |
117 | g metrics.Gauge | |
118 | } | |
119 | ||
120 | // NewGaugeAsHistogram returns a Histogram that actually writes the | |
121 | // value on an underlying Gauge | |
122 | func NewGaugeAsHistogram(g metrics.Gauge) metrics.Histogram { | |
123 | return gaugeHistogram{g} | |
124 | } | |
125 | ||
126 | // With implements Histogram. | |
127 | func (gh gaugeHistogram) With(labelValues ...string) metrics.Histogram { | |
128 | return gaugeHistogram{gh.g.With(labelValues...)} | |
129 | } | |
130 | ||
131 | // Observe implements histogram. | |
132 | func (gh gaugeHistogram) Observe(value float64) { | |
133 | gh.g.Set(value) | |
134 | } |
0 | package convert | |
1 | ||
2 | import ( | |
3 | "testing" | |
4 | ||
5 | "github.com/go-kit/kit/metrics/generic" | |
6 | "github.com/go-kit/kit/metrics/teststat" | |
7 | ) | |
8 | ||
9 | func TestCounterHistogramConversion(t *testing.T) { | |
10 | name := "my_counter" | |
11 | c := generic.NewCounter(name) | |
12 | h := NewCounterAsHistogram(c) | |
13 | top := NewHistogramAsCounter(h).With("label", "counter").(histogramCounter) | |
14 | mid := top.h.(counterHistogram) | |
15 | low := mid.c.(*generic.Counter) | |
16 | if want, have := name, low.Name; want != have { | |
17 | t.Errorf("Name: want %q, have %q", want, have) | |
18 | } | |
19 | value := func() float64 { return low.Value() } | |
20 | if err := teststat.TestCounter(top, value); err != nil { | |
21 | t.Fatal(err) | |
22 | } | |
23 | } | |
24 | ||
25 | func TestCounterGaugeConversion(t *testing.T) { | |
26 | name := "my_counter" | |
27 | c := generic.NewCounter(name) | |
28 | g := NewCounterAsGauge(c) | |
29 | top := NewGaugeAsCounter(g).With("label", "counter").(gaugeCounter) | |
30 | mid := top.g.(counterGauge) | |
31 | low := mid.c.(*generic.Counter) | |
32 | if want, have := name, low.Name; want != have { | |
33 | t.Errorf("Name: want %q, have %q", want, have) | |
34 | } | |
35 | value := func() float64 { return low.Value() } | |
36 | if err := teststat.TestCounter(top, value); err != nil { | |
37 | t.Fatal(err) | |
38 | } | |
39 | } | |
40 | ||
41 | func TestHistogramGaugeConversion(t *testing.T) { | |
42 | name := "my_histogram" | |
43 | h := generic.NewHistogram(name, 50) | |
44 | g := NewHistogramAsGauge(h) | |
45 | top := NewGaugeAsHistogram(g).With("label", "histogram").(gaugeHistogram) | |
46 | mid := top.g.(histogramGauge) | |
47 | low := mid.h.(*generic.Histogram) | |
48 | if want, have := name, low.Name; want != have { | |
49 | t.Errorf("Name: want %q, have %q", want, have) | |
50 | } | |
51 | quantiles := func() (float64, float64, float64, float64) { | |
52 | return low.Quantile(0.50), low.Quantile(0.90), low.Quantile(0.95), low.Quantile(0.99) | |
53 | } | |
54 | if err := teststat.TestHistogram(top, quantiles, 0.01); err != nil { | |
55 | t.Fatal(err) | |
56 | } | |
57 | } |