Codebase list golang-github-go-kit-kit / c66fcdb
Merge pull request #591 from feliksik/bug/cloudwatch-reset-counter Make sure Cloudwatch-metrics only send the *new* counter values. Peter Bourgon authored 6 years ago GitHub committed 6 years ago
2 changed file(s) with 232 addition(s) and 134 deletion(s). Raw diff Collapse all Expand all
11
22 import (
33 "fmt"
4 "os"
45 "sync"
56 "time"
67
1112 "github.com/go-kit/kit/log"
1213 "github.com/go-kit/kit/metrics"
1314 "github.com/go-kit/kit/metrics/generic"
15 "github.com/go-kit/kit/metrics/internal/lv"
1416 )
1517
1618 const (
1719 maxConcurrentRequests = 20
1820 )
21
22 type Percentiles []struct {
23 s string
24 f float64
25 }
1926
2027 // CloudWatch receives metrics observations and forwards them to CloudWatch.
2128 // Create a CloudWatch object, use it to create metrics, and pass those metrics as
2633 mtx sync.RWMutex
2734 sem chan struct{}
2835 namespace string
36 svc cloudwatchiface.CloudWatchAPI
37 counters *lv.Space
38 gauges *lv.Space
39 histograms *lv.Space
40 percentiles Percentiles
41 logger log.Logger
2942 numConcurrentRequests int
30 svc cloudwatchiface.CloudWatchAPI
31 counters map[string]*counter
32 gauges map[string]*gauge
33 histograms map[string]*histogram
34 logger log.Logger
43 }
44
45 type option func(*CloudWatch)
46
47 func (s *CloudWatch) apply(opt option) {
48 if opt != nil {
49 opt(s)
50 }
51 }
52
53 func WithLogger(logger log.Logger) option {
54 return func(c *CloudWatch) {
55 c.logger = logger
56 }
57 }
58
59 func WithPercentiles(p Percentiles) option {
60 return func(c *CloudWatch) {
61 validated := Percentiles{}
62 for _, entry := range p {
63 if entry.f < 0 || entry.f > 1 {
64 continue // illegal entry
65 }
66 validated = append(validated, entry)
67 }
68 c.percentiles = validated
69 }
70 }
71
72 func WithConcurrentRequests(n int) option {
73 return func(c *CloudWatch) {
74 if n > maxConcurrentRequests {
75 n = maxConcurrentRequests
76 }
77 c.numConcurrentRequests = n
78 }
3579 }
3680
3781 // New returns a CloudWatch object that may be used to create metrics.
3882 // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
39 // NumConcurrent sets the number of simultaneous requests to Amazon.
40 // A good default value is 10 and the maximum is 20.
4183 // Callers must ensure that regular calls to Send are performed, either
4284 // manually or with one of the helper methods.
43 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
44 if numConcurrent > maxConcurrentRequests {
45 numConcurrent = maxConcurrentRequests
46 }
47
48 return &CloudWatch{
49 sem: make(chan struct{}, numConcurrent),
85 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
86 cw := &CloudWatch{
87 sem: nil, // set below
5088 namespace: namespace,
51 numConcurrentRequests: numConcurrent,
52 svc: svc,
53 counters: map[string]*counter{},
54 gauges: map[string]*gauge{},
55 histograms: map[string]*histogram{},
56 logger: logger,
57 }
89 svc: svc,
90 counters: lv.NewSpace(),
91 gauges: lv.NewSpace(),
92 histograms: lv.NewSpace(),
93 numConcurrentRequests: 10,
94 logger: log.NewLogfmtLogger(os.Stderr),
95 percentiles: Percentiles{
96 {"50", 0.50},
97 {"90", 0.90},
98 {"95", 0.95},
99 {"99", 0.99},
100 },
101 }
102
103 for _, optFunc := range options {
104 optFunc(cw)
105 }
106
107 cw.sem = make(chan struct{}, cw.numConcurrentRequests)
108
109 return cw
58110 }
59111
60112 // NewCounter returns a counter. Observations are aggregated and emitted once
61113 // per write invocation.
62114 func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
63 cw.mtx.Lock()
64 defer cw.mtx.Unlock()
65 c := &counter{c: generic.NewCounter(name)}
66 cw.counters[name] = c
67 return c
68 }
69
70 // NewGauge returns a gauge. Observations are aggregated and emitted once per
71 // write invocation.
115 return &Counter{
116 name: name,
117 obs: cw.counters.Observe,
118 }
119 }
120
121 // NewGauge returns a gauge.
72122 func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
73 cw.mtx.Lock()
74 defer cw.mtx.Unlock()
75 g := &gauge{g: generic.NewGauge(name)}
76 cw.gauges[name] = g
77 return g
78 }
79
80 // NewHistogram returns a histogram. Observations are aggregated and emitted as
81 // per-quantile gauges, once per write invocation. 50 is a good default value
82 // for buckets.
83 func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram {
84 cw.mtx.Lock()
85 defer cw.mtx.Unlock()
86 h := &histogram{h: generic.NewHistogram(name, buckets)}
87 cw.histograms[name] = h
88 return h
123 return &Gauge{
124 name: name,
125 obs: cw.gauges.Observe,
126 add: cw.gauges.Add,
127 }
128 }
129
130 // NewHistogram returns a histogram.
131 func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
132 return &Histogram{
133 name: name,
134 obs: cw.histograms.Observe,
135 }
89136 }
90137
91138 // WriteLoop is a helper method that invokes Send every time the passed
109156
110157 var datums []*cloudwatch.MetricDatum
111158
112 for name, c := range cw.counters {
159 cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
160 value := sum(values)
113161 datums = append(datums, &cloudwatch.MetricDatum{
114162 MetricName: aws.String(name),
115 Dimensions: makeDimensions(c.c.LabelValues()...),
116 Value: aws.Float64(c.c.Value()),
163 Dimensions: makeDimensions(lvs...),
164 Value: aws.Float64(value),
117165 Timestamp: aws.Time(now),
118166 })
119 }
120
121 for name, g := range cw.gauges {
167 return true
168 })
169
170 cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
171 value := last(values)
122172 datums = append(datums, &cloudwatch.MetricDatum{
123173 MetricName: aws.String(name),
124 Dimensions: makeDimensions(g.g.LabelValues()...),
125 Value: aws.Float64(g.g.Value()),
174 Dimensions: makeDimensions(lvs...),
175 Value: aws.Float64(value),
126176 Timestamp: aws.Time(now),
127177 })
128 }
129
130 for name, h := range cw.histograms {
131 for _, p := range []struct {
132 s string
133 f float64
134 }{
135 {"50", 0.50},
136 {"90", 0.90},
137 {"95", 0.95},
138 {"99", 0.99},
139 } {
178 return true
179 })
180
181 cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
182 histogram := generic.NewHistogram(name, 50)
183
184 for _, v := range values {
185 histogram.Observe(v)
186 }
187
188 for _, p := range cw.percentiles {
189 value := histogram.Quantile(p.f)
140190 datums = append(datums, &cloudwatch.MetricDatum{
141191 MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)),
142 Dimensions: makeDimensions(h.h.LabelValues()...),
143 Value: aws.Float64(h.h.Quantile(p.f)),
192 Dimensions: makeDimensions(lvs...),
193 Value: aws.Float64(value),
144194 Timestamp: aws.Time(now),
145195 })
146196 }
147 }
197 return true
198 })
148199
149200 var batches [][]*cloudwatch.MetricDatum
150201 for len(datums) > 0 {
178229 return firstErr
179230 }
180231
// sum adds the elements of a from first to last and returns the total.
// An empty or nil slice yields 0.
func sum(a []float64) float64 {
	total := 0.0
	for i := 0; i < len(a); i++ {
		total += a[i]
	}
	return total
}
239
// last returns the final element of a. For an empty or nil slice it returns
// 0 instead of indexing out of range, so a caller that hands over no
// observations gets a zero value rather than a run-time panic.
func last(a []float64) float64 {
	if len(a) == 0 {
		return 0
	}
	return a[len(a)-1]
}
243
181244 func min(a, b int) int {
182245 if a < b {
183246 return a
185248 return b
186249 }
187250
188 // counter is a CloudWatch counter metric.
189 type counter struct {
190 c *generic.Counter
191 }
192
193 // With implements counter
194 func (c *counter) With(labelValues ...string) metrics.Counter {
195 c.c = c.c.With(labelValues...).(*generic.Counter)
196 return c
197 }
198
199 // Add implements counter.
200 func (c *counter) Add(delta float64) {
201 c.c.Add(delta)
202 }
203
204 // gauge is a CloudWatch gauge metric.
205 type gauge struct {
206 g *generic.Gauge
207 }
208
209 // With implements gauge
210 func (g *gauge) With(labelValues ...string) metrics.Gauge {
211 g.g = g.g.With(labelValues...).(*generic.Gauge)
212 return g
213 }
214
215 // Set implements gauge
216 func (g *gauge) Set(value float64) {
217 g.g.Set(value)
218 }
219
220 // Add implements gauge
221 func (g *gauge) Add(delta float64) {
222 g.g.Add(delta)
223 }
224
225 // histogram is a CloudWatch histogram metric
226 type histogram struct {
227 h *generic.Histogram
228 }
229
230 // With implements histogram
231 func (h *histogram) With(labelValues ...string) metrics.Histogram {
232 h.h = h.h.With(labelValues...).(*generic.Histogram)
233 return h
234 }
235
236 // Observe implements histogram
237 func (h *histogram) Observe(value float64) {
238 h.h.Observe(value)
251 type observeFunc func(name string, lvs lv.LabelValues, value float64)
252
253 // Counter is a counter. Observations are forwarded to a node
254 // object, and aggregated (summed) per timeseries.
255 type Counter struct {
256 name string
257 lvs lv.LabelValues
258 obs observeFunc
259 }
260
261 // With implements metrics.Counter.
262 func (c *Counter) With(labelValues ...string) metrics.Counter {
263 return &Counter{
264 name: c.name,
265 lvs: c.lvs.With(labelValues...),
266 obs: c.obs,
267 }
268 }
269
270 // Add implements metrics.Counter.
271 func (c *Counter) Add(delta float64) {
272 c.obs(c.name, c.lvs, delta)
273 }
274
275 // Gauge is a gauge. Observations are forwarded to a node
276 // object, and aggregated (the last observation selected) per timeseries.
277 type Gauge struct {
278 name string
279 lvs lv.LabelValues
280 obs observeFunc
281 add observeFunc
282 }
283
284 // With implements metrics.Gauge.
285 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
286 return &Gauge{
287 name: g.name,
288 lvs: g.lvs.With(labelValues...),
289 obs: g.obs,
290 add: g.add,
291 }
292 }
293
294 // Set implements metrics.Gauge.
295 func (g *Gauge) Set(value float64) {
296 g.obs(g.name, g.lvs, value)
297 }
298
299 // Add implements metrics.Gauge.
300 func (g *Gauge) Add(delta float64) {
301 g.add(g.name, g.lvs, delta)
302 }
303
304 // Histogram is an Influx histrogram. Observations are aggregated into a
305 // generic.Histogram and emitted as per-quantile gauges to the Influx server.
306 type Histogram struct {
307 name string
308 lvs lv.LabelValues
309 obs observeFunc
310 }
311
312 // With implements metrics.Histogram.
313 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
314 return &Histogram{
315 name: h.name,
316 lvs: h.lvs.With(labelValues...),
317 obs: h.obs,
318 }
319 }
320
321 // Observe implements metrics.Histogram.
322 func (h *Histogram) Observe(value float64) {
323 h.obs(h.name, h.lvs, value)
239324 }
240325
241326 func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
3838 return nil, nil
3939 }
4040
41 func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error {
42 dimensions, ok := svc.dimensionsReceived[name]
41 func (mcw *mockCloudWatch) testDimensions(name string, labelValues ...string) error {
42 mcw.mtx.RLock()
43 _, hasValue := mcw.valuesReceived[name]
44 if !hasValue {
45 return nil // nothing to check; 0 samples were received
46 }
47 dimensions, ok := mcw.dimensionsReceived[name]
48 mcw.mtx.RUnlock()
49
4350 if !ok {
4451 if len(labelValues) > 0 {
4552 return errors.New("Expected dimensions to be available, but none were")
6572 namespace, name := "abc", "def"
6673 label, value := "label", "value"
6774 svc := newMockCloudWatch()
68 cw := New(namespace, svc, 10, log.NewNopLogger())
75 cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
6976 counter := cw.NewCounter(name).With(label, value)
7077 valuef := func() float64 {
7178 err := cw.Send()
7986 if err := teststat.TestCounter(counter, valuef); err != nil {
8087 t.Fatal(err)
8188 }
82 if err := testDimensions(svc, name, label, value); err != nil {
89 if err := teststat.TestCounter(counter, valuef); err != nil {
90 t.Fatal("Fill and flush counter 2nd time: ", err)
91 }
92 if err := svc.testDimensions(name, label, value); err != nil {
8393 t.Fatal(err)
8494 }
8595 }
94104 values = append(values, "value"+num)
95105 }
96106 svc := newMockCloudWatch()
97 cw := New(namespace, svc, 2, log.NewNopLogger())
107 cw := New(namespace, svc,
108 WithLogger(log.NewNopLogger()),
109 WithConcurrentRequests(2),
110 )
98111
99112 counters := make(map[string]metrics.Counter)
100113 var wants []float64
112125 if svc.valuesReceived[name] != wants[i] {
113126 t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name])
114127 }
115 if err := testDimensions(svc, name, labels[i], values[i]); err != nil {
128 if err := svc.testDimensions(name, labels[i], values[i]); err != nil {
116129 t.Fatal(err)
117130 }
118131 }
122135 namespace, name := "abc", "def"
123136 label, value := "label", "value"
124137 svc := newMockCloudWatch()
125 cw := New(namespace, svc, 10, log.NewNopLogger())
138 cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
126139 gauge := cw.NewGauge(name).With(label, value)
127140 valuef := func() float64 {
128141 err := cw.Send()
136149 if err := teststat.TestGauge(gauge, valuef); err != nil {
137150 t.Fatal(err)
138151 }
139 if err := testDimensions(svc, name, label, value); err != nil {
152 if err := svc.testDimensions(name, label, value); err != nil {
140153 t.Fatal(err)
141154 }
142155 }
145158 namespace, name := "abc", "def"
146159 label, value := "label", "value"
147160 svc := newMockCloudWatch()
148 cw := New(namespace, svc, 10, log.NewNopLogger())
149 histogram := cw.NewHistogram(name, 50).With(label, value)
161 cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
162 histogram := cw.NewHistogram(name).With(label, value)
150163 n50 := fmt.Sprintf("%s_50", name)
151164 n90 := fmt.Sprintf("%s_90", name)
152165 n95 := fmt.Sprintf("%s_95", name)
167180 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
168181 t.Fatal(err)
169182 }
170 if err := testDimensions(svc, n50, label, value); err != nil {
183 if err := svc.testDimensions(n50, label, value); err != nil {
171184 t.Fatal(err)
172185 }
173 if err := testDimensions(svc, n90, label, value); err != nil {
186 if err := svc.testDimensions(n90, label, value); err != nil {
174187 t.Fatal(err)
175188 }
176 if err := testDimensions(svc, n95, label, value); err != nil {
189 if err := svc.testDimensions(n95, label, value); err != nil {
177190 t.Fatal(err)
178191 }
179 if err := testDimensions(svc, n99, label, value); err != nil {
192 if err := svc.testDimensions(n99, label, value); err != nil {
180193 t.Fatal(err)
181194 }
182195 }