Codebase list golang-github-go-kit-kit / 196891a
Issue #529: replaced SetConcurrency() with new parameter in New(). Moved semaphore into the cw struct and use defer when using it. Fixed data partitioning logic and separate batch creation from goroutines launching. Improved tests. Alejandro Pedraza 6 years ago
2 changed file(s) with 49 addition(s) and 44 deletion(s). Raw diff Collapse all Expand all
1111 "github.com/go-kit/kit/log"
1212 "github.com/go-kit/kit/metrics"
1313 "github.com/go-kit/kit/metrics/generic"
14 )
15
16 const (
17 maxConcurrentRequests = 20
1418 )
1519
1620 // CloudWatch receives metrics observations and forwards them to CloudWatch.
2024 // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
2125 type CloudWatch struct {
2226 mtx sync.RWMutex
27 sem chan struct{}
2328 namespace string
2429 numConcurrentRequests int
2530 svc cloudwatchiface.CloudWatchAPI
3136
3237 // New returns a CloudWatch object that may be used to create metrics.
3338 // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
39 // NumConcurrent sets the number of simultaneous requests to Amazon.
40 // A good default value is 10 and the maximum is 20.
3441 // Callers must ensure that regular calls to Send are performed, either
3542 // manually or with one of the helper methods.
36 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch {
43 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
44 if numConcurrent > maxConcurrentRequests {
45 numConcurrent = maxConcurrentRequests
46 }
47
3748 return &CloudWatch{
49 sem: make(chan struct{}, numConcurrent),
3850 namespace: namespace,
39 numConcurrentRequests: 10,
51 numConcurrentRequests: numConcurrent,
4052 svc: svc,
4153 counters: map[string]*counter{},
4254 gauges: map[string]*gauge{},
4355 histograms: map[string]*histogram{},
4456 logger: logger,
4557 }
46 }
47
48 // SetConcurrency overrides the default number (10) of concurrent requests sent to CloudWatch.
49 // CloudWatch allows a maximum of 20 metrics to be sent per request, so when Send is called,
50 // we partition the metrics and concurrently call their API. This parameter sets maximum number
51 // of concurrent requests.
52 func (cw *CloudWatch) SetConcurrency(numConcurrentRequests int) *CloudWatch {
53 cw.numConcurrentRequests = numConcurrentRequests
54 return cw
5558 }
5659
5760 // NewCounter returns a counter. Observations are aggregated and emitted once
143146 }
144147 }
145148
146 var tokens = make(chan struct{}, cw.numConcurrentRequests)
147 var errors = make(chan error)
148 var n int
149
149 var batches [][]*cloudwatch.MetricDatum
150150 for len(datums) > 0 {
151151 var batch []*cloudwatch.MetricDatum
152 lim := min(len(datums), cw.numConcurrentRequests)
152 lim := min(len(datums), maxConcurrentRequests)
153153 batch, datums = datums[:lim], datums[lim:]
154 n++
154 batches = append(batches, batch)
155 }
156
157 var errors = make(chan error, len(batches))
158 for _, batch := range batches {
155159 go func(batch []*cloudwatch.MetricDatum) {
156 tokens <- struct{}{}
160 cw.sem <- struct{}{}
161 defer func() {
162 <-cw.sem
163 }()
157164 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
158165 Namespace: aws.String(cw.namespace),
159166 MetricData: batch,
160167 })
161 <-tokens
162168 errors <- err
163169 }(batch)
164170 }
165
166171 var firstErr error
167 for ; n > 0; n-- {
172 for i := 0; i < cap(errors); i++ {
168173 if err := <-errors; err != nil && firstErr != nil {
169174 firstErr = err
170175 }
22 import (
33 "errors"
44 "fmt"
5 "strconv"
56 "sync"
67 "testing"
78
6465 namespace, name := "abc", "def"
6566 label, value := "label", "value"
6667 svc := newMockCloudWatch()
67 cw := New(namespace, svc, log.NewNopLogger())
68 cw := New(namespace, svc, 10, log.NewNopLogger())
6869 counter := cw.NewCounter(name).With(label, value)
6970 valuef := func() float64 {
7071 err := cw.Send()
8586
8687 func TestCounterLowSendConcurrency(t *testing.T) {
8788 namespace := "abc"
88 names := []string{"name1", "name2", "name3", "name4", "name5", "name6"}
89 label, value := "label", "value"
89 var names, labels, values []string
90 for i := 1; i <= 45; i++ {
91 num := strconv.Itoa(i)
92 names = append(names, "name"+num)
93 labels = append(labels, "label"+num)
94 values = append(values, "value"+num)
95 }
9096 svc := newMockCloudWatch()
91 cw := New(namespace, svc, log.NewNopLogger())
92 cw = cw.SetConcurrency(2)
97 cw := New(namespace, svc, 2, log.NewNopLogger())
9398
9499 counters := make(map[string]metrics.Counter)
95 for _, name := range names {
96 counters[name] = cw.NewCounter(name).With(label, value)
100 var wants []float64
101 for i, name := range names {
102 counters[name] = cw.NewCounter(name).With(labels[i], values[i])
103 wants = append(wants, teststat.FillCounter(counters[name]))
97104 }
98105
99 valuef := func(name string) func() float64 {
100 return func() float64 {
101 err := cw.Send()
102 if err != nil {
103 t.Fatal(err)
104 }
105 svc.mtx.RLock()
106 defer svc.mtx.RUnlock()
107 return svc.valuesReceived[name]
108 }
106 err := cw.Send()
107 if err != nil {
108 t.Fatal(err)
109109 }
110110
111 for _, name := range names {
112 if err := teststat.TestCounter(counters[name], valuef(name)); err != nil {
113 t.Fatal(err)
111 for i, name := range names {
112 if svc.valuesReceived[name] != wants[i] {
113 t.Fatal("want %f, have %f", wants[i], svc.valuesReceived[name])
114114 }
115 if err := testDimensions(svc, name, label, value); err != nil {
115 if err := testDimensions(svc, name, labels[i], values[i]); err != nil {
116116 t.Fatal(err)
117117 }
118118 }
122122 namespace, name := "abc", "def"
123123 label, value := "label", "value"
124124 svc := newMockCloudWatch()
125 cw := New(namespace, svc, log.NewNopLogger())
125 cw := New(namespace, svc, 10, log.NewNopLogger())
126126 gauge := cw.NewGauge(name).With(label, value)
127127 valuef := func() float64 {
128128 err := cw.Send()
145145 namespace, name := "abc", "def"
146146 label, value := "label", "value"
147147 svc := newMockCloudWatch()
148 cw := New(namespace, svc, log.NewNopLogger())
148 cw := New(namespace, svc, 10, log.NewNopLogger())
149149 histogram := cw.NewHistogram(name, 50).With(label, value)
150150 n50 := fmt.Sprintf("%s_50", name)
151151 n90 := fmt.Sprintf("%s_90", name)