Codebase list golang-github-go-kit-kit / 6cf957e
Merge branch 'master' into update-grpc-transport-readme David Sudia 6 years ago
9 changed file(s) with 118 addition(s) and 31 deletion(s). Raw diff Collapse all Expand all
1212
1313 Go has emerged as the language of the server, but it remains underrepresented
1414 in so-called "modern enterprise" companies like Facebook, Twitter, Netflix, and
15 SoundCloud. These organizations have largely adopted JVM-based stacks for their
16 business logic, owing in large part to libraries and ecosystems that directly
17 support their microservice architectures.
15 SoundCloud. Many of these organizations have turned to JVM-based stacks for
16 their business logic, owing in large part to libraries and ecosystems that
17 directly support their microservice architectures.
1818
1919 To reach its next level of success, Go needs more than simple primitives and
2020 idioms. It needs a comprehensive toolkit, for coherent distributed programming
0 package main
0 package main
11
22 import (
33 "context"
0 package main
0 package main
11
22 import (
33 "flag"
0 package log
0 package log
11
22 import (
33 "io"
0 package log_test
0 package log_test
11
22 import (
33 "bytes"
1111 "github.com/go-kit/kit/log"
1212 "github.com/go-kit/kit/metrics"
1313 "github.com/go-kit/kit/metrics/generic"
14 )
15
16 const (
17 maxConcurrentRequests = 20
1418 )
1519
1620 // CloudWatch receives metrics observations and forwards them to CloudWatch.
1923 //
2024 // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
2125 type CloudWatch struct {
22 mtx sync.RWMutex
23 namespace string
24 svc cloudwatchiface.CloudWatchAPI
25 counters map[string]*counter
26 gauges map[string]*gauge
27 histograms map[string]*histogram
28 logger log.Logger
26 mtx sync.RWMutex
27 sem chan struct{}
28 namespace string
29 numConcurrentRequests int
30 svc cloudwatchiface.CloudWatchAPI
31 counters map[string]*counter
32 gauges map[string]*gauge
33 histograms map[string]*histogram
34 logger log.Logger
2935 }
3036
3137 // New returns a CloudWatch object that may be used to create metrics.
3238 // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
39 // NumConcurrent sets the number of simultaneous requests to Amazon.
40 // A good default value is 10 and the maximum is 20.
3341 // Callers must ensure that regular calls to Send are performed, either
3442 // manually or with one of the helper methods.
35 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, logger log.Logger) *CloudWatch {
43 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, numConcurrent int, logger log.Logger) *CloudWatch {
44 if numConcurrent > maxConcurrentRequests {
45 numConcurrent = maxConcurrentRequests
46 }
47
3648 return &CloudWatch{
37 namespace: namespace,
49 sem: make(chan struct{}, numConcurrent),
50 namespace: namespace,
51 numConcurrentRequests: numConcurrent,
3852 svc: svc,
3953 counters: map[string]*counter{},
4054 gauges: map[string]*gauge{},
132146 }
133147 }
134148
135 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
136 Namespace: aws.String(cw.namespace),
137 MetricData: datums,
138 })
139 return err
149 var batches [][]*cloudwatch.MetricDatum
150 for len(datums) > 0 {
151 var batch []*cloudwatch.MetricDatum
152 lim := min(len(datums), maxConcurrentRequests)
153 batch, datums = datums[:lim], datums[lim:]
154 batches = append(batches, batch)
155 }
156
157 var errors = make(chan error, len(batches))
158 for _, batch := range batches {
159 go func(batch []*cloudwatch.MetricDatum) {
160 cw.sem <- struct{}{}
161 defer func() {
162 <-cw.sem
163 }()
164 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
165 Namespace: aws.String(cw.namespace),
166 MetricData: batch,
167 })
168 errors <- err
169 }(batch)
170 }
171 var firstErr error
172 for i := 0; i < cap(errors); i++ {
173 if err := <-errors; err != nil && firstErr != nil {
174 firstErr = err
175 }
176 }
177
178 return firstErr
179 }
180
181 func min(a, b int) int {
182 if a < b {
183 return a
184 }
185 return b
140186 }
141187
142188 // counter is a CloudWatch counter metric.
22 import (
33 "errors"
44 "fmt"
5 "strconv"
56 "sync"
67 "testing"
78
910 "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
1011
1112 "github.com/go-kit/kit/log"
13 "github.com/go-kit/kit/metrics"
1214 "github.com/go-kit/kit/metrics/teststat"
1315 )
1416
6365 namespace, name := "abc", "def"
6466 label, value := "label", "value"
6567 svc := newMockCloudWatch()
66 cw := New(namespace, svc, log.NewNopLogger())
68 cw := New(namespace, svc, 10, log.NewNopLogger())
6769 counter := cw.NewCounter(name).With(label, value)
6870 valuef := func() float64 {
6971 err := cw.Send()
8284 }
8385 }
8486
87 func TestCounterLowSendConcurrency(t *testing.T) {
88 namespace := "abc"
89 var names, labels, values []string
90 for i := 1; i <= 45; i++ {
91 num := strconv.Itoa(i)
92 names = append(names, "name"+num)
93 labels = append(labels, "label"+num)
94 values = append(values, "value"+num)
95 }
96 svc := newMockCloudWatch()
97 cw := New(namespace, svc, 2, log.NewNopLogger())
98
99 counters := make(map[string]metrics.Counter)
100 var wants []float64
101 for i, name := range names {
102 counters[name] = cw.NewCounter(name).With(labels[i], values[i])
103 wants = append(wants, teststat.FillCounter(counters[name]))
104 }
105
106 err := cw.Send()
107 if err != nil {
108 t.Fatal(err)
109 }
110
111 for i, name := range names {
112 if svc.valuesReceived[name] != wants[i] {
113 t.Fatal("want %f, have %f", wants[i], svc.valuesReceived[name])
114 }
115 if err := testDimensions(svc, name, labels[i], values[i]); err != nil {
116 t.Fatal(err)
117 }
118 }
119 }
120
85121 func TestGauge(t *testing.T) {
86122 namespace, name := "abc", "def"
87123 label, value := "label", "value"
88124 svc := newMockCloudWatch()
89 cw := New(namespace, svc, log.NewNopLogger())
125 cw := New(namespace, svc, 10, log.NewNopLogger())
90126 gauge := cw.NewGauge(name).With(label, value)
91127 valuef := func() float64 {
92128 err := cw.Send()
109145 namespace, name := "abc", "def"
110146 label, value := "label", "value"
111147 svc := newMockCloudWatch()
112 cw := New(namespace, svc, log.NewNopLogger())
148 cw := New(namespace, svc, 10, log.NewNopLogger())
113149 histogram := cw.NewHistogram(name, 50).With(label, value)
114150 n50 := fmt.Sprintf("%s_50", name)
115151 n90 := fmt.Sprintf("%s_90", name)
1313 // TestCounter puts some deltas through the counter, and then calls the value
1414 // func to check that the counter has the correct final value.
1515 func TestCounter(counter metrics.Counter, value func() float64) error {
16 want := FillCounter(counter)
17 if have := value(); want != have {
18 return fmt.Errorf("want %f, have %f", want, have)
19 }
20
21 return nil
22 }
23
24 // FillCounter puts some deltas through the counter and returns the total value.
25 func FillCounter(counter metrics.Counter) float64 {
1626 a := rand.Perm(100)
1727 n := rand.Intn(len(a))
1828
2232 counter.Add(f)
2333 want += f
2434 }
25
26 if have := value(); want != have {
27 return fmt.Errorf("want %f, have %f", want, have)
28 }
29
30 return nil
35 return want
3136 }
3237
3338 // TestGauge puts some values through the gauge, and then calls the value func
2020 logger log.Logger
2121 }
2222
23 // NewServer constructs a new server, which implements http.Server and wraps
23 // NewServer constructs a new server, which implements http.Handler and wraps
2424 // the provided endpoint.
2525 func NewServer(
2626 e endpoint.Endpoint,