Codebase list golang-github-go-kit-kit / 2a3c64e
Refactor cloudwatch: Reset().Walk() on every Send(), like influx impl does. Note there is a breaking API change, as the cloudwatch object now has optional parameters. Eric Feliksik 6 years ago
2 changed file(s) with 228 addition(s) and 133 deletion(s). Raw diff Collapse all Expand all
11
22 import (
33 "fmt"
4 "os"
45 "sync"
56 "time"
67
1112 "github.com/go-kit/kit/log"
1213 "github.com/go-kit/kit/metrics"
1314 "github.com/go-kit/kit/metrics/generic"
15 "github.com/go-kit/kit/metrics/internal/lv"
1416 )
1517
1618 const (
1719 maxConcurrentRequests = 20
1820 )
21
22 type Percentiles []struct {
23 s string
24 f float64
25 }
1926
2027 // CloudWatch receives metrics observations and forwards them to CloudWatch.
2128 // Create a CloudWatch object, use it to create metrics, and pass those metrics as
2330 //
2431 // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
2532 type CloudWatch struct {
26 mtx sync.RWMutex
27 sem chan struct{}
28 namespace string
33 mtx sync.RWMutex
34 sem chan struct{}
35 namespace string
36 svc cloudwatchiface.CloudWatchAPI
37 counters *lv.Space
38 gauges *lv.Space
39 histograms *lv.Space
40 *cwoptions
41 }
42
43 type cwoptions struct {
44 percentiles Percentiles
45 logger log.Logger
2946 numConcurrentRequests int
30 svc cloudwatchiface.CloudWatchAPI
31 counters map[string]*counter
32 gauges map[string]*gauge
33 histograms map[string]*histogram
34 logger log.Logger
47 }
48
49 type option func(*cwoptions)
50
51 func (s *cwoptions) apply(opt option) {
52 if opt != nil {
53 opt(s)
54 }
55 }
56
57 func WithLogger(logger log.Logger) option {
58 return func(o *cwoptions) {
59 o.logger = logger
60 }
61 }
62
63 func WithPercentiles(p Percentiles) option {
64 return func(o *cwoptions) {
65 validated := Percentiles{}
66 for _, entry := range p {
67 if entry.f < 0 || entry.f > 1 {
68 continue // illegal entry
69 }
70 validated = append(validated, entry)
71 }
72 o.percentiles = validated
73 }
74 }
75
76 func WithConcurrentRequests(n int) option {
77 return func(o *cwoptions) {
78 if n > maxConcurrentRequests {
79 n = maxConcurrentRequests
80 }
81 o.numConcurrentRequests = n
82 }
3583 }
3684
3785 // New returns a CloudWatch object that may be used to create metrics.
3886 // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
39 // NumConcurrent sets the number of simultaneous requests to Amazon.
40 // A good default value is 10 and the maximum is 20.
4187 // Callers must ensure that regular calls to Send are performed, either
4288 // manually or with one of the helper methods.
43 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
44 if numConcurrent > maxConcurrentRequests {
45 numConcurrent = maxConcurrentRequests
89 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...option) *CloudWatch {
90 useOptions := &cwoptions{
91 numConcurrentRequests: 10,
92 logger: log.NewLogfmtLogger(os.Stderr),
93 percentiles: Percentiles{
94 {"50", 0.50},
95 {"90", 0.90},
96 {"95", 0.95},
97 {"99", 0.99},
98 },
99 }
100
101 for _, opt := range options {
102 useOptions.apply(opt)
46103 }
47104
48105 return &CloudWatch{
49 sem: make(chan struct{}, numConcurrent),
50 namespace: namespace,
51 numConcurrentRequests: numConcurrent,
106 sem: make(chan struct{}, useOptions.numConcurrentRequests),
107 namespace: namespace,
52108 svc: svc,
53 counters: map[string]*counter{},
54 gauges: map[string]*gauge{},
55 histograms: map[string]*histogram{},
56 logger: logger,
109 counters: lv.NewSpace(),
110 gauges: lv.NewSpace(),
111 histograms: lv.NewSpace(),
112 cwoptions: useOptions,
57113 }
58114 }
59115
60116 // NewCounter returns a counter. Observations are aggregated and emitted once
61117 // per write invocation.
62118 func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
63 cw.mtx.Lock()
64 defer cw.mtx.Unlock()
65 c := &counter{c: generic.NewCounter(name)}
66 cw.counters[name] = c
67 return c
68 }
69
70 // NewGauge returns a gauge. Observations are aggregated and emitted once per
71 // write invocation.
119 return &Counter{
120 name: name,
121 obs: cw.counters.Observe,
122 }
123 }
124
125 // NewGauge returns an gauge.
72126 func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
73 cw.mtx.Lock()
74 defer cw.mtx.Unlock()
75 g := &gauge{g: generic.NewGauge(name)}
76 cw.gauges[name] = g
77 return g
78 }
79
80 // NewHistogram returns a histogram. Observations are aggregated and emitted as
81 // per-quantile gauges, once per write invocation. 50 is a good default value
82 // for buckets.
83 func (cw *CloudWatch) NewHistogram(name string, buckets int) metrics.Histogram {
84 cw.mtx.Lock()
85 defer cw.mtx.Unlock()
86 h := &histogram{h: generic.NewHistogram(name, buckets)}
87 cw.histograms[name] = h
88 return h
127 return &Gauge{
128 name: name,
129 obs: cw.gauges.Observe,
130 add: cw.gauges.Add,
131 }
132 }
133
134 // NewHistogram returns a histogram.
135 func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
136 return &Histogram{
137 name: name,
138 obs: cw.histograms.Observe,
139 }
89140 }
90141
91142 // WriteLoop is a helper method that invokes Send every time the passed
109160
110161 var datums []*cloudwatch.MetricDatum
111162
112 for name, c := range cw.counters {
163 cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
164 value := sum(values)
113165 datums = append(datums, &cloudwatch.MetricDatum{
114166 MetricName: aws.String(name),
115 Dimensions: makeDimensions(c.c.LabelValues()...),
116 Value: aws.Float64(c.c.Value()),
167 Dimensions: makeDimensions(lvs...),
168 Value: aws.Float64(value),
117169 Timestamp: aws.Time(now),
118170 })
119 }
120
121 for name, g := range cw.gauges {
171 return true
172 })
173
174 cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
175 value := last(values)
122176 datums = append(datums, &cloudwatch.MetricDatum{
123177 MetricName: aws.String(name),
124 Dimensions: makeDimensions(g.g.LabelValues()...),
125 Value: aws.Float64(g.g.Value()),
178 Dimensions: makeDimensions(lvs...),
179 Value: aws.Float64(value),
126180 Timestamp: aws.Time(now),
127181 })
128 }
129
130 for name, h := range cw.histograms {
131 for _, p := range []struct {
132 s string
133 f float64
134 }{
135 {"50", 0.50},
136 {"90", 0.90},
137 {"95", 0.95},
138 {"99", 0.99},
139 } {
182 return true
183 })
184
185 cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
186 histogram := generic.NewHistogram(name, 50)
187
188 for _, v := range values {
189 histogram.Observe(v)
190 }
191
192 for _, p := range cw.percentiles {
193 value := histogram.Quantile(p.f)
140194 datums = append(datums, &cloudwatch.MetricDatum{
141195 MetricName: aws.String(fmt.Sprintf("%s_%s", name, p.s)),
142 Dimensions: makeDimensions(h.h.LabelValues()...),
143 Value: aws.Float64(h.h.Quantile(p.f)),
196 Dimensions: makeDimensions(lvs...),
197 Value: aws.Float64(value),
144198 Timestamp: aws.Time(now),
145199 })
146200 }
147 }
201 return true
202 })
148203
149204 var batches [][]*cloudwatch.MetricDatum
150205 for len(datums) > 0 {
178233 return firstErr
179234 }
180235
236 func sum(a []float64) float64 {
237 var v float64
238 for _, f := range a {
239 v += f
240 }
241 return v
242 }
243
244 func last(a []float64) float64 {
245 return a[len(a)-1]
246 }
247
181248 func min(a, b int) int {
182249 if a < b {
183250 return a
185252 return b
186253 }
187254
188 // counter is a CloudWatch counter metric.
189 type counter struct {
190 c *generic.Counter
191 }
192
193 // With implements counter
194 func (c *counter) With(labelValues ...string) metrics.Counter {
195 c.c = c.c.With(labelValues...).(*generic.Counter)
196 return c
197 }
198
199 // Add implements counter.
200 func (c *counter) Add(delta float64) {
201 c.c.Add(delta)
202 }
203
204 // gauge is a CloudWatch gauge metric.
205 type gauge struct {
206 g *generic.Gauge
207 }
208
209 // With implements gauge
210 func (g *gauge) With(labelValues ...string) metrics.Gauge {
211 g.g = g.g.With(labelValues...).(*generic.Gauge)
212 return g
213 }
214
215 // Set implements gauge
216 func (g *gauge) Set(value float64) {
217 g.g.Set(value)
218 }
219
220 // Add implements gauge
221 func (g *gauge) Add(delta float64) {
222 g.g.Add(delta)
223 }
224
225 // histogram is a CloudWatch histogram metric
226 type histogram struct {
227 h *generic.Histogram
228 }
229
230 // With implements histogram
231 func (h *histogram) With(labelValues ...string) metrics.Histogram {
232 h.h = h.h.With(labelValues...).(*generic.Histogram)
233 return h
234 }
235
236 // Observe implements histogram
237 func (h *histogram) Observe(value float64) {
238 h.h.Observe(value)
255 type observeFunc func(name string, lvs lv.LabelValues, value float64)
256
257 // Counter is a counter. Observations are forwarded to a node
258 // object, and aggregated (summed) per timeseries.
259 type Counter struct {
260 name string
261 lvs lv.LabelValues
262 obs observeFunc
263 }
264
265 // With implements metrics.Counter.
266 func (c *Counter) With(labelValues ...string) metrics.Counter {
267 return &Counter{
268 name: c.name,
269 lvs: c.lvs.With(labelValues...),
270 obs: c.obs,
271 }
272 }
273
274 // Add implements metrics.Counter.
275 func (c *Counter) Add(delta float64) {
276 c.obs(c.name, c.lvs, delta)
277 }
278
279 // Gauge is a gauge. Observations are forwarded to a node
280 // object, and aggregated (the last observation selected) per timeseries.
281 type Gauge struct {
282 name string
283 lvs lv.LabelValues
284 obs observeFunc
285 add observeFunc
286 }
287
288 // With implements metrics.Gauge.
289 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
290 return &Gauge{
291 name: g.name,
292 lvs: g.lvs.With(labelValues...),
293 obs: g.obs,
294 add: g.add,
295 }
296 }
297
298 // Set implements metrics.Gauge.
299 func (g *Gauge) Set(value float64) {
300 g.obs(g.name, g.lvs, value)
301 }
302
303 // Add implements metrics.Gauge.
304 func (g *Gauge) Add(delta float64) {
305 g.add(g.name, g.lvs, delta)
306 }
307
308 // Histogram is an Influx histrogram. Observations are aggregated into a
309 // generic.Histogram and emitted as per-quantile gauges to the Influx server.
310 type Histogram struct {
311 name string
312 lvs lv.LabelValues
313 obs observeFunc
314 }
315
316 // With implements metrics.Histogram.
317 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
318 return &Histogram{
319 name: h.name,
320 lvs: h.lvs.With(labelValues...),
321 obs: h.obs,
322 }
323 }
324
325 // Observe implements metrics.Histogram.
326 func (h *Histogram) Observe(value float64) {
327 h.obs(h.name, h.lvs, value)
239328 }
240329
241330 func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
3838 return nil, nil
3939 }
4040
41 func testDimensions(svc *mockCloudWatch, name string, labelValues ...string) error {
42 dimensions, ok := svc.dimensionsReceived[name]
41 func (mcw *mockCloudWatch) testDimensions(name string, labelValues ...string) error {
42 mcw.mtx.RLock()
43 dimensions, ok := mcw.dimensionsReceived[name]
44 mcw.mtx.RUnlock()
45
4346 if !ok {
4447 if len(labelValues) > 0 {
4548 return errors.New("Expected dimensions to be available, but none were")
6568 namespace, name := "abc", "def"
6669 label, value := "label", "value"
6770 svc := newMockCloudWatch()
68 cw := New(namespace, svc, 10, log.NewNopLogger())
71 cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
6972 counter := cw.NewCounter(name).With(label, value)
7073 valuef := func() float64 {
7174 err := cw.Send()
8285 if err := teststat.TestCounter(counter, valuef); err != nil {
8386 t.Fatal("Fill and flush counter 2nd time: ", err)
8487 }
85 if err := testDimensions(svc, name, label, value); err != nil {
88 if err := svc.testDimensions(name, label, value); err != nil {
8689 t.Fatal(err)
8790 }
8891 }
97100 values = append(values, "value"+num)
98101 }
99102 svc := newMockCloudWatch()
100 cw := New(namespace, svc, 2, log.NewNopLogger())
103 cw := New(namespace, svc,
104 WithLogger(log.NewNopLogger()),
105 WithConcurrentRequests(2),
106 )
101107
102108 counters := make(map[string]metrics.Counter)
103109 var wants []float64
115121 if svc.valuesReceived[name] != wants[i] {
116122 t.Fatalf("want %f, have %f", wants[i], svc.valuesReceived[name])
117123 }
118 if err := testDimensions(svc, name, labels[i], values[i]); err != nil {
124 if err := svc.testDimensions(name, labels[i], values[i]); err != nil {
119125 t.Fatal(err)
120126 }
121127 }
125131 namespace, name := "abc", "def"
126132 label, value := "label", "value"
127133 svc := newMockCloudWatch()
128 cw := New(namespace, svc, 10, log.NewNopLogger())
134 cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
129135 gauge := cw.NewGauge(name).With(label, value)
130136 valuef := func() float64 {
131137 err := cw.Send()
139145 if err := teststat.TestGauge(gauge, valuef); err != nil {
140146 t.Fatal(err)
141147 }
142 if err := testDimensions(svc, name, label, value); err != nil {
148 if err := svc.testDimensions(name, label, value); err != nil {
143149 t.Fatal(err)
144150 }
145151 }
148154 namespace, name := "abc", "def"
149155 label, value := "label", "value"
150156 svc := newMockCloudWatch()
151 cw := New(namespace, svc, 10, log.NewNopLogger())
152 histogram := cw.NewHistogram(name, 50).With(label, value)
157 cw := New(namespace, svc, WithLogger(log.NewNopLogger()))
158 histogram := cw.NewHistogram(name).With(label, value)
153159 n50 := fmt.Sprintf("%s_50", name)
154160 n90 := fmt.Sprintf("%s_90", name)
155161 n95 := fmt.Sprintf("%s_95", name)
170176 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
171177 t.Fatal(err)
172178 }
173 if err := testDimensions(svc, n50, label, value); err != nil {
179 if err := svc.testDimensions(n50, label, value); err != nil {
174180 t.Fatal(err)
175181 }
176 if err := testDimensions(svc, n90, label, value); err != nil {
182 if err := svc.testDimensions(n90, label, value); err != nil {
177183 t.Fatal(err)
178184 }
179 if err := testDimensions(svc, n95, label, value); err != nil {
185 if err := svc.testDimensions(n95, label, value); err != nil {
180186 t.Fatal(err)
181187 }
182 if err := testDimensions(svc, n99, label, value); err != nil {
188 if err := svc.testDimensions(n99, label, value); err != nil {
183189 t.Fatal(err)
184190 }
185191 }