Package list golang-github-go-kit-kit / dd92366
Merge pull request #313 from go-kit/new-metrics Refactor package metrics Peter Bourgon authored 5 years ago GitHub committed 5 years ago
73 changed file(s) with 3377 addition(s) and 4065 deletion(s). Raw diff Collapse all Expand all
33
44 go:
55 - 1.5.3
6 - 1.6
7 #- tip
6 - 1.6.3
7 - 1.7
8 - tip
99 "os/signal"
1010 "strings"
1111 "syscall"
12 "time"
1312
1413 "github.com/apache/thrift/lib/go/thrift"
1514 lightstep "github.com/lightstep/lightstep-tracer-go"
6059 var ints, chars metrics.Counter
6160 {
6261 // Business level metrics.
63 ints = prometheus.NewCounter(stdprometheus.CounterOpts{
62 ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
6463 Namespace: "addsvc",
6564 Name: "integers_summed",
6665 Help: "Total count of integers summed via the Sum method.",
6766 }, []string{})
68 chars = prometheus.NewCounter(stdprometheus.CounterOpts{
67 chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
6968 Namespace: "addsvc",
7069 Name: "characters_concatenated",
7170 Help: "Total count of characters concatenated via the Concat method.",
7271 }, []string{})
7372 }
74 var duration metrics.TimeHistogram
73 var duration metrics.Histogram
7574 {
7675 // Transport level metrics.
77 duration = metrics.NewTimeHistogram(time.Nanosecond, prometheus.NewSummary(stdprometheus.SummaryOpts{
76 duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
7877 Namespace: "addsvc",
7978 Name: "request_duration_ns",
8079 Help: "Request duration in nanoseconds.",
81 }, []string{"method", "success"}))
80 }, []string{"method", "success"})
8281 }
8382
8483 // Tracing domain.
131130 // Endpoint domain.
132131 var sumEndpoint endpoint.Endpoint
133132 {
134 sumDuration := duration.With(metrics.Field{Key: "method", Value: "Sum"})
133 sumDuration := duration.With("method", "Sum")
135134 sumLogger := log.NewContext(logger).With("method", "Sum")
136135
137136 sumEndpoint = addsvc.MakeSumEndpoint(service)
141140 }
142141 var concatEndpoint endpoint.Endpoint
143142 {
144 concatDuration := duration.With(metrics.Field{Key: "method", Value: "Concat"})
143 concatDuration := duration.With("method", "Concat")
145144 concatLogger := log.NewContext(logger).With("method", "Concat")
146145
147146 concatEndpoint = addsvc.MakeConcatEndpoint(service)
8787 // the duration of each invocation to the passed histogram. The middleware adds
8888 // a single field: "success", which is "true" if no error is returned, and
8989 // "false" otherwise.
90 func EndpointInstrumentingMiddleware(duration metrics.TimeHistogram) endpoint.Middleware {
90 func EndpointInstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware {
9191 return func(next endpoint.Endpoint) endpoint.Endpoint {
9292 return func(ctx context.Context, request interface{}) (response interface{}, err error) {
9393
9494 defer func(begin time.Time) {
95 f := metrics.Field{Key: "success", Value: fmt.Sprint(err == nil)}
96 duration.With(f).Observe(time.Since(begin))
95 duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds())
9796 }(time.Now())
9897 return next(ctx, request)
9998
152152
153153 func (mw serviceInstrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) {
154154 v, err := mw.next.Sum(ctx, a, b)
155 mw.ints.Add(uint64(v))
155 mw.ints.Add(float64(v))
156156 return v, err
157157 }
158158
159159 func (mw serviceInstrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) {
160160 v, err := mw.next.Concat(ctx, a, b)
161 mw.chars.Add(uint64(len(v)))
161 mw.chars.Add(float64(len(v)))
162162 return v, err
163163 }
1010
1111 type instrumentingService struct {
1212 requestCount metrics.Counter
13 requestLatency metrics.TimeHistogram
13 requestLatency metrics.Histogram
1414 Service
1515 }
1616
1717 // NewInstrumentingService returns an instance of an instrumenting Service.
18 func NewInstrumentingService(requestCount metrics.Counter, requestLatency metrics.TimeHistogram, s Service) Service {
18 func NewInstrumentingService(requestCount metrics.Counter, requestLatency metrics.Histogram, s Service) Service {
1919 return &instrumentingService{
2020 requestCount: requestCount,
2121 requestLatency: requestLatency,
2525
2626 func (s *instrumentingService) BookNewCargo(origin, destination location.UNLocode, arrivalDeadline time.Time) (cargo.TrackingID, error) {
2727 defer func(begin time.Time) {
28 methodField := metrics.Field{Key: "method", Value: "book"}
29 s.requestCount.With(methodField).Add(1)
30 s.requestLatency.With(methodField).Observe(time.Since(begin))
28 s.requestCount.With("method", "book").Add(1)
29 s.requestLatency.With("method", "book").Observe(time.Since(begin).Seconds())
3130 }(time.Now())
3231
3332 return s.Service.BookNewCargo(origin, destination, arrivalDeadline)
3534
3635 func (s *instrumentingService) LoadCargo(id cargo.TrackingID) (c Cargo, err error) {
3736 defer func(begin time.Time) {
38 methodField := metrics.Field{Key: "method", Value: "load"}
39 s.requestCount.With(methodField).Add(1)
40 s.requestLatency.With(methodField).Observe(time.Since(begin))
37 s.requestCount.With("method", "load").Add(1)
38 s.requestLatency.With("method", "load").Observe(time.Since(begin).Seconds())
4139 }(time.Now())
4240
4341 return s.Service.LoadCargo(id)
4543
4644 func (s *instrumentingService) RequestPossibleRoutesForCargo(id cargo.TrackingID) []cargo.Itinerary {
4745 defer func(begin time.Time) {
48 methodField := metrics.Field{Key: "method", Value: "request_routes"}
49 s.requestCount.With(methodField).Add(1)
50 s.requestLatency.With(methodField).Observe(time.Since(begin))
46 s.requestCount.With("method", "request_routes").Add(1)
47 s.requestLatency.With("method", "request_routes").Observe(time.Since(begin).Seconds())
5148 }(time.Now())
5249
5350 return s.Service.RequestPossibleRoutesForCargo(id)
5552
5653 func (s *instrumentingService) AssignCargoToRoute(id cargo.TrackingID, itinerary cargo.Itinerary) (err error) {
5754 defer func(begin time.Time) {
58 methodField := metrics.Field{Key: "method", Value: "assign_to_route"}
59 s.requestCount.With(methodField).Add(1)
60 s.requestLatency.With(methodField).Observe(time.Since(begin))
55 s.requestCount.With("method", "assign_to_route").Add(1)
56 s.requestLatency.With("method", "assign_to_route").Observe(time.Since(begin).Seconds())
6157 }(time.Now())
6258
6359 return s.Service.AssignCargoToRoute(id, itinerary)
6561
6662 func (s *instrumentingService) ChangeDestination(id cargo.TrackingID, l location.UNLocode) (err error) {
6763 defer func(begin time.Time) {
68 methodField := metrics.Field{Key: "method", Value: "change_destination"}
69 s.requestCount.With(methodField).Add(1)
70 s.requestLatency.With(methodField).Observe(time.Since(begin))
64 s.requestCount.With("method", "change_destination").Add(1)
65 s.requestLatency.With("method", "change_destination").Observe(time.Since(begin).Seconds())
7166 }(time.Now())
7267
7368 return s.Service.ChangeDestination(id, l)
7570
7671 func (s *instrumentingService) Cargos() []Cargo {
7772 defer func(begin time.Time) {
78 methodField := metrics.Field{Key: "method", Value: "list_cargos"}
79 s.requestCount.With(methodField).Add(1)
80 s.requestLatency.With(methodField).Observe(time.Since(begin))
73 s.requestCount.With("method", "list_cargos").Add(1)
74 s.requestLatency.With("method", "list_cargos").Observe(time.Since(begin).Seconds())
8175 }(time.Now())
8276
8377 return s.Service.Cargos()
8579
8680 func (s *instrumentingService) Locations() []Location {
8781 defer func(begin time.Time) {
88 methodField := metrics.Field{Key: "method", Value: "list_locations"}
89 s.requestCount.With(methodField).Add(1)
90 s.requestLatency.With(methodField).Observe(time.Since(begin))
82 s.requestCount.With("method", "list_locations").Add(1)
83 s.requestLatency.With("method", "list_locations").Observe(time.Since(begin).Seconds())
9184 }(time.Now())
9285
9386 return s.Service.Locations()
1111
1212 type instrumentingService struct {
1313 requestCount metrics.Counter
14 requestLatency metrics.TimeHistogram
14 requestLatency metrics.Histogram
1515 Service
1616 }
1717
1818 // NewInstrumentingService returns an instance of an instrumenting Service.
19 func NewInstrumentingService(requestCount metrics.Counter, requestLatency metrics.TimeHistogram, s Service) Service {
19 func NewInstrumentingService(requestCount metrics.Counter, requestLatency metrics.Histogram, s Service) Service {
2020 return &instrumentingService{
2121 requestCount: requestCount,
2222 requestLatency: requestLatency,
2828 loc location.UNLocode, eventType cargo.HandlingEventType) error {
2929
3030 defer func(begin time.Time) {
31 methodField := metrics.Field{Key: "method", Value: "register_incident"}
32 s.requestCount.With(methodField).Add(1)
33 s.requestLatency.With(methodField).Observe(time.Since(begin))
31 s.requestCount.With("method", "register_incident").Add(1)
32 s.requestLatency.With("method", "register_incident").Observe(time.Since(begin).Seconds())
3433 }(time.Now())
3534
3635 return s.Service.RegisterHandlingEvent(completionTime, trackingID, voyage, loc, eventType)
1313 "golang.org/x/net/context"
1414
1515 "github.com/go-kit/kit/log"
16 "github.com/go-kit/kit/metrics"
1716 kitprometheus "github.com/go-kit/kit/metrics/prometheus"
1817
1918 "github.com/go-kit/kit/examples/shipping/booking"
8079 bs = booking.NewService(cargos, locations, handlingEvents, rs)
8180 bs = booking.NewLoggingService(log.NewContext(logger).With("component", "booking"), bs)
8281 bs = booking.NewInstrumentingService(
83 kitprometheus.NewCounter(stdprometheus.CounterOpts{
82 kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
8483 Namespace: "api",
8584 Subsystem: "booking_service",
8685 Name: "request_count",
8786 Help: "Number of requests received.",
8887 }, fieldKeys),
89 metrics.NewTimeHistogram(time.Microsecond, kitprometheus.NewSummary(stdprometheus.SummaryOpts{
88 kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
9089 Namespace: "api",
9190 Subsystem: "booking_service",
9291 Name: "request_latency_microseconds",
9392 Help: "Total duration of requests in microseconds.",
94 }, fieldKeys)), bs)
93 }, fieldKeys),
94 bs,
95 )
9596
9697 var ts tracking.Service
9798 ts = tracking.NewService(cargos, handlingEvents)
9899 ts = tracking.NewLoggingService(log.NewContext(logger).With("component", "tracking"), ts)
99100 ts = tracking.NewInstrumentingService(
100 kitprometheus.NewCounter(stdprometheus.CounterOpts{
101 kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
101102 Namespace: "api",
102103 Subsystem: "tracking_service",
103104 Name: "request_count",
104105 Help: "Number of requests received.",
105106 }, fieldKeys),
106 metrics.NewTimeHistogram(time.Microsecond, kitprometheus.NewSummary(stdprometheus.SummaryOpts{
107 kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
107108 Namespace: "api",
108109 Subsystem: "tracking_service",
109110 Name: "request_latency_microseconds",
110111 Help: "Total duration of requests in microseconds.",
111 }, fieldKeys)), ts)
112 }, fieldKeys),
113 ts,
114 )
112115
113116 var hs handling.Service
114117 hs = handling.NewService(handlingEvents, handlingEventFactory, handlingEventHandler)
115118 hs = handling.NewLoggingService(log.NewContext(logger).With("component", "handling"), hs)
116119 hs = handling.NewInstrumentingService(
117 kitprometheus.NewCounter(stdprometheus.CounterOpts{
120 kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
118121 Namespace: "api",
119122 Subsystem: "handling_service",
120123 Name: "request_count",
121124 Help: "Number of requests received.",
122125 }, fieldKeys),
123 metrics.NewTimeHistogram(time.Microsecond, kitprometheus.NewSummary(stdprometheus.SummaryOpts{
126 kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
124127 Namespace: "api",
125128 Subsystem: "handling_service",
126129 Name: "request_latency_microseconds",
127130 Help: "Total duration of requests in microseconds.",
128 }, fieldKeys)), hs)
131 }, fieldKeys),
132 hs,
133 )
129134
130135 httpLogger := log.NewContext(logger).With("component", "http")
131136
77
88 type instrumentingService struct {
99 requestCount metrics.Counter
10 requestLatency metrics.TimeHistogram
10 requestLatency metrics.Histogram
1111 Service
1212 }
1313
1414 // NewInstrumentingService returns an instance of an instrumenting Service.
15 func NewInstrumentingService(requestCount metrics.Counter, requestLatency metrics.TimeHistogram, s Service) Service {
15 func NewInstrumentingService(requestCount metrics.Counter, requestLatency metrics.Histogram, s Service) Service {
1616 return &instrumentingService{
1717 requestCount: requestCount,
1818 requestLatency: requestLatency,
2222
2323 func (s *instrumentingService) Track(id string) (Cargo, error) {
2424 defer func(begin time.Time) {
25 methodField := metrics.Field{Key: "method", Value: "track"}
26 s.requestCount.With(methodField).Add(1)
27 s.requestLatency.With(methodField).Observe(time.Since(begin))
25 s.requestCount.With("method", "track").Add(1)
26 s.requestLatency.With("method", "track").Observe(time.Since(begin).Seconds())
2827 }(time.Now())
2928
3029 return s.Service.Track(id)
88
99 type instrumentingMiddleware struct {
1010 requestCount metrics.Counter
11 requestLatency metrics.TimeHistogram
11 requestLatency metrics.Histogram
1212 countResult metrics.Histogram
1313 next StringService
1414 }
1515
1616 func (mw instrumentingMiddleware) Uppercase(s string) (output string, err error) {
1717 defer func(begin time.Time) {
18 methodField := metrics.Field{Key: "method", Value: "uppercase"}
19 errorField := metrics.Field{Key: "error", Value: fmt.Sprintf("%v", err)}
20 mw.requestCount.With(methodField).With(errorField).Add(1)
21 mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin))
18 lvs := []string{"method", "uppercase", "error", fmt.Sprint(err == nil)}
19 mw.requestCount.With(lvs...).Add(1)
20 mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
2221 }(time.Now())
2322
2423 output, err = mw.next.Uppercase(s)
2726
2827 func (mw instrumentingMiddleware) Count(s string) (n int) {
2928 defer func(begin time.Time) {
30 methodField := metrics.Field{Key: "method", Value: "count"}
31 errorField := metrics.Field{Key: "error", Value: fmt.Sprintf("%v", error(nil))}
32 mw.requestCount.With(methodField).With(errorField).Add(1)
33 mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin))
34 mw.countResult.Observe(int64(n))
29 lvs := []string{"method", "count"}
30 mw.requestCount.With(lvs...).Add(1)
31 mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
32 mw.countResult.Observe(float64(n))
3533 }(time.Now())
3634
3735 n = mw.next.Count(s)
22 import (
33 "net/http"
44 "os"
5 "time"
65
76 stdprometheus "github.com/prometheus/client_golang/prometheus"
87 "golang.org/x/net/context"
98
109 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
1210 kitprometheus "github.com/go-kit/kit/metrics/prometheus"
1311 httptransport "github.com/go-kit/kit/transport/http"
1412 )
1816 logger := log.NewLogfmtLogger(os.Stderr)
1917
2018 fieldKeys := []string{"method", "error"}
21 requestCount := kitprometheus.NewCounter(stdprometheus.CounterOpts{
19 requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
2220 Namespace: "my_group",
2321 Subsystem: "string_service",
2422 Name: "request_count",
2523 Help: "Number of requests received.",
2624 }, fieldKeys)
27 requestLatency := metrics.NewTimeHistogram(time.Microsecond, kitprometheus.NewSummary(stdprometheus.SummaryOpts{
25 requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
2826 Namespace: "my_group",
2927 Subsystem: "string_service",
3028 Name: "request_latency_microseconds",
3129 Help: "Total duration of requests in microseconds.",
32 }, fieldKeys))
33 countResult := kitprometheus.NewSummary(stdprometheus.SummaryOpts{
30 }, fieldKeys)
31 countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
3432 Namespace: "my_group",
3533 Subsystem: "string_service",
3634 Name: "count_result",
88
99 func instrumentingMiddleware(
1010 requestCount metrics.Counter,
11 requestLatency metrics.TimeHistogram,
11 requestLatency metrics.Histogram,
1212 countResult metrics.Histogram,
1313 ) ServiceMiddleware {
1414 return func(next StringService) StringService {
1818
1919 type instrmw struct {
2020 requestCount metrics.Counter
21 requestLatency metrics.TimeHistogram
21 requestLatency metrics.Histogram
2222 countResult metrics.Histogram
2323 StringService
2424 }
2525
2626 func (mw instrmw) Uppercase(s string) (output string, err error) {
2727 defer func(begin time.Time) {
28 methodField := metrics.Field{Key: "method", Value: "uppercase"}
29 errorField := metrics.Field{Key: "error", Value: fmt.Sprintf("%v", err)}
30 mw.requestCount.With(methodField).With(errorField).Add(1)
31 mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin))
28 lvs := []string{"method", "uppercase", "error", fmt.Sprint(err == nil)}
29 mw.requestCount.With(lvs...).Add(1)
30 mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
3231 }(time.Now())
3332
3433 output, err = mw.StringService.Uppercase(s)
3736
3837 func (mw instrmw) Count(s string) (n int) {
3938 defer func(begin time.Time) {
40 methodField := metrics.Field{Key: "method", Value: "count"}
41 errorField := metrics.Field{Key: "error", Value: fmt.Sprintf("%v", error(nil))}
42 mw.requestCount.With(methodField).With(errorField).Add(1)
43 mw.requestLatency.With(methodField).With(errorField).Observe(time.Since(begin))
44 mw.countResult.Observe(int64(n))
39 lvs := []string{"method", "uppercase"}
40 mw.requestCount.With(lvs...).Add(1)
41 mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
42 mw.countResult.Observe(float64(n))
4543 }(time.Now())
4644
4745 n = mw.StringService.Count(s)
33 "flag"
44 "net/http"
55 "os"
6 "time"
76
87 stdprometheus "github.com/prometheus/client_golang/prometheus"
98 "golang.org/x/net/context"
109
1110 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
1311 kitprometheus "github.com/go-kit/kit/metrics/prometheus"
1412 httptransport "github.com/go-kit/kit/transport/http"
1513 )
2826 ctx := context.Background()
2927
3028 fieldKeys := []string{"method", "error"}
31 requestCount := kitprometheus.NewCounter(stdprometheus.CounterOpts{
29 requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
3230 Namespace: "my_group",
3331 Subsystem: "string_service",
3432 Name: "request_count",
3533 Help: "Number of requests received.",
3634 }, fieldKeys)
37 requestLatency := metrics.NewTimeHistogram(time.Microsecond, kitprometheus.NewSummary(stdprometheus.SummaryOpts{
35 requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
3836 Namespace: "my_group",
3937 Subsystem: "string_service",
4038 Name: "request_latency_microseconds",
4139 Help: "Total duration of requests in microseconds.",
42 }, fieldKeys))
43 countResult := kitprometheus.NewSummary(stdprometheus.SummaryOpts{
40 }, fieldKeys)
41 countResult := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
4442 Namespace: "my_group",
4543 Subsystem: "string_service",
4644 Name: "count_result",
00 # package metrics
11
22 `package metrics` provides a set of uniform interfaces for service instrumentation.
3 It has **[counters][]**, **[gauges][]**, and **[histograms][]**,
4 and provides adapters to popular metrics packages, like **[expvar][]**, **[statsd][]**, and **[Prometheus][]**.
5
6 [counters]: http://prometheus.io/docs/concepts/metric_types/#counter
7 [gauges]: http://prometheus.io/docs/concepts/metric_types/#gauge
8 [histograms]: http://prometheus.io/docs/concepts/metric_types/#histogram
9 [expvar]: https://golang.org/pkg/expvar
10 [statsd]: https://github.com/etsy/statsd
11 [Prometheus]: http://prometheus.io
3 It has
4 [counters](http://prometheus.io/docs/concepts/metric_types/#counter),
5 [gauges](http://prometheus.io/docs/concepts/metric_types/#gauge), and
6 [histograms](http://prometheus.io/docs/concepts/metric_types/#histogram),
7 and provides adapters to popular metrics packages, like
8 [expvar](https://golang.org/pkg/expvar),
9 [StatsD](https://github.com/etsy/statsd), and
10 [Prometheus](https://prometheus.io).
1211
1312 ## Rationale
1413
15 Code instrumentation is absolutely essential to achieve [observability][] into a distributed system.
14 Code instrumentation is absolutely essential to achieve
15 [observability](https://speakerdeck.com/mattheath/observability-in-micro-service-architectures)
16 into a distributed system.
1617 Metrics and instrumentation tools have coalesced around a few well-defined idioms.
17 `package metrics` provides a common, minimal interface those idioms for service authors.
18
19 [observability]: https://speakerdeck.com/mattheath/observability-in-micro-service-architectures
18 `package metrics` provides a common, minimal interface those idioms for service authors.
2019
2120 ## Usage
2221
2322 A simple counter, exported via expvar.
2423
2524 ```go
26 import "github.com/go-kit/kit/metrics/expvar"
25 import (
26 "github.com/go-kit/kit/metrics"
27 "github.com/go-kit/kit/metrics/expvar"
28 )
2729
2830 func main() {
29 myCount := expvar.NewCounter("my_count")
31 var myCount metrics.Counter
32 myCount = expvar.NewCounter("my_count")
3033 myCount.Add(1)
3134 }
3235 ```
3336
34 A histogram for request duration, exported via a Prometheus summary with
35 dynamically-computed quantiles.
37 A histogram for request duration,
38 exported via a Prometheus summary with dynamically-computed quantiles.
3639
3740 ```go
3841 import (
42 "time"
43
3944 stdprometheus "github.com/prometheus/client_golang/prometheus"
4045
4146 "github.com/go-kit/kit/metrics"
4247 "github.com/go-kit/kit/metrics/prometheus"
4348 )
4449
45 var requestDuration = prometheus.NewSummary(stdprometheus.SummaryOpts{
46 Namespace: "myservice",
47 Subsystem: "api",
48 Name: "request_duration_nanoseconds_count",
49 Help: "Total time spent serving requests.",
50 }, []string{})
50 func main() {
51 var dur metrics.Histogram = prometheus.NewSummary(stdprometheus.SummaryOpts{
52 Namespace: "myservice",
53 Subsystem: "api",
54 Name: "request_duration_seconds",
55 Help: "Total time spent serving requests.",
56 }, []string{})
57 // ...
58 }
5159
52 func handleRequest() {
53 defer func(begin time.Time) { requestDuration.Observe(time.Since(begin)) }(time.Now())
60 func handleRequest(dur metrics.Histogram) {
61 defer func(begin time.Time) { dur.Observe(time.Since(begin).Seconds()) }(time.Now())
5462 // handle request
5563 }
5664 ```
5765
58 A gauge for the number of goroutines currently running, exported via statsd.
66 A gauge for the number of goroutines currently running, exported via StatsD.
5967
6068 ```go
6169 import (
6472 "runtime"
6573 "time"
6674
75 "github.com/go-kit/kit/metrics"
6776 "github.com/go-kit/kit/metrics/statsd"
6877 )
6978
7079 func main() {
71 statsdWriter, err := net.Dial("udp", "127.0.0.1:8126")
72 if err != nil {
73 panic(err)
74 }
80 statsd := statsd.New("foo_svc.", log.NewNopLogger())
81 report := time.NewTicker(5 * time.Second)
82 defer report.Stop()
83 go statsd.SendLoop(report.C, "tcp", "statsd.internal:8125")
84 goroutines := statsd.NewGauge("goroutine_count")
85 go exportGoroutines(goroutines)
86 // ...
87 }
7588
76 reportInterval := 5 * time.Second
77 goroutines := statsd.NewGauge(statsdWriter, "total_goroutines", reportInterval)
78 for range time.Tick(reportInterval) {
79 goroutines.Set(float64(runtime.NumGoroutine()))
89 func exportGoroutines(g metrics.Gauge) {
90 for range time.Tick(time.Second) {
91 g.Set(float64(runtime.NumGoroutine()))
8092 }
8193 }
8294 ```
0 // Package circonus provides a Circonus backend for metrics.
1 package circonus
2
3 import (
4 "github.com/circonus-labs/circonus-gometrics"
5
6 "github.com/go-kit/kit/metrics"
7 )
8
9 // Circonus wraps a CirconusMetrics object and provides constructors for each of
10 // the Go kit metrics. The CirconusMetrics object manages aggregation of
11 // observations and emission to the Circonus server.
12 type Circonus struct {
13 m *circonusgometrics.CirconusMetrics
14 }
15
16 // New creates a new Circonus object wrapping the passed CirconusMetrics, which
17 // the caller should create and set in motion. The Circonus object can be used
18 // to construct individual Go kit metrics.
19 func New(m *circonusgometrics.CirconusMetrics) *Circonus {
20 return &Circonus{
21 m: m,
22 }
23 }
24
25 // NewCounter returns a counter metric with the given name.
26 func (c *Circonus) NewCounter(name string) *Counter {
27 return &Counter{
28 name: name,
29 m: c.m,
30 }
31 }
32
33 // NewGauge returns a gauge metric with the given name.
34 func (c *Circonus) NewGauge(name string) *Gauge {
35 return &Gauge{
36 name: name,
37 m: c.m,
38 }
39 }
40
41 // NewHistogram returns a histogram metric with the given name.
42 func (c *Circonus) NewHistogram(name string) *Histogram {
43 return &Histogram{
44 h: c.m.NewHistogram(name),
45 }
46 }
47
48 // Counter is a Circonus implementation of a counter metric.
49 type Counter struct {
50 name string
51 m *circonusgometrics.CirconusMetrics
52 }
53
54 // With implements Counter, but is a no-op, because Circonus metrics have no
55 // concept of per-observation label values.
56 func (c *Counter) With(labelValues ...string) metrics.Counter { return c }
57
58 // Add implements Counter. Delta is converted to uint64; precision will be lost.
59 func (c *Counter) Add(delta float64) { c.m.Add(c.name, uint64(delta)) }
60
61 // Gauge is a Circonus implementation of a gauge metric.
62 type Gauge struct {
63 name string
64 m *circonusgometrics.CirconusMetrics
65 }
66
67 // With implements Gauge, but is a no-op, because Circonus metrics have no
68 // concept of per-observation label values.
69 func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g }
70
71 // Set implements Gauge.
72 func (g *Gauge) Set(value float64) { g.m.SetGauge(g.name, value) }
73
74 // Histogram is a Circonus implementation of a histogram metric.
75 type Histogram struct {
76 h *circonusgometrics.Histogram
77 }
78
79 // With implements Histogram, but is a no-op, because Circonus metrics have no
80 // concept of per-observation label values.
81 func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h }
82
83 // Observe implements Histogram. No precision is lost.
84 func (h *Histogram) Observe(value float64) { h.h.RecordValue(value) }
0 package circonus
1
2 import (
3 "encoding/json"
4 "net/http"
5 "net/http/httptest"
6 "regexp"
7 "strconv"
8 "testing"
9
10 "github.com/circonus-labs/circonus-gometrics"
11 "github.com/circonus-labs/circonus-gometrics/checkmgr"
12
13 "github.com/go-kit/kit/metrics/generic"
14 "github.com/go-kit/kit/metrics/teststat"
15 )
16
17 func TestCounter(t *testing.T) {
18 // The only way to extract values from Circonus is to pose as a Circonus
19 // server and receive real HTTP writes.
20 const name = "abc"
21 var val int64
22 s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23 var res map[string]struct {
24 Value int64 `json:"_value"` // reverse-engineered :\
25 }
26 json.NewDecoder(r.Body).Decode(&res)
27 val = res[name].Value
28 }))
29 defer s.Close()
30
31 // Set up a Circonus object, submitting to our HTTP server.
32 m := newCirconusMetrics(s.URL)
33 counter := New(m).NewCounter(name).With("label values", "not supported")
34 value := func() float64 { m.Flush(); return float64(val) }
35
36 // Engage.
37 if err := teststat.TestCounter(counter, value); err != nil {
38 t.Fatal(err)
39 }
40 }
41
42 func TestGauge(t *testing.T) {
43 const name = "def"
44 var val float64
45 s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
46 var res map[string]struct {
47 Value string `json:"_value"`
48 }
49 json.NewDecoder(r.Body).Decode(&res)
50 val, _ = strconv.ParseFloat(res[name].Value, 64)
51 }))
52 defer s.Close()
53
54 m := newCirconusMetrics(s.URL)
55 gauge := New(m).NewGauge(name).With("label values", "not supported")
56 value := func() float64 { m.Flush(); return val }
57
58 if err := teststat.TestGauge(gauge, value); err != nil {
59 t.Fatal(err)
60 }
61 }
62
63 func TestHistogram(t *testing.T) {
64 const name = "ghi"
65
66 // Circonus just emits bucketed counts. We'll dump them into a generic
67 // histogram (losing some precision) and take statistics from there. Note
68 // this does assume that the generic histogram computes statistics properly,
69 // but we have another test for that :)
70 re := regexp.MustCompile(`^H\[([0-9\.e\+]+)\]=([0-9]+)$`) // H[1.2e+03]=456
71
72 var p50, p90, p95, p99 float64
73 s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
74 var res map[string]struct {
75 Values []string `json:"_value"` // reverse-engineered :\
76 }
77 json.NewDecoder(r.Body).Decode(&res)
78
79 h := generic.NewHistogram("dummy", len(res[name].Values)) // match tbe bucket counts
80 for _, v := range res[name].Values {
81 match := re.FindStringSubmatch(v)
82 f, _ := strconv.ParseFloat(match[1], 64)
83 n, _ := strconv.ParseInt(match[2], 10, 64)
84 for i := int64(0); i < n; i++ {
85 h.Observe(f)
86 }
87 }
88
89 p50 = h.Quantile(0.50)
90 p90 = h.Quantile(0.90)
91 p95 = h.Quantile(0.95)
92 p99 = h.Quantile(0.99)
93 }))
94 defer s.Close()
95
96 m := newCirconusMetrics(s.URL)
97 histogram := New(m).NewHistogram(name).With("label values", "not supported")
98 quantiles := func() (float64, float64, float64, float64) { m.Flush(); return p50, p90, p95, p99 }
99
100 // Circonus metrics, because they do their own bucketing, are less precise
101 // than other systems. So, we bump the tolerance to 5 percent.
102 if err := teststat.TestHistogram(histogram, quantiles, 0.05); err != nil {
103 t.Fatal(err)
104 }
105 }
106
107 func newCirconusMetrics(url string) *circonusgometrics.CirconusMetrics {
108 m, err := circonusgometrics.NewCirconusMetrics(&circonusgometrics.Config{
109 CheckManager: checkmgr.Config{
110 Check: checkmgr.CheckConfig{
111 SubmissionURL: url,
112 },
113 },
114 })
115 if err != nil {
116 panic(err)
117 }
118 return m
119 }
0 // Package discard implements a backend for package metrics that succeeds
1 // without doing anything.
0 // Package discard provides a no-op metrics backend.
21 package discard
32
43 import "github.com/go-kit/kit/metrics"
54
6 type counter struct {
7 name string
8 }
5 type counter struct{}
96
10 // NewCounter returns a Counter that does nothing.
11 func NewCounter(name string) metrics.Counter { return &counter{name} }
7 // NewCounter returns a new no-op counter.
8 func NewCounter() metrics.Counter { return counter{} }
129
13 func (c *counter) Name() string { return c.name }
14 func (c *counter) With(metrics.Field) metrics.Counter { return c }
15 func (c *counter) Add(delta uint64) {}
10 // With implements Counter.
11 func (c counter) With(labelValues ...string) metrics.Counter { return c }
1612
17 type gauge struct {
18 name string
19 }
13 // Add implements Counter.
14 func (c counter) Add(delta float64) {}
2015
21 // NewGauge returns a Gauge that does nothing.
22 func NewGauge(name string) metrics.Gauge { return &gauge{name} }
16 type gauge struct{}
2317
24 func (g *gauge) Name() string { return g.name }
25 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
26 func (g *gauge) Set(value float64) {}
27 func (g *gauge) Add(delta float64) {}
28 func (g *gauge) Get() float64 { return 0 }
18 // NewGauge returns a new no-op gauge.
19 func NewGauge() metrics.Gauge { return gauge{} }
2920
30 type histogram struct {
31 name string
32 }
21 // With implements Gauge.
22 func (g gauge) With(labelValues ...string) metrics.Gauge { return g }
3323
34 // NewHistogram returns a Histogram that does nothing.
35 func NewHistogram(name string) metrics.Histogram { return &histogram{name} }
24 // Set implements Gauge.
25 func (g gauge) Set(value float64) {}
3626
37 func (h *histogram) Name() string { return h.name }
38 func (h *histogram) With(metrics.Field) metrics.Histogram { return h }
39 func (h *histogram) Observe(value int64) {}
40 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
41 return []metrics.Bucket{}, []metrics.Quantile{}
42 }
27 type histogram struct{}
28
29 // NewHistogram returns a new no-op histogram.
30 func NewHistogram() metrics.Histogram { return histogram{} }
31
32 // With implements Histogram.
33 func (h histogram) With(labelValues ...string) metrics.Histogram { return h }
34
35 // Observe implements histogram.
36 func (h histogram) Observe(value float64) {}
00 // Package metrics provides a framework for application instrumentation. All
11 // metrics are safe for concurrent use. Considerable design influence has been
22 // taken from https://github.com/codahale/metrics and https://prometheus.io.
3 //
4 // This package contains the common interfaces. Your code should take these
5 // interfaces as parameters. Implementations are provided for different
6 // instrumentation systems in the various subdirectories.
7 //
8 // Usage
9 //
10 // Metrics are dependencies and should be passed to the components that need
11 // them in the same way you'd construct and pass a database handle, or reference
12 // to another component. So, create metrics in your func main, using whichever
13 // concrete implementation is appropriate for your organization.
14 //
15 // latency := prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
16 // Namespace: "myteam",
17 // Subsystem: "foosvc",
18 // Name: "request_latency_seconds",
19 // Help: "Incoming request latency in seconds."
20 // }, []string{"method", "status_code"})
21 //
22 // Write your components to take the metrics they will use as parameters to
23 // their constructors. Use the interface types, not the concrete types. That is,
24 //
25 // // NewAPI takes metrics.Histogram, not *prometheus.Summary
26 // func NewAPI(s Store, logger log.Logger, latency metrics.Histogram) *API {
27 // // ...
28 // }
29 //
30 // func (a *API) ServeFoo(w http.ResponseWriter, r *http.Request) {
31 // begin := time.Now()
32 // // ...
33 // a.latency.Observe(time.Since(begin).Seconds())
34 // }
35 //
36 // Finally, pass the metrics as dependencies when building your object graph.
37 // This should happen in func main, not in the global scope.
38 //
39 // api := NewAPI(store, logger, latency)
40 // http.ListenAndServe("/", api)
41 //
42 // Implementation details
43 //
44 // Each telemetry system has different semantics for label values, push vs.
45 // pull, support for histograms, etc. These properties influence the design of
46 // their respective packages. This table attempts to summarize the key points of
47 // distinction.
48 //
49 // SYSTEM DIM COUNTERS GAUGES HISTOGRAMS
50 // dogstatsd n batch, push-aggregate batch, push-aggregate native, batch, push-each
51 // statsd 1 batch, push-aggregate batch, push-aggregate native, batch, push-each
52 // graphite 1 batch, push-aggregate batch, push-aggregate synthetic, batch, push-aggregate
53 // expvar 1 atomic atomic synthetic, batch, in-place expose
54 // influx n custom custom custom
55 // prometheus n native native native
56 // circonus 1 native native native
57 //
358 package metrics
0 // Package dogstatsd implements a DogStatsD backend for package metrics.
0 // Package dogstatsd provides a DogStatsD backend for package metrics. It's very
1 // similar to StatsD, but supports arbitrary tags per-metric, which map to Go
2 // kit's label values. So, while label values are no-ops in StatsD, they are
3 // supported here. For more details, see the documentation at
4 // http://docs.datadoghq.com/guides/dogstatsd/.
15 //
2 // This implementation supports Datadog tags that provide additional metric
3 // filtering capabilities. See the DogStatsD documentation for protocol
4 // specifics:
5 // http://docs.datadoghq.com/guides/dogstatsd/
6 //
6 // This package batches observations and emits them on some schedule to the
7 // remote server. This is useful even if you connect to your DogStatsD server
8 // over UDP. Emitting one network packet per observation can quickly overwhelm
9 // even the fastest internal network.
710 package dogstatsd
811
912 import (
10 "bytes"
1113 "fmt"
1214 "io"
13 "log"
14 "math"
15 "strings"
1516 "time"
1617
17 "sync/atomic"
18
18 "github.com/go-kit/kit/log"
1919 "github.com/go-kit/kit/metrics"
20 "github.com/go-kit/kit/metrics/internal/lv"
21 "github.com/go-kit/kit/metrics/internal/ratemap"
22 "github.com/go-kit/kit/util/conn"
2023 )
2124
22 // dogstatsd metrics were based on the statsd package in go-kit
23
24 const maxBufferSize = 1400 // bytes
25
26 type counter struct {
27 key string
28 c chan string
29 tags []metrics.Field
30 }
31
32 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
33 // to the passed writer. Observations are buffered for the report interval or
34 // until the buffer exceeds a max packet size, whichever comes first.
25 // Dogstatsd receives metrics observations and forwards them to a DogStatsD
26 // server. Create a Dogstatsd object, use it to create metrics, and pass those
27 // metrics as dependencies to the components that will use them.
3528 //
36 // TODO: support for sampling.
37 func NewCounter(w io.Writer, key string, reportInterval time.Duration, globalTags []metrics.Field) metrics.Counter {
38 return NewCounterTick(w, key, time.Tick(reportInterval), globalTags)
39 }
40
41 // NewCounterTick is the same as NewCounter, but allows the user to pass in a
42 // ticker channel instead of invoking time.Tick.
43 func NewCounterTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Counter {
44 c := &counter{
45 key: key,
46 c: make(chan string),
47 tags: tags,
48 }
49 go fwd(w, key, reportTicker, c.c)
50 return c
51 }
52
53 func (c *counter) Name() string { return c.key }
54
55 func (c *counter) With(f metrics.Field) metrics.Counter {
56 return &counter{
57 key: c.key,
58 c: c.c,
59 tags: append(c.tags, f),
60 }
61 }
62
63 func (c *counter) Add(delta uint64) { c.c <- applyTags(fmt.Sprintf("%d|c", delta), c.tags) }
64
65 type gauge struct {
66 key string
67 lastValue uint64 // math.Float64frombits
68 g chan string
69 tags []metrics.Field
70 }
71
72 // NewGauge returns a Gauge that emits values in the DogStatsD protocol to the
73 // passed writer. Values are buffered for the report interval or until the
74 // buffer exceeds a max packet size, whichever comes first.
29 // All metrics are buffered until WriteTo is called. Counters and gauges are
30 // aggregated into a single observation per timeseries per write. Timings and
31 // histograms are buffered but not aggregated.
7532 //
76 // TODO: support for sampling.
77 func NewGauge(w io.Writer, key string, reportInterval time.Duration, tags []metrics.Field) metrics.Gauge {
78 return NewGaugeTick(w, key, time.Tick(reportInterval), tags)
79 }
80
81 // NewGaugeTick is the same as NewGauge, but allows the user to pass in a ticker
82 // channel instead of invoking time.Tick.
83 func NewGaugeTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Gauge {
84 g := &gauge{
85 key: key,
86 g: make(chan string),
87 tags: tags,
88 }
89 go fwd(w, key, reportTicker, g.g)
90 return g
91 }
92
93 func (g *gauge) Name() string { return g.key }
94
95 func (g *gauge) With(f metrics.Field) metrics.Gauge {
96 return &gauge{
97 key: g.key,
98 lastValue: g.lastValue,
99 g: g.g,
100 tags: append(g.tags, f),
101 }
102 }
103
104 func (g *gauge) Add(delta float64) {
105 // https://github.com/etsy/statsd/blob/master/docs/metric_types.md#gauges
106 sign := "+"
107 if delta < 0 {
108 sign, delta = "-", -delta
109 }
110 g.g <- applyTags(fmt.Sprintf("%s%f|g", sign, delta), g.tags)
111 }
112
113 func (g *gauge) Set(value float64) {
114 atomic.StoreUint64(&g.lastValue, math.Float64bits(value))
115 g.g <- applyTags(fmt.Sprintf("%f|g", value), g.tags)
116 }
117
118 func (g *gauge) Get() float64 {
119 return math.Float64frombits(atomic.LoadUint64(&g.lastValue))
120 }
121
122 // NewCallbackGauge emits values in the DogStatsD protocol to the passed writer.
123 // It collects values every scrape interval from the callback. Values are
124 // buffered for the report interval or until the buffer exceeds a max packet
125 // size, whichever comes first. The report and scrape intervals may be the
126 // same. The callback determines the value, and fields are ignored, so
127 // NewCallbackGauge returns nothing.
128 func NewCallbackGauge(w io.Writer, key string, reportInterval, scrapeInterval time.Duration, callback func() float64) {
129 NewCallbackGaugeTick(w, key, time.Tick(reportInterval), time.Tick(scrapeInterval), callback)
130 }
131
132 // NewCallbackGaugeTick is the same as NewCallbackGauge, but allows the user to
133 // pass in ticker channels instead of durations to control report and scrape
134 // intervals.
135 func NewCallbackGaugeTick(w io.Writer, key string, reportTicker, scrapeTicker <-chan time.Time, callback func() float64) {
136 go fwd(w, key, reportTicker, emitEvery(scrapeTicker, callback))
137 }
138
139 func emitEvery(emitTicker <-chan time.Time, callback func() float64) <-chan string {
140 c := make(chan string)
141 go func() {
142 for range emitTicker {
143 c <- fmt.Sprintf("%f|g", callback())
144 }
145 }()
146 return c
147 }
148
149 type histogram struct {
150 key string
151 h chan string
152 tags []metrics.Field
153 }
154
155 // NewHistogram returns a Histogram that emits observations in the DogStatsD
156 // protocol to the passed writer. Observations are buffered for the reporting
157 // interval or until the buffer exceeds a max packet size, whichever comes
158 // first.
159 //
160 // NewHistogram is mapped to a statsd Timing, so observations should represent
161 // milliseconds. If you observe in units of nanoseconds, you can make the
162 // translation with a ScaledHistogram:
163 //
164 // NewScaledHistogram(dogstatsdHistogram, time.Millisecond)
165 //
166 // You can also enforce the constraint in a typesafe way with a millisecond
167 // TimeHistogram:
168 //
169 // NewTimeHistogram(dogstatsdHistogram, time.Millisecond)
170 //
171 // TODO: support for sampling.
172 func NewHistogram(w io.Writer, key string, reportInterval time.Duration, tags []metrics.Field) metrics.Histogram {
173 return NewHistogramTick(w, key, time.Tick(reportInterval), tags)
174 }
175
176 // NewHistogramTick is the same as NewHistogram, but allows the user to pass a
177 // ticker channel instead of invoking time.Tick.
178 func NewHistogramTick(w io.Writer, key string, reportTicker <-chan time.Time, tags []metrics.Field) metrics.Histogram {
179 h := &histogram{
180 key: key,
181 h: make(chan string),
182 tags: tags,
183 }
184 go fwd(w, key, reportTicker, h.h)
185 return h
186 }
187
188 func (h *histogram) Name() string { return h.key }
189
190 func (h *histogram) With(f metrics.Field) metrics.Histogram {
191 return &histogram{
192 key: h.key,
193 h: h.h,
194 tags: append(h.tags, f),
195 }
196 }
197
198 func (h *histogram) Observe(value int64) {
199 h.h <- applyTags(fmt.Sprintf("%d|ms", value), h.tags)
200 }
201
202 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
203 // TODO(pb): no way to do this without introducing e.g. codahale/hdrhistogram
204 return []metrics.Bucket{}, []metrics.Quantile{}
205 }
206
207 func fwd(w io.Writer, key string, reportTicker <-chan time.Time, c <-chan string) {
208 buf := &bytes.Buffer{}
209 for {
210 select {
211 case s := <-c:
212 fmt.Fprintf(buf, "%s:%s\n", key, s)
213 if buf.Len() > maxBufferSize {
214 flush(w, buf)
33 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
34 // To send to a DogStatsD server, use the SendLoop helper method.
35 type Dogstatsd struct {
36 prefix string
37 rates *ratemap.RateMap
38 counters *lv.Space
39 gauges *lv.Space
40 timings *lv.Space
41 histograms *lv.Space
42 logger log.Logger
43 }
44
45 // New returns a Dogstatsd object that may be used to create metrics. Prefix is
46 // applied to all created metrics. Callers must ensure that regular calls to
47 // WriteTo are performed, either manually or with one of the helper methods.
48 func New(prefix string, logger log.Logger) *Dogstatsd {
49 return &Dogstatsd{
50 prefix: prefix,
51 rates: ratemap.New(),
52 counters: lv.NewSpace(),
53 gauges: lv.NewSpace(),
54 timings: lv.NewSpace(),
55 histograms: lv.NewSpace(),
56 logger: logger,
57 }
58 }
59
60 // NewCounter returns a counter, sending observations to this Dogstatsd object.
61 func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
62 d.rates.Set(d.prefix+name, sampleRate)
63 return &Counter{
64 name: d.prefix + name,
65 obs: d.counters.Observe,
66 }
67 }
68
69 // NewGauge returns a gauge, sending observations to this Dogstatsd object.
70 func (d *Dogstatsd) NewGauge(name string) *Gauge {
71 return &Gauge{
72 name: d.prefix + name,
73 obs: d.gauges.Observe,
74 }
75 }
76
77 // NewTiming returns a histogram whose observations are interpreted as
78 // millisecond durations, and are forwarded to this Dogstatsd object.
79 func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
80 d.rates.Set(d.prefix+name, sampleRate)
81 return &Timing{
82 name: d.prefix + name,
83 obs: d.timings.Observe,
84 }
85 }
86
87 // NewHistogram returns a histogram whose observations are of an unspecified
88 // unit, and are forwarded to this Dogstatsd object.
89 func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
90 d.rates.Set(d.prefix+name, sampleRate)
91 return &Histogram{
92 name: d.prefix + name,
93 obs: d.histograms.Observe,
94 }
95 }
96
97 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
98 // time the passed channel fires. This method blocks until the channel is
99 // closed, so clients probably want to run it in its own goroutine. For typical
100 // usage, create a time.Ticker and pass its C channel to this method.
101 func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
102 for range c {
103 if _, err := d.WriteTo(w); err != nil {
104 d.logger.Log("during", "WriteTo", "err", err)
105 }
106 }
107 }
108
109 // SendLoop is a helper method that wraps WriteLoop, passing a managed
110 // connection to the network and address. Like WriteLoop, this method blocks
111 // until the channel is closed, so clients probably want to start it in its own
112 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
113 // this method.
114 func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) {
115 d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
116 }
117
118 // WriteTo flushes the buffered content of the metrics to the writer, in
119 // DogStatsD format. WriteTo abides best-effort semantics, so observations are
120 // lost if there is a problem with the write. Clients should be sure to call
121 // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
122 func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
123 var n int
124
125 d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
126 n, err = fmt.Fprintf(w, "%s:%f|c%s%s\n", name, sum(values), sampling(d.rates.Get(name)), tagValues(lvs))
127 if err != nil {
128 return false
129 }
130 count += int64(n)
131 return true
132 })
133 if err != nil {
134 return count, err
135 }
136
137 d.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
138 n, err = fmt.Fprintf(w, "%s:%f|g%s\n", name, last(values), tagValues(lvs))
139 if err != nil {
140 return false
141 }
142 count += int64(n)
143 return true
144 })
145 if err != nil {
146 return count, err
147 }
148
149 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
150 sampleRate := d.rates.Get(name)
151 for _, value := range values {
152 n, err = fmt.Fprintf(w, "%s:%f|ms%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
153 if err != nil {
154 return false
215155 }
216
217 case <-reportTicker:
218 flush(w, buf)
219 }
220 }
221 }
222
223 func flush(w io.Writer, buf *bytes.Buffer) {
224 if buf.Len() <= 0 {
225 return
226 }
227 if _, err := w.Write(buf.Bytes()); err != nil {
228 log.Printf("error: could not write to dogstatsd: %v", err)
229 }
230 buf.Reset()
231 }
232
233 func applyTags(value string, tags []metrics.Field) string {
234 if len(tags) > 0 {
235 var tagsString string
236 for _, t := range tags {
237 switch tagsString {
238 case "":
239 tagsString = t.Key + ":" + t.Value
240 default:
241 tagsString = tagsString + "," + t.Key + ":" + t.Value
156 count += int64(n)
157 }
158 return true
159 })
160 if err != nil {
161 return count, err
162 }
163
164 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
165 sampleRate := d.rates.Get(name)
166 for _, value := range values {
167 n, err = fmt.Fprintf(w, "%s:%f|h%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
168 if err != nil {
169 return false
242170 }
243 }
244 value = value + "|#" + tagsString
245 }
246 return value
247 }
171 count += int64(n)
172 }
173 return true
174 })
175 if err != nil {
176 return count, err
177 }
178
179 return count, err
180 }
181
182 func sum(a []float64) float64 {
183 var v float64
184 for _, f := range a {
185 v += f
186 }
187 return v
188 }
189
190 func last(a []float64) float64 {
191 return a[len(a)-1]
192 }
193
194 func sampling(r float64) string {
195 var sv string
196 if r < 1.0 {
197 sv = fmt.Sprintf("|@%f", r)
198 }
199 return sv
200 }
201
202 func tagValues(labelValues []string) string {
203 if len(labelValues) == 0 {
204 return ""
205 }
206 if len(labelValues)%2 != 0 {
207 panic("tagValues received a labelValues with an odd number of strings")
208 }
209 pairs := make([]string, 0, len(labelValues)/2)
210 for i := 0; i < len(labelValues); i += 2 {
211 pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
212 }
213 return "|#" + strings.Join(pairs, ",")
214 }
215
216 type observeFunc func(name string, lvs lv.LabelValues, value float64)
217
218 // Counter is a DogStatsD counter. Observations are forwarded to a Dogstatsd
219 // object, and aggregated (summed) per timeseries.
220 type Counter struct {
221 name string
222 lvs lv.LabelValues
223 obs observeFunc
224 }
225
226 // With implements metrics.Counter.
227 func (c *Counter) With(labelValues ...string) metrics.Counter {
228 return &Counter{
229 name: c.name,
230 lvs: c.lvs.With(labelValues...),
231 obs: c.obs,
232 }
233 }
234
235 // Add implements metrics.Counter.
236 func (c *Counter) Add(delta float64) {
237 c.obs(c.name, c.lvs, delta)
238 }
239
240 // Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd
241 // object, and aggregated (the last observation selected) per timeseries.
242 type Gauge struct {
243 name string
244 lvs lv.LabelValues
245 obs observeFunc
246 }
247
248 // With implements metrics.Gauge.
249 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
250 return &Gauge{
251 name: g.name,
252 lvs: g.lvs.With(labelValues...),
253 obs: g.obs,
254 }
255 }
256
257 // Set implements metrics.Gauge.
258 func (g *Gauge) Set(value float64) {
259 g.obs(g.name, g.lvs, value)
260 }
261
262 // Timing is a DogStatsD timing, or metrics.Histogram. Observations are
263 // forwarded to a Dogstatsd object, and collected (but not aggregated) per
264 // timeseries.
265 type Timing struct {
266 name string
267 lvs lv.LabelValues
268 obs observeFunc
269 }
270
271 // With implements metrics.Timing.
272 func (t *Timing) With(labelValues ...string) metrics.Histogram {
273 return &Timing{
274 name: t.name,
275 lvs: t.lvs.With(labelValues...),
276 obs: t.obs,
277 }
278 }
279
280 // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
281 func (t *Timing) Observe(value float64) {
282 t.obs(t.name, t.lvs, value)
283 }
284
285 // Histogram is a DogStatsD histrogram. Observations are forwarded to a
286 // Dogstatsd object, and collected (but not aggregated) per timeseries.
287 type Histogram struct {
288 name string
289 lvs lv.LabelValues
290 obs observeFunc
291 }
292
293 // With implements metrics.Histogram.
294 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
295 return &Histogram{
296 name: h.name,
297 lvs: h.lvs.With(labelValues...),
298 obs: h.obs,
299 }
300 }
301
302 // Observe implements metrics.Histogram.
303 func (h *Histogram) Observe(value float64) {
304 h.obs(h.name, h.lvs, value)
305 }
00 package dogstatsd
11
22 import (
3 "bytes"
4 "fmt"
5 "net"
6 "strings"
7 "sync"
83 "testing"
9 "time"
104
115 "github.com/go-kit/kit/log"
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/util/conn"
6 "github.com/go-kit/kit/metrics/teststat"
147 )
158
16 func TestEmitterCounter(t *testing.T) {
17 e, buf := testEmitter()
18
19 c := e.NewCounter("test_statsd_counter")
20 c.Add(1)
21 c.Add(2)
22
23 // give time for things to emit
24 time.Sleep(time.Millisecond * 250)
25 // force a flush and stop
26 e.Stop()
27
28 want := "prefix.test_statsd_counter:1|c\nprefix.test_statsd_counter:2|c\n"
29 have := buf.String()
30 if want != have {
31 t.Errorf("want %q, have %q", want, have)
9 func TestCounter(t *testing.T) {
10 prefix, name := "abc.", "def"
11 label, value := "label", "value"
12 regex := `^` + prefix + name + `:([0-9\.]+)\|c\|#` + label + `:` + value + `$`
13 d := New(prefix, log.NewNopLogger())
14 counter := d.NewCounter(name, 1.0).With(label, value)
15 valuef := teststat.SumLines(d, regex)
16 if err := teststat.TestCounter(counter, valuef); err != nil {
17 t.Fatal(err)
3218 }
3319 }
3420
35 func TestEmitterGauge(t *testing.T) {
36 e, buf := testEmitter()
21 func TestCounterSampled(t *testing.T) {
22 // This will involve multiplying the observed sum by the inverse of the
23 // sample rate and checking against the expected value within some
24 // tolerance.
25 t.Skip("TODO")
26 }
3727
38 g := e.NewGauge("test_statsd_gauge")
39
40 delta := 1.0
41 g.Add(delta)
42
43 // give time for things to emit
44 time.Sleep(time.Millisecond * 250)
45 // force a flush and stop
46 e.Stop()
47
48 want := fmt.Sprintf("prefix.test_statsd_gauge:+%f|g\n", delta)
49 have := buf.String()
50 if want != have {
51 t.Errorf("want %q, have %q", want, have)
28 func TestGauge(t *testing.T) {
29 prefix, name := "ghi.", "jkl"
30 label, value := "xyz", "abc"
31 regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#` + label + `:` + value + `$`
32 d := New(prefix, log.NewNopLogger())
33 gauge := d.NewGauge(name).With(label, value)
34 valuef := teststat.LastLine(d, regex)
35 if err := teststat.TestGauge(gauge, valuef); err != nil {
36 t.Fatal(err)
5237 }
5338 }
5439
55 func TestEmitterHistogram(t *testing.T) {
56 e, buf := testEmitter()
57 h := e.NewHistogram("test_statsd_histogram")
40 // DogStatsD histograms just emit all observations. So, we collect them into
41 // a generic histogram, and run the statistics test on that.
5842
59 h.Observe(123)
60
61 // give time for things to emit
62 time.Sleep(time.Millisecond * 250)
63 // force a flush and stop
64 e.Stop()
65
66 want := "prefix.test_statsd_histogram:123|ms\n"
67 have := buf.String()
68 if want != have {
69 t.Errorf("want %q, have %q", want, have)
43 func TestHistogram(t *testing.T) {
44 prefix, name := "dogstatsd.", "histogram_test"
45 label, value := "abc", "def"
46 regex := `^` + prefix + name + `:([0-9\.]+)\|h\|#` + label + `:` + value + `$`
47 d := New(prefix, log.NewNopLogger())
48 histogram := d.NewHistogram(name, 1.0).With(label, value)
49 quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X
50 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
51 t.Fatal(err)
7052 }
7153 }
7254
73 func TestCounter(t *testing.T) {
74 buf := &syncbuf{buf: &bytes.Buffer{}}
75 reportc := make(chan time.Time)
76 tags := []metrics.Field{}
77 c := NewCounterTick(buf, "test_statsd_counter", reportc, tags)
78
79 c.Add(1)
80 c.With(metrics.Field{"foo", "bar"}).Add(2)
81 c.With(metrics.Field{"foo", "bar"}).With(metrics.Field{"abc", "123"}).Add(2)
82 c.Add(3)
83
84 want, have := "test_statsd_counter:1|c\ntest_statsd_counter:2|c|#foo:bar\ntest_statsd_counter:2|c|#foo:bar,abc:123\ntest_statsd_counter:3|c\n", ""
85 by(t, 100*time.Millisecond, func() bool {
86 have = buf.String()
87 return want == have
88 }, func() {
89 reportc <- time.Now()
90 }, fmt.Sprintf("want %q, have %q", want, have))
91 }
92
93 func TestGauge(t *testing.T) {
94 buf := &syncbuf{buf: &bytes.Buffer{}}
95 reportc := make(chan time.Time)
96 tags := []metrics.Field{}
97 g := NewGaugeTick(buf, "test_statsd_gauge", reportc, tags)
98
99 delta := 1.0
100 g.Add(delta)
101
102 want, have := fmt.Sprintf("test_statsd_gauge:+%f|g\n", delta), ""
103 by(t, 100*time.Millisecond, func() bool {
104 have = buf.String()
105 return want == have
106 }, func() {
107 reportc <- time.Now()
108 }, fmt.Sprintf("want %q, have %q", want, have))
109
110 buf.Reset()
111 delta = -2.0
112 g.With(metrics.Field{"foo", "bar"}).Add(delta)
113
114 want, have = fmt.Sprintf("test_statsd_gauge:%f|g|#foo:bar\n", delta), ""
115 by(t, 100*time.Millisecond, func() bool {
116 have = buf.String()
117 return want == have
118 }, func() {
119 reportc <- time.Now()
120 }, fmt.Sprintf("want %q, have %q", want, have))
121
122 buf.Reset()
123 value := 3.0
124 g.With(metrics.Field{"foo", "bar"}).With(metrics.Field{"abc", "123"}).Set(value)
125
126 want, have = fmt.Sprintf("test_statsd_gauge:%f|g|#foo:bar,abc:123\n", value), ""
127 by(t, 100*time.Millisecond, func() bool {
128 have = buf.String()
129 return want == have
130 }, func() {
131 reportc <- time.Now()
132 }, fmt.Sprintf("want %q, have %q", want, have))
133 }
134
135 func TestCallbackGauge(t *testing.T) {
136 buf := &syncbuf{buf: &bytes.Buffer{}}
137 reportc, scrapec := make(chan time.Time), make(chan time.Time)
138 value := 55.55
139 cb := func() float64 { return value }
140 NewCallbackGaugeTick(buf, "test_statsd_callback_gauge", reportc, scrapec, cb)
141
142 scrapec <- time.Now()
143 reportc <- time.Now()
144
145 // Travis is annoying
146 by(t, time.Second, func() bool {
147 return buf.String() != ""
148 }, func() {
149 reportc <- time.Now()
150 }, "buffer never got write+flush")
151
152 want, have := fmt.Sprintf("test_statsd_callback_gauge:%f|g\n", value), ""
153 by(t, 100*time.Millisecond, func() bool {
154 have = buf.String()
155 return strings.HasPrefix(have, want) // HasPrefix because we might get multiple writes
156 }, func() {
157 reportc <- time.Now()
158 }, fmt.Sprintf("want %q, have %q", want, have))
159 }
160
161 func TestHistogram(t *testing.T) {
162 buf := &syncbuf{buf: &bytes.Buffer{}}
163 reportc := make(chan time.Time)
164 tags := []metrics.Field{}
165 h := NewHistogramTick(buf, "test_statsd_histogram", reportc, tags)
166
167 h.Observe(123)
168 h.With(metrics.Field{"foo", "bar"}).Observe(456)
169
170 want, have := "test_statsd_histogram:123|ms\ntest_statsd_histogram:456|ms|#foo:bar\n", ""
171 by(t, 100*time.Millisecond, func() bool {
172 have = buf.String()
173 return want == have
174 }, func() {
175 reportc <- time.Now()
176 }, fmt.Sprintf("want %q, have %q", want, have))
177 }
178
179 func by(t *testing.T, d time.Duration, check func() bool, execute func(), msg string) {
180 deadline := time.Now().Add(d)
181 for !check() {
182 if time.Now().After(deadline) {
183 t.Fatal(msg)
184 }
185 execute()
55 func TestHistogramSampled(t *testing.T) {
56 prefix, name := "dogstatsd.", "sampled_histogram_test"
57 label, value := "foo", "bar"
58 regex := `^` + prefix + name + `:([0-9\.]+)\|h\|@0\.01[0]*\|#` + label + `:` + value + `$`
59 d := New(prefix, log.NewNopLogger())
60 histogram := d.NewHistogram(name, 0.01).With(label, value)
61 quantiles := teststat.Quantiles(d, regex, 50)
62 if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil {
63 t.Fatal(err)
18664 }
18765 }
18866
189 type syncbuf struct {
190 mtx sync.Mutex
191 buf *bytes.Buffer
192 }
193
194 func (s *syncbuf) Write(p []byte) (int, error) {
195 s.mtx.Lock()
196 defer s.mtx.Unlock()
197 return s.buf.Write(p)
198 }
199
200 func (s *syncbuf) String() string {
201 s.mtx.Lock()
202 defer s.mtx.Unlock()
203 return s.buf.String()
204 }
205
206 func (s *syncbuf) Reset() {
207 s.mtx.Lock()
208 defer s.mtx.Unlock()
209 s.buf.Reset()
210 }
211
212 func testEmitter() (*Emitter, *syncbuf) {
213 buf := &syncbuf{buf: &bytes.Buffer{}}
214 e := &Emitter{
215 prefix: "prefix.",
216 mgr: conn.NewManager(mockDialer(buf), "", "", time.After, log.NewNopLogger()),
217 logger: log.NewNopLogger(),
218 keyVals: make(chan keyVal),
219 quitc: make(chan chan struct{}),
220 }
221 go e.loop(time.Millisecond * 20)
222 return e, buf
223 }
224
225 func mockDialer(buf *syncbuf) conn.Dialer {
226 return func(net, addr string) (net.Conn, error) {
227 return &mockConn{buf}, nil
67 func TestTiming(t *testing.T) {
68 prefix, name := "dogstatsd.", "timing_test"
69 label, value := "wiggle", "bottom"
70 regex := `^` + prefix + name + `:([0-9\.]+)\|ms\|#` + label + `:` + value + `$`
71 d := New(prefix, log.NewNopLogger())
72 histogram := d.NewTiming(name, 1.0).With(label, value)
73 quantiles := teststat.Quantiles(d, regex, 50) // no |@0.X
74 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
75 t.Fatal(err)
22876 }
22977 }
23078
231 type mockConn struct {
232 buf *syncbuf
79 func TestTimingSampled(t *testing.T) {
80 prefix, name := "dogstatsd.", "sampled_timing_test"
81 label, value := "internal", "external"
82 regex := `^` + prefix + name + `:([0-9\.]+)\|ms\|@0.03[0]*\|#` + label + `:` + value + `$`
83 d := New(prefix, log.NewNopLogger())
84 histogram := d.NewTiming(name, 0.03).With(label, value)
85 quantiles := teststat.Quantiles(d, regex, 50)
86 if err := teststat.TestHistogram(histogram, quantiles, 0.02); err != nil {
87 t.Fatal(err)
88 }
23389 }
234
235 func (c *mockConn) Read(b []byte) (n int, err error) {
236 panic("not implemented")
237 }
238
239 func (c *mockConn) Write(b []byte) (n int, err error) {
240 return c.buf.Write(b)
241 }
242
243 func (c *mockConn) Close() error {
244 panic("not implemented")
245 }
246
247 func (c *mockConn) LocalAddr() net.Addr {
248 panic("not implemented")
249 }
250
251 func (c *mockConn) RemoteAddr() net.Addr {
252 panic("not implemented")
253 }
254
255 func (c *mockConn) SetDeadline(t time.Time) error {
256 panic("not implemented")
257 }
258
259 func (c *mockConn) SetReadDeadline(t time.Time) error {
260 panic("not implemented")
261 }
262
263 func (c *mockConn) SetWriteDeadline(t time.Time) error {
264 panic("not implemented")
265 }
+0
-159
metrics/dogstatsd/emitter.go less more
0 package dogstatsd
1
2 import (
3 "bytes"
4 "fmt"
5 "net"
6 "time"
7
8 "github.com/go-kit/kit/log"
9 "github.com/go-kit/kit/metrics"
10 "github.com/go-kit/kit/util/conn"
11 )
12
13 // Emitter is a struct to manage connections and orchestrate the emission of
14 // metrics to a DogStatsd process.
15 type Emitter struct {
16 prefix string
17 keyVals chan keyVal
18 mgr *conn.Manager
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23 type keyVal struct {
24 key string
25 val string
26 }
27
28 func stringToKeyVal(key string, keyVals chan keyVal) chan string {
29 vals := make(chan string)
30 go func() {
31 for val := range vals {
32 keyVals <- keyVal{key: key, val: val}
33 }
34 }()
35 return vals
36 }
37
38 // NewEmitter will return an Emitter that will prefix all metrics names with the
39 // given prefix. Once started, it will attempt to create a connection with the
40 // given network and address via `net.Dial` and periodically post metrics to the
41 // connection in the DogStatsD protocol.
42 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
43 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
44 }
45
46 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
47 // Dialer function. This is primarily useful for tests.
48 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
49 e := &Emitter{
50 prefix: metricsPrefix,
51 mgr: conn.NewManager(dialer, network, address, time.After, logger),
52 logger: logger,
53 keyVals: make(chan keyVal),
54 quitc: make(chan chan struct{}),
55 }
56 go e.loop(flushInterval)
57 return e
58 }
59
60 // NewCounter returns a Counter that emits observations in the DogStatsD protocol
61 // via the Emitter's connection manager. Observations are buffered for the
62 // report interval or until the buffer exceeds a max packet size, whichever
63 // comes first. Fields are ignored.
64 func (e *Emitter) NewCounter(key string) metrics.Counter {
65 key = e.prefix + key
66 return &counter{
67 key: key,
68 c: stringToKeyVal(key, e.keyVals),
69 }
70 }
71
72 // NewHistogram returns a Histogram that emits observations in the DogStatsD
73 // protocol via the Emitter's connection manager. Observations are buffered for
74 // the reporting interval or until the buffer exceeds a max packet size,
75 // whichever comes first. Fields are ignored.
76 //
77 // NewHistogram is mapped to a statsd Timing, so observations should represent
78 // milliseconds. If you observe in units of nanoseconds, you can make the
79 // translation with a ScaledHistogram:
80 //
81 // NewScaledHistogram(histogram, time.Millisecond)
82 //
83 // You can also enforce the constraint in a typesafe way with a millisecond
84 // TimeHistogram:
85 //
86 // NewTimeHistogram(histogram, time.Millisecond)
87 //
88 // TODO: support for sampling.
89 func (e *Emitter) NewHistogram(key string) metrics.Histogram {
90 key = e.prefix + key
91 return &histogram{
92 key: key,
93 h: stringToKeyVal(key, e.keyVals),
94 }
95 }
96
97 // NewGauge returns a Gauge that emits values in the DogStatsD protocol via the
98 // the Emitter's connection manager. Values are buffered for the report
99 // interval or until the buffer exceeds a max packet size, whichever comes
100 // first. Fields are ignored.
101 //
102 // TODO: support for sampling
103 func (e *Emitter) NewGauge(key string) metrics.Gauge {
104 key = e.prefix + key
105 return &gauge{
106 key: key,
107 g: stringToKeyVal(key, e.keyVals),
108 }
109 }
110
111 func (e *Emitter) loop(d time.Duration) {
112 ticker := time.NewTicker(d)
113 defer ticker.Stop()
114 buf := &bytes.Buffer{}
115 for {
116 select {
117 case kv := <-e.keyVals:
118 fmt.Fprintf(buf, "%s:%s\n", kv.key, kv.val)
119 if buf.Len() > maxBufferSize {
120 e.Flush(buf)
121 }
122
123 case <-ticker.C:
124 e.Flush(buf)
125
126 case q := <-e.quitc:
127 e.Flush(buf)
128 close(q)
129 return
130 }
131 }
132 }
133
134 // Stop will flush the current metrics and close the active connection. Calling
135 // stop more than once is a programmer error.
136 func (e *Emitter) Stop() {
137 q := make(chan struct{})
138 e.quitc <- q
139 <-q
140 }
141
142 // Flush will write the given buffer to a connection provided by the Emitter's
143 // connection manager.
144 func (e *Emitter) Flush(buf *bytes.Buffer) {
145 conn := e.mgr.Take()
146 if conn == nil {
147 e.logger.Log("during", "flush", "err", "connection unavailable")
148 return
149 }
150
151 _, err := conn.Write(buf.Bytes())
152 if err != nil {
153 e.logger.Log("during", "flush", "err", err)
154 }
155 buf.Reset()
156
157 e.mgr.Put(err)
158 }
0 // Package expvar implements an expvar backend for package metrics.
1 //
2 // The current implementation ignores fields. In the future, it would be good
3 // to have an implementation that accepted a set of predeclared field names at
4 // construction time, and used field values to produce delimiter-separated
5 // bucket (key) names. That is,
6 //
7 // c := NewFieldedCounter(..., "path", "status")
8 // c.Add(1) // "myprefix_unknown_unknown" += 1
9 // c2 := c.With("path", "foo").With("status": "200")
10 // c2.Add(1) // "myprefix_foo_200" += 1
11 //
12 // It would also be possible to have an implementation that generated more
13 // sophisticated expvar.Values. For example, a Counter could be implemented as
14 // a map, representing a tree of key/value pairs whose leaves were the actual
15 // expvar.Ints.
0 // Package expvar provides expvar backends for metrics.
1 // Label values are not supported.
162 package expvar
173
184 import (
195 "expvar"
20 "fmt"
21 "sort"
22 "strconv"
236 "sync"
24 "time"
25
26 "github.com/codahale/hdrhistogram"
277
288 "github.com/go-kit/kit/metrics"
9 "github.com/go-kit/kit/metrics/generic"
2910 )
3011
31 type counter struct {
32 name string
33 v *expvar.Int
12 // Counter implements the counter metric with an expvar float.
13 // Label values are not supported.
14 type Counter struct {
15 f *expvar.Float
3416 }
3517
36 // NewCounter returns a new Counter backed by an expvar with the given name.
37 // Fields are ignored.
38 func NewCounter(name string) metrics.Counter {
39 return &counter{
40 name: name,
41 v: expvar.NewInt(name),
18 // NewCounter creates an expvar Float with the given name, and returns an object
19 // that implements the Counter interface.
20 func NewCounter(name string) *Counter {
21 return &Counter{
22 f: expvar.NewFloat(name),
4223 }
4324 }
4425
45 func (c *counter) Name() string { return c.name }
46 func (c *counter) With(metrics.Field) metrics.Counter { return c }
47 func (c *counter) Add(delta uint64) { c.v.Add(int64(delta)) }
26 // With is a no-op.
27 func (c *Counter) With(labelValues ...string) metrics.Counter { return c }
4828
49 type gauge struct {
50 name string
51 v *expvar.Float
29 // Add implements Counter.
30 func (c *Counter) Add(delta float64) { c.f.Add(delta) }
31
32 // Gauge implements the gauge metric wtih an expvar float.
33 // Label values are not supported.
34 type Gauge struct {
35 f *expvar.Float
5236 }
5337
54 // NewGauge returns a new Gauge backed by an expvar with the given name. It
55 // should be updated manually; for a callback-based approach, see
56 // PublishCallbackGauge. Fields are ignored.
57 func NewGauge(name string) metrics.Gauge {
58 return &gauge{
59 name: name,
60 v: expvar.NewFloat(name),
38 // NewGauge creates an expvar Float with the given name, and returns an object
39 // that implements the Gauge interface.
40 func NewGauge(name string) *Gauge {
41 return &Gauge{
42 f: expvar.NewFloat(name),
6143 }
6244 }
6345
64 func (g *gauge) Name() string { return g.name }
65 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
66 func (g *gauge) Add(delta float64) { g.v.Add(delta) }
67 func (g *gauge) Set(value float64) { g.v.Set(value) }
68 func (g *gauge) Get() float64 { return mustParseFloat64(g.v.String()) }
46 // With is a no-op.
47 func (g *Gauge) With(labelValues ...string) metrics.Gauge { return g }
6948
70 // PublishCallbackGauge publishes a Gauge as an expvar with the given name,
71 // whose value is determined at collect time by the passed callback function.
72 // The callback determines the value, and fields are ignored, so
73 // PublishCallbackGauge returns nothing.
74 func PublishCallbackGauge(name string, callback func() float64) {
75 expvar.Publish(name, callbackGauge(callback))
49 // Set implements Gauge.
50 func (g *Gauge) Set(value float64) { g.f.Set(value) }
51
52 // Histogram implements the histogram metric with a combination of the generic
53 // Histogram object and several expvar Floats, one for each of the 50th, 90th,
54 // 95th, and 99th quantiles of observed values, with the quantile attached to
55 // the name as a suffix. Label values are not supported.
56 type Histogram struct {
57 mtx sync.Mutex
58 h *generic.Histogram
59 p50 *expvar.Float
60 p90 *expvar.Float
61 p95 *expvar.Float
62 p99 *expvar.Float
7663 }
7764
78 type callbackGauge func() float64
79
80 func (g callbackGauge) String() string { return strconv.FormatFloat(g(), 'g', -1, 64) }
81
82 type histogram struct {
83 mu sync.Mutex
84 hist *hdrhistogram.WindowedHistogram
85
86 name string
87 gauges map[int]metrics.Gauge
88 }
89
90 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
91 // windowed HDR histogram which drops data older than five minutes.
92 //
93 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
94 // should be integers in the range 1..99. The gauge names are assigned by
95 // using the passed name as a prefix and appending "_pNN" e.g. "_p50".
96 func NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
97 gauges := map[int]metrics.Gauge{}
98 for _, quantile := range quantiles {
99 if quantile <= 0 || quantile >= 100 {
100 panic(fmt.Sprintf("invalid quantile %d", quantile))
101 }
102 gauges[quantile] = NewGauge(fmt.Sprintf("%s_p%02d", name, quantile))
103 }
104 h := &histogram{
105 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
106 name: name,
107 gauges: gauges,
108 }
109 go h.rotateLoop(1 * time.Minute)
110 return h
111 }
112
113 func (h *histogram) Name() string { return h.name }
114 func (h *histogram) With(metrics.Field) metrics.Histogram { return h }
115
116 func (h *histogram) Observe(value int64) {
117 h.mu.Lock()
118 err := h.hist.Current.RecordValue(value)
119 h.mu.Unlock()
120
121 if err != nil {
122 panic(err.Error())
123 }
124
125 for q, gauge := range h.gauges {
126 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
65 // NewHistogram returns a Histogram object with the given name and number of
66 // buckets in the underlying histogram object. 50 is a good default number of
67 // buckets.
68 func NewHistogram(name string, buckets int) *Histogram {
69 return &Histogram{
70 h: generic.NewHistogram(name, buckets),
71 p50: expvar.NewFloat(name + ".p50"),
72 p90: expvar.NewFloat(name + ".p90"),
73 p95: expvar.NewFloat(name + ".p95"),
74 p99: expvar.NewFloat(name + ".p99"),
12775 }
12876 }
12977
130 func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
131 bars := h.hist.Merge().Distribution()
132 buckets := make([]metrics.Bucket, len(bars))
133 for i, bar := range bars {
134 buckets[i] = metrics.Bucket{
135 From: bar.From,
136 To: bar.To,
137 Count: bar.Count,
138 }
139 }
140 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
141 for quantile, gauge := range h.gauges {
142 quantiles = append(quantiles, metrics.Quantile{
143 Quantile: quantile,
144 Value: int64(gauge.Get()),
145 })
146 }
147 sort.Sort(quantileSlice(quantiles))
148 return buckets, quantiles
78 // With is a no-op.
79 func (h *Histogram) With(labelValues ...string) metrics.Histogram { return h }
80
81 // Observe impleemts Histogram.
82 func (h *Histogram) Observe(value float64) {
83 h.mtx.Lock()
84 defer h.mtx.Unlock()
85 h.h.Observe(value)
86 h.p50.Set(h.h.Quantile(0.50))
87 h.p90.Set(h.h.Quantile(0.90))
88 h.p95.Set(h.h.Quantile(0.95))
89 h.p99.Set(h.h.Quantile(0.99))
14990 }
150
151 func (h *histogram) rotateLoop(d time.Duration) {
152 for range time.Tick(d) {
153 h.mu.Lock()
154 h.hist.Rotate()
155 h.mu.Unlock()
156 }
157 }
158
159 func mustParseFloat64(s string) float64 {
160 f, err := strconv.ParseFloat(s, 64)
161 if err != nil {
162 panic(err)
163 }
164 return f
165 }
166
167 type quantileSlice []metrics.Quantile
168
169 func (a quantileSlice) Len() int { return len(a) }
170 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
171 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
0 package expvar_test
0 package expvar
11
22 import (
3 stdexpvar "expvar"
4 "fmt"
3 "strconv"
54 "testing"
65
7 "github.com/go-kit/kit/metrics"
8 "github.com/go-kit/kit/metrics/expvar"
96 "github.com/go-kit/kit/metrics/teststat"
107 )
118
12 func TestHistogramQuantiles(t *testing.T) {
13 var (
14 name = "test_histogram_quantiles"
15 quantiles = []int{50, 90, 95, 99}
16 h = expvar.NewHistogram(name, 0, 100, 3, quantiles...).With(metrics.Field{Key: "ignored", Value: "field"})
17 )
18 const seed, mean, stdev int64 = 424242, 50, 10
19 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
20 teststat.AssertExpvarNormalHistogram(t, name, mean, stdev, quantiles)
21 }
22
23 func TestCallbackGauge(t *testing.T) {
24 var (
25 name = "foo"
26 value = 42.43
27 )
28 expvar.PublishCallbackGauge(name, func() float64 { return value })
29 if want, have := fmt.Sprint(value), stdexpvar.Get(name).String(); want != have {
30 t.Errorf("want %q, have %q", want, have)
31 }
32 }
33
349 func TestCounter(t *testing.T) {
35 var (
36 name = "m"
37 value = 123
38 )
39 expvar.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
40 if want, have := fmt.Sprint(value), stdexpvar.Get(name).String(); want != have {
41 t.Errorf("want %q, have %q", want, have)
10 counter := NewCounter("expvar_counter").With("label values", "not supported").(*Counter)
11 value := func() float64 { f, _ := strconv.ParseFloat(counter.f.String(), 64); return f }
12 if err := teststat.TestCounter(counter, value); err != nil {
13 t.Fatal(err)
4214 }
4315 }
4416
4517 func TestGauge(t *testing.T) {
46 var (
47 name = "xyz"
48 value = 54321
49 delta = 12345
50 g = expvar.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
51 )
52 g.Set(float64(value))
53 g.Add(float64(delta))
54 if want, have := fmt.Sprint(value+delta), stdexpvar.Get(name).String(); want != have {
55 t.Errorf("want %q, have %q", want, have)
18 gauge := NewGauge("expvar_gauge").With("label values", "not supported").(*Gauge)
19 value := func() float64 { f, _ := strconv.ParseFloat(gauge.f.String(), 64); return f }
20 if err := teststat.TestGauge(gauge, value); err != nil {
21 t.Fatal(err)
5622 }
5723 }
5824
59 func TestInvalidQuantile(t *testing.T) {
60 defer func() {
61 if err := recover(); err == nil {
62 t.Errorf("expected panic, got none")
63 } else {
64 t.Logf("got expected panic: %v", err)
65 }
66 }()
67 expvar.NewHistogram("foo", 0.0, 100.0, 3, 50, 90, 95, 99, 101)
25 func TestHistogram(t *testing.T) {
26 histogram := NewHistogram("expvar_histogram", 50).With("label values", "not supported").(*Histogram)
27 quantiles := func() (float64, float64, float64, float64) {
28 p50, _ := strconv.ParseFloat(histogram.p50.String(), 64)
29 p90, _ := strconv.ParseFloat(histogram.p90.String(), 64)
30 p95, _ := strconv.ParseFloat(histogram.p95.String(), 64)
31 p99, _ := strconv.ParseFloat(histogram.p99.String(), 64)
32 return p50, p90, p95, p99
33 }
34 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
35 t.Fatal(err)
36 }
6837 }
0 // Package generic implements generic versions of each of the metric types. They
1 // can be embedded by other implementations, and converted to specific formats
2 // as necessary.
3 package generic
4
5 import (
6 "fmt"
7 "io"
8 "math"
9 "sync"
10 "sync/atomic"
11
12 "github.com/VividCortex/gohistogram"
13
14 "github.com/go-kit/kit/metrics"
15 "github.com/go-kit/kit/metrics/internal/lv"
16 )
17
18 // Counter is an in-memory implementation of a Counter.
19 type Counter struct {
20 Name string
21 lvs lv.LabelValues
22 bits uint64
23 }
24
25 // NewCounter returns a new, usable Counter.
26 func NewCounter(name string) *Counter {
27 return &Counter{
28 Name: name,
29 }
30 }
31
32 // With implements Counter.
33 func (c *Counter) With(labelValues ...string) metrics.Counter {
34 return &Counter{
35 bits: atomic.LoadUint64(&c.bits),
36 lvs: c.lvs.With(labelValues...),
37 }
38 }
39
40 // Add implements Counter.
41 func (c *Counter) Add(delta float64) {
42 for {
43 var (
44 old = atomic.LoadUint64(&c.bits)
45 newf = math.Float64frombits(old) + delta
46 new = math.Float64bits(newf)
47 )
48 if atomic.CompareAndSwapUint64(&c.bits, old, new) {
49 break
50 }
51 }
52 }
53
54 // Value returns the current value of the counter.
55 func (c *Counter) Value() float64 {
56 return math.Float64frombits(atomic.LoadUint64(&c.bits))
57 }
58
59 // ValueReset returns the current value of the counter, and resets it to zero.
60 // This is useful for metrics backends whose counter aggregations expect deltas,
61 // like Graphite.
62 func (c *Counter) ValueReset() float64 {
63 for {
64 var (
65 old = atomic.LoadUint64(&c.bits)
66 newf = 0.0
67 new = math.Float64bits(newf)
68 )
69 if atomic.CompareAndSwapUint64(&c.bits, old, new) {
70 return math.Float64frombits(old)
71 }
72 }
73 }
74
75 // LabelValues returns the set of label values attached to the counter.
76 func (c *Counter) LabelValues() []string {
77 return c.lvs
78 }
79
80 // Gauge is an in-memory implementation of a Gauge.
81 type Gauge struct {
82 Name string
83 lvs lv.LabelValues
84 bits uint64
85 }
86
87 // NewGauge returns a new, usable Gauge.
88 func NewGauge(name string) *Gauge {
89 return &Gauge{
90 Name: name,
91 }
92 }
93
94 // With implements Gauge.
95 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
96 return &Gauge{
97 bits: atomic.LoadUint64(&g.bits),
98 lvs: g.lvs.With(labelValues...),
99 }
100 }
101
102 // Set implements Gauge.
103 func (g *Gauge) Set(value float64) {
104 atomic.StoreUint64(&g.bits, math.Float64bits(value))
105 }
106
107 // Value returns the current value of the gauge.
108 func (g *Gauge) Value() float64 {
109 return math.Float64frombits(atomic.LoadUint64(&g.bits))
110 }
111
112 // LabelValues returns the set of label values attached to the gauge.
113 func (g *Gauge) LabelValues() []string {
114 return g.lvs
115 }
116
117 // Histogram is an in-memory implementation of a streaming histogram, based on
118 // VividCortex/gohistogram. It dynamically computes quantiles, so it's not
119 // suitable for aggregation.
120 type Histogram struct {
121 Name string
122 lvs lv.LabelValues
123 h gohistogram.Histogram
124 }
125
126 // NewHistogram returns a numeric histogram based on VividCortex/gohistogram. A
127 // good default value for buckets is 50.
128 func NewHistogram(name string, buckets int) *Histogram {
129 return &Histogram{
130 Name: name,
131 h: gohistogram.NewHistogram(buckets),
132 }
133 }
134
135 // With implements Histogram.
136 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
137 return &Histogram{
138 lvs: h.lvs.With(labelValues...),
139 h: h.h,
140 }
141 }
142
143 // Observe implements Histogram.
144 func (h *Histogram) Observe(value float64) {
145 h.h.Add(value)
146 }
147
148 // Quantile returns the value of the quantile q, 0.0 < q < 1.0.
149 func (h *Histogram) Quantile(q float64) float64 {
150 return h.h.Quantile(q)
151 }
152
153 // LabelValues returns the set of label values attached to the histogram.
154 func (h *Histogram) LabelValues() []string {
155 return h.lvs
156 }
157
158 // Print writes a string representation of the histogram to the passed writer.
159 // Useful for printing to a terminal.
160 func (h *Histogram) Print(w io.Writer) {
161 fmt.Fprintf(w, h.h.String())
162 }
163
164 // Bucket is a range in a histogram which aggregates observations.
165 type Bucket struct {
166 From, To, Count int64
167 }
168
169 // Quantile is a pair of a quantile (0..100) and its observed maximum value.
170 type Quantile struct {
171 Quantile int // 0..100
172 Value int64
173 }
174
175 // SimpleHistogram is an in-memory implementation of a Histogram. It only tracks
176 // an approximate moving average, so is likely too naïve for many use cases.
177 type SimpleHistogram struct {
178 mtx sync.RWMutex
179 lvs lv.LabelValues
180 avg float64
181 n uint64
182 }
183
184 // NewSimpleHistogram returns a SimpleHistogram, ready for observations.
185 func NewSimpleHistogram() *SimpleHistogram {
186 return &SimpleHistogram{}
187 }
188
189 // With implements Histogram.
190 func (h *SimpleHistogram) With(labelValues ...string) metrics.Histogram {
191 return &SimpleHistogram{
192 lvs: h.lvs.With(labelValues...),
193 avg: h.avg,
194 n: h.n,
195 }
196 }
197
198 // Observe implements Histogram.
199 func (h *SimpleHistogram) Observe(value float64) {
200 h.mtx.Lock()
201 defer h.mtx.Unlock()
202 h.n++
203 h.avg -= h.avg / float64(h.n)
204 h.avg += value / float64(h.n)
205 }
206
207 // ApproximateMovingAverage returns the approximate moving average of observations.
208 func (h *SimpleHistogram) ApproximateMovingAverage() float64 {
209 h.mtx.RLock()
210 h.mtx.RUnlock()
211 return h.avg
212 }
213
214 // LabelValues returns the set of label values attached to the histogram.
215 func (h *SimpleHistogram) LabelValues() []string {
216 return h.lvs
217 }
0 package generic_test
1
2 // This is package generic_test in order to get around an import cycle: this
3 // package imports teststat to do its testing, but package teststat imports
4 // generic to use its Histogram in the Quantiles helper function.
5
6 import (
7 "math"
8 "math/rand"
9 "testing"
10
11 "github.com/go-kit/kit/metrics/generic"
12 "github.com/go-kit/kit/metrics/teststat"
13 )
14
15 func TestCounter(t *testing.T) {
16 counter := generic.NewCounter("my_counter").With("label", "counter").(*generic.Counter)
17 value := func() float64 { return counter.Value() }
18 if err := teststat.TestCounter(counter, value); err != nil {
19 t.Fatal(err)
20 }
21 }
22
23 func TestValueReset(t *testing.T) {
24 counter := generic.NewCounter("test_value_reset")
25 counter.Add(123)
26 counter.Add(456)
27 counter.Add(789)
28 if want, have := float64(123+456+789), counter.ValueReset(); want != have {
29 t.Errorf("want %f, have %f", want, have)
30 }
31 if want, have := float64(0), counter.Value(); want != have {
32 t.Errorf("want %f, have %f", want, have)
33 }
34 }
35
36 func TestGauge(t *testing.T) {
37 gauge := generic.NewGauge("my_gauge").With("label", "gauge").(*generic.Gauge)
38 value := func() float64 { return gauge.Value() }
39 if err := teststat.TestGauge(gauge, value); err != nil {
40 t.Fatal(err)
41 }
42 }
43
44 func TestHistogram(t *testing.T) {
45 histogram := generic.NewHistogram("my_histogram", 50).With("label", "histogram").(*generic.Histogram)
46 quantiles := func() (float64, float64, float64, float64) {
47 return histogram.Quantile(0.50), histogram.Quantile(0.90), histogram.Quantile(0.95), histogram.Quantile(0.99)
48 }
49 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
50 t.Fatal(err)
51 }
52 }
53
54 func TestSimpleHistogram(t *testing.T) {
55 histogram := generic.NewSimpleHistogram().With("label", "simple_histogram").(*generic.SimpleHistogram)
56 var (
57 sum int
58 count = 1234 // not too big
59 )
60 for i := 0; i < count; i++ {
61 value := rand.Intn(1000)
62 sum += value
63 histogram.Observe(float64(value))
64 }
65
66 var (
67 want = float64(sum) / float64(count)
68 have = histogram.ApproximateMovingAverage()
69 tolerance = 0.001 // real real slim
70 )
71 if math.Abs(want-have)/want > tolerance {
72 t.Errorf("want %f, have %f", want, have)
73 }
74 }
+0
-159
metrics/graphite/emitter.go less more
0 package graphite
1
2 import (
3 "bufio"
4 "fmt"
5 "io"
6 "net"
7 "sync"
8 "time"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
12 "github.com/go-kit/kit/util/conn"
13 )
14
15 // Emitter is a struct to manage connections and orchestrate the emission of
16 // metrics to a Graphite system.
17 type Emitter struct {
18 mtx sync.Mutex
19 prefix string
20 mgr *conn.Manager
21 counters []*counter
22 histograms []*windowedHistogram
23 gauges []*gauge
24 logger log.Logger
25 quitc chan chan struct{}
26 }
27
28 // NewEmitter will return an Emitter that will prefix all metrics names with the
29 // given prefix. Once started, it will attempt to create a connection with the
30 // given network and address via `net.Dial` and periodically post metrics to the
31 // connection in the Graphite plaintext protocol.
32 func NewEmitter(network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
33 return NewEmitterDial(net.Dial, network, address, metricsPrefix, flushInterval, logger)
34 }
35
36 // NewEmitterDial is the same as NewEmitter, but allows you to specify your own
37 // Dialer function. This is primarily useful for tests.
38 func NewEmitterDial(dialer conn.Dialer, network, address string, metricsPrefix string, flushInterval time.Duration, logger log.Logger) *Emitter {
39 e := &Emitter{
40 prefix: metricsPrefix,
41 mgr: conn.NewManager(dialer, network, address, time.After, logger),
42 logger: logger,
43 quitc: make(chan chan struct{}),
44 }
45 go e.loop(flushInterval)
46 return e
47 }
48
49 // NewCounter returns a Counter whose value will be periodically emitted in
50 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
51 func (e *Emitter) NewCounter(name string) metrics.Counter {
52 e.mtx.Lock()
53 defer e.mtx.Unlock()
54 c := newCounter(name)
55 e.counters = append(e.counters, c)
56 return c
57 }
58
59 // NewHistogram is taken from http://github.com/codahale/metrics. It returns a
60 // windowed HDR histogram which drops data older than five minutes.
61 //
62 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
63 // should be integers in the range 1..99. The gauge names are assigned by using
64 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
65 //
66 // The values of this histogram will be periodically emitted in a
67 // Graphite-compatible format once the Emitter is started. Fields are ignored.
68 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
69 gauges := map[int]metrics.Gauge{}
70 for _, quantile := range quantiles {
71 if quantile <= 0 || quantile >= 100 {
72 return nil, fmt.Errorf("invalid quantile %d", quantile)
73 }
74 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
75 }
76 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
77
78 e.mtx.Lock()
79 defer e.mtx.Unlock()
80 e.histograms = append(e.histograms, h)
81 return h, nil
82 }
83
84 // NewGauge returns a Gauge whose value will be periodically emitted in a
85 // Graphite-compatible format once the Emitter is started. Fields are ignored.
86 func (e *Emitter) NewGauge(name string) metrics.Gauge {
87 e.mtx.Lock()
88 defer e.mtx.Unlock()
89 return e.gauge(name)
90 }
91
92 func (e *Emitter) gauge(name string) metrics.Gauge {
93 g := &gauge{name, 0}
94 e.gauges = append(e.gauges, g)
95 return g
96 }
97
98 func (e *Emitter) loop(d time.Duration) {
99 ticker := time.NewTicker(d)
100 defer ticker.Stop()
101
102 for {
103 select {
104 case <-ticker.C:
105 e.Flush()
106
107 case q := <-e.quitc:
108 e.Flush()
109 close(q)
110 return
111 }
112 }
113 }
114
115 // Stop will flush the current metrics and close the active connection. Calling
116 // stop more than once is a programmer error.
117 func (e *Emitter) Stop() {
118 q := make(chan struct{})
119 e.quitc <- q
120 <-q
121 }
122
123 // Flush will write the current metrics to the Emitter's connection in the
124 // Graphite plaintext protocol.
125 func (e *Emitter) Flush() {
126 e.mtx.Lock() // one flush at a time
127 defer e.mtx.Unlock()
128
129 conn := e.mgr.Take()
130 if conn == nil {
131 e.logger.Log("during", "flush", "err", "connection unavailable")
132 return
133 }
134
135 err := e.flush(conn)
136 if err != nil {
137 e.logger.Log("during", "flush", "err", err)
138 }
139 e.mgr.Put(err)
140 }
141
142 func (e *Emitter) flush(w io.Writer) error {
143 bw := bufio.NewWriter(w)
144
145 for _, c := range e.counters {
146 c.flush(bw, e.prefix)
147 }
148
149 for _, h := range e.histograms {
150 h.flush(bw, e.prefix)
151 }
152
153 for _, g := range e.gauges {
154 g.flush(bw, e.prefix)
155 }
156
157 return bw.Flush()
158 }
0 // Package graphite implements a Graphite backend for package metrics. Metrics
1 // will be emitted to a Graphite server in the plaintext protocol which looks
2 // like:
0 // Package graphite provides a Graphite backend for metrics. Metrics are batched
1 // and emitted in the plaintext protocol. For more information, see
2 // http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol
33 //
4 // "<metric path> <metric value> <metric timestamp>"
5 //
6 // See http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol.
7 // The current implementation ignores fields.
4 // Graphite does not have a native understanding of metric parameterization, so
5 // label values not supported. Use distinct metrics for each unique combination
6 // of label values.
87 package graphite
98
109 import (
1110 "fmt"
1211 "io"
13 "math"
14 "sort"
1512 "sync"
16 "sync/atomic"
1713 "time"
18
19 "github.com/codahale/hdrhistogram"
2014
2115 "github.com/go-kit/kit/log"
2216 "github.com/go-kit/kit/metrics"
17 "github.com/go-kit/kit/metrics/generic"
18 "github.com/go-kit/kit/util/conn"
2319 )
2420
25 func newCounter(name string) *counter {
26 return &counter{name, 0}
27 }
28
29 func newGauge(name string) *gauge {
30 return &gauge{name, 0}
31 }
32
33 // counter implements the metrics.counter interface but also provides a
34 // Flush method to emit the current counter values in the Graphite plaintext
35 // protocol.
36 type counter struct {
37 key string
38 count uint64
39 }
40
41 func (c *counter) Name() string { return c.key }
42
43 // With currently ignores fields.
44 func (c *counter) With(metrics.Field) metrics.Counter { return c }
45
46 func (c *counter) Add(delta uint64) { atomic.AddUint64(&c.count, delta) }
47
48 func (c *counter) get() uint64 { return atomic.LoadUint64(&c.count) }
49
50 // flush will emit the current counter value in the Graphite plaintext
51 // protocol to the given io.Writer.
52 func (c *counter) flush(w io.Writer, prefix string) {
53 fmt.Fprintf(w, "%s.count %d %d\n", prefix+c.Name(), c.get(), time.Now().Unix())
54 }
55
56 // gauge implements the metrics.gauge interface but also provides a
57 // Flush method to emit the current counter values in the Graphite plaintext
58 // protocol.
59 type gauge struct {
60 key string
61 value uint64 // math.Float64bits
62 }
63
64 func (g *gauge) Name() string { return g.key }
65
66 // With currently ignores fields.
67 func (g *gauge) With(metrics.Field) metrics.Gauge { return g }
68
69 func (g *gauge) Add(delta float64) {
70 for {
71 old := atomic.LoadUint64(&g.value)
72 new := math.Float64bits(math.Float64frombits(old) + delta)
73 if atomic.CompareAndSwapUint64(&g.value, old, new) {
74 return
75 }
76 }
77 }
78
79 func (g *gauge) Set(value float64) {
80 atomic.StoreUint64(&g.value, math.Float64bits(value))
81 }
82
83 func (g *gauge) Get() float64 {
84 return math.Float64frombits(atomic.LoadUint64(&g.value))
85 }
86
87 // Flush will emit the current gauge value in the Graphite plaintext
88 // protocol to the given io.Writer.
89 func (g *gauge) flush(w io.Writer, prefix string) {
90 fmt.Fprintf(w, "%s %.2f %d\n", prefix+g.Name(), g.Get(), time.Now().Unix())
91 }
92
93 // windowedHistogram is taken from http://github.com/codahale/metrics. It
94 // is a windowed HDR histogram which drops data older than five minutes.
21 // Graphite receives metrics observations and forwards them to a Graphite server.
22 // Create a Graphite object, use it to create metrics, and pass those metrics as
23 // dependencies to the components that will use them.
9524 //
96 // The histogram exposes metrics for each passed quantile as gauges. Quantiles
97 // should be integers in the range 1..99. The gauge names are assigned by using
98 // the passed name as a prefix and appending "_pNN" e.g. "_p50".
25 // All metrics are buffered until WriteTo is called. Counters and gauges are
26 // aggregated into a single observation per timeseries per write. Histograms are
27 // exploded into per-quantile gauges and reported once per write.
9928 //
100 // The values of this histogram will be periodically emitted in a
101 // Graphite-compatible format once the GraphiteProvider is started. Fields are ignored.
102 type windowedHistogram struct {
103 mtx sync.Mutex
104 hist *hdrhistogram.WindowedHistogram
105
106 name string
107 gauges map[int]metrics.Gauge
108 logger log.Logger
109 }
110
111 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram {
112 h := &windowedHistogram{
113 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
114 name: name,
115 gauges: quantiles,
116 logger: logger,
117 }
118 go h.rotateLoop(1 * time.Minute)
29 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
30 // To send to a Graphite server, use the SendLoop helper method.
31 type Graphite struct {
32 mtx sync.RWMutex
33 prefix string
34 counters map[string]*Counter
35 gauges map[string]*Gauge
36 histograms map[string]*Histogram
37 logger log.Logger
38 }
39
40 // New returns a Statsd object that may be used to create metrics. Prefix is
41 // applied to all created metrics. Callers must ensure that regular calls to
42 // WriteTo are performed, either manually or with one of the helper methods.
43 func New(prefix string, logger log.Logger) *Graphite {
44 return &Graphite{
45 prefix: prefix,
46 counters: map[string]*Counter{},
47 gauges: map[string]*Gauge{},
48 histograms: map[string]*Histogram{},
49 logger: logger,
50 }
51 }
52
53 // NewCounter returns a counter. Observations are aggregated and emitted once
54 // per write invocation.
55 func (g *Graphite) NewCounter(name string) *Counter {
56 c := NewCounter(g.prefix + name)
57 g.mtx.Lock()
58 g.counters[g.prefix+name] = c
59 g.mtx.Unlock()
60 return c
61 }
62
63 // NewGauge returns a gauge. Observations are aggregated and emitted once per
64 // write invocation.
65 func (g *Graphite) NewGauge(name string) *Gauge {
66 ga := NewGauge(g.prefix + name)
67 g.mtx.Lock()
68 g.gauges[g.prefix+name] = ga
69 g.mtx.Unlock()
70 return ga
71 }
72
73 // NewHistogram returns a histogram. Observations are aggregated and emitted as
74 // per-quantile gauges, once per write invocation. 50 is a good default value
75 // for buckets.
76 func (g *Graphite) NewHistogram(name string, buckets int) *Histogram {
77 h := NewHistogram(g.prefix+name, buckets)
78 g.mtx.Lock()
79 g.histograms[g.prefix+name] = h
80 g.mtx.Unlock()
11981 return h
12082 }
12183
122 func (h *windowedHistogram) Name() string { return h.name }
123
124 func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }
125
126 func (h *windowedHistogram) Observe(value int64) {
127 h.mtx.Lock()
128 err := h.hist.Current.RecordValue(value)
129 h.mtx.Unlock()
130
131 if err != nil {
132 h.logger.Log("err", err, "msg", "unable to record histogram value")
133 return
134 }
135
136 for q, gauge := range h.gauges {
137 gauge.Set(float64(h.hist.Current.ValueAtQuantile(float64(q))))
138 }
139 }
140
141 func (h *windowedHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) {
142 bars := h.hist.Merge().Distribution()
143 buckets := make([]metrics.Bucket, len(bars))
144 for i, bar := range bars {
145 buckets[i] = metrics.Bucket{
146 From: bar.From,
147 To: bar.To,
148 Count: bar.Count,
149 }
150 }
151 quantiles := make([]metrics.Quantile, 0, len(h.gauges))
152 for quantile, gauge := range h.gauges {
153 quantiles = append(quantiles, metrics.Quantile{
154 Quantile: quantile,
155 Value: int64(gauge.Get()),
156 })
157 }
158 sort.Sort(quantileSlice(quantiles))
159 return buckets, quantiles
160 }
161
162 func (h *windowedHistogram) flush(w io.Writer, prefix string) {
163 name := prefix + h.Name()
164 hist := h.hist.Merge()
84 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
85 // time the passed channel fires. This method blocks until the channel is
86 // closed, so clients probably want to run it in its own goroutine. For typical
87 // usage, create a time.Ticker and pass its C channel to this method.
88 func (g *Graphite) WriteLoop(c <-chan time.Time, w io.Writer) {
89 for range c {
90 if _, err := g.WriteTo(w); err != nil {
91 g.logger.Log("during", "WriteTo", "err", err)
92 }
93 }
94 }
95
96 // SendLoop is a helper method that wraps WriteLoop, passing a managed
97 // connection to the network and address. Like WriteLoop, this method blocks
98 // until the channel is closed, so clients probably want to start it in its own
99 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
100 // this method.
101 func (g *Graphite) SendLoop(c <-chan time.Time, network, address string) {
102 g.WriteLoop(c, conn.NewDefaultManager(network, address, g.logger))
103 }
104
105 // WriteTo flushes the buffered content of the metrics to the writer, in
106 // Graphite plaintext format. WriteTo abides best-effort semantics, so
107 // observations are lost if there is a problem with the write. Clients should be
108 // sure to call WriteTo regularly, ideally through the WriteLoop or SendLoop
109 // helper methods.
110 func (g *Graphite) WriteTo(w io.Writer) (count int64, err error) {
111 g.mtx.RLock()
112 defer g.mtx.RUnlock()
165113 now := time.Now().Unix()
166 fmt.Fprintf(w, "%s.count %d %d\n", name, hist.TotalCount(), now)
167 fmt.Fprintf(w, "%s.min %d %d\n", name, hist.Min(), now)
168 fmt.Fprintf(w, "%s.max %d %d\n", name, hist.Max(), now)
169 fmt.Fprintf(w, "%s.mean %.2f %d\n", name, hist.Mean(), now)
170 fmt.Fprintf(w, "%s.std-dev %.2f %d\n", name, hist.StdDev(), now)
171 }
172
173 func (h *windowedHistogram) rotateLoop(d time.Duration) {
174 for range time.Tick(d) {
175 h.mtx.Lock()
176 h.hist.Rotate()
177 h.mtx.Unlock()
178 }
179 }
180
181 type quantileSlice []metrics.Quantile
182
183 func (a quantileSlice) Len() int { return len(a) }
184 func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile }
185 func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
114
115 for name, c := range g.counters {
116 n, err := fmt.Fprintf(w, "%s %f %d\n", name, c.c.ValueReset(), now)
117 if err != nil {
118 return count, err
119 }
120 count += int64(n)
121 }
122
123 for name, ga := range g.gauges {
124 n, err := fmt.Fprintf(w, "%s %f %d\n", name, ga.g.Value(), now)
125 if err != nil {
126 return count, err
127 }
128 count += int64(n)
129 }
130
131 for name, h := range g.histograms {
132 for _, p := range []struct {
133 s string
134 f float64
135 }{
136 {"50", 0.50},
137 {"90", 0.90},
138 {"95", 0.95},
139 {"99", 0.99},
140 } {
141 n, err := fmt.Fprintf(w, "%s.p%s %f %d\n", name, p.s, h.h.Quantile(p.f), now)
142 if err != nil {
143 return count, err
144 }
145 count += int64(n)
146 }
147 }
148
149 return count, err
150 }
151
152 // Counter is a Graphite counter metric.
153 type Counter struct {
154 c *generic.Counter
155 }
156
157 // NewCounter returns a new usable counter metric.
158 func NewCounter(name string) *Counter {
159 return &Counter{generic.NewCounter(name)}
160 }
161
162 // With is a no-op.
163 func (c *Counter) With(...string) metrics.Counter { return c }
164
165 // Add implements counter.
166 func (c *Counter) Add(delta float64) { c.c.Add(delta) }
167
168 // Gauge is a Graphite gauge metric.
169 type Gauge struct {
170 g *generic.Gauge
171 }
172
173 // NewGauge returns a new usable Gauge metric.
174 func NewGauge(name string) *Gauge {
175 return &Gauge{generic.NewGauge(name)}
176 }
177
178 // With is a no-op.
179 func (g *Gauge) With(...string) metrics.Gauge { return g }
180
181 // Set implements gauge.
182 func (g *Gauge) Set(value float64) { g.g.Set(value) }
183
184 // Histogram is a Graphite histogram metric. Observations are bucketed into
185 // per-quantile gauges.
186 type Histogram struct {
187 h *generic.Histogram
188 }
189
190 // NewHistogram returns a new usable Histogram metric.
191 func NewHistogram(name string, buckets int) *Histogram {
192 return &Histogram{generic.NewHistogram(name, buckets)}
193 }
194
195 // With is a no-op.
196 func (h *Histogram) With(...string) metrics.Histogram { return h }
197
198 // Observe implements histogram.
199 func (h *Histogram) Observe(value float64) { h.h.Observe(value) }
11
22 import (
33 "bytes"
4 "fmt"
5 "strings"
4 "regexp"
5 "strconv"
66 "testing"
7 "time"
87
98 "github.com/go-kit/kit/log"
10 "github.com/go-kit/kit/metrics"
119 "github.com/go-kit/kit/metrics/teststat"
1210 )
1311
14 func TestHistogramQuantiles(t *testing.T) {
15 prefix := "prefix."
16 e := NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
17 var (
18 name = "test_histogram_quantiles"
19 quantiles = []int{50, 90, 95, 99}
20 )
21 h, err := e.NewHistogram(name, 0, 100, 3, quantiles...)
22 if err != nil {
23 t.Fatalf("unable to create test histogram: %v", err)
24 }
25 h = h.With(metrics.Field{Key: "ignored", Value: "field"})
26 const seed, mean, stdev int64 = 424242, 50, 10
27 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
28
29 // flush the current metrics into a buffer to examine
30 var b bytes.Buffer
31 e.flush(&b)
32 teststat.AssertGraphiteNormalHistogram(t, prefix, name, mean, stdev, quantiles, b.String())
33 }
34
3512 func TestCounter(t *testing.T) {
36 var (
37 prefix = "prefix."
38 name = "m"
39 value = 123
40 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
41 b bytes.Buffer
42 )
43 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
44 e.flush(&b)
45 want := fmt.Sprintf("%s%s.count %d", prefix, name, value)
46 payload := b.String()
47 if !strings.HasPrefix(payload, want) {
48 t.Errorf("counter %s want\n%s, have\n%s", name, want, payload)
13 prefix, name := "abc.", "def"
14 label, value := "label", "value" // ignored for Graphite
15 regex := `^` + prefix + name + ` ([0-9\.]+) [0-9]+$`
16 g := New(prefix, log.NewNopLogger())
17 counter := g.NewCounter(name).With(label, value)
18 valuef := teststat.SumLines(g, regex)
19 if err := teststat.TestCounter(counter, valuef); err != nil {
20 t.Fatal(err)
4921 }
5022 }
5123
5224 func TestGauge(t *testing.T) {
53 var (
54 prefix = "prefix."
55 name = "xyz"
56 value = 54321
57 delta = 12345
58 e = NewEmitter("", "", prefix, time.Second, log.NewNopLogger())
59 b bytes.Buffer
60 g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
61 )
62
63 g.Set(float64(value))
64 g.Add(float64(delta))
65
66 e.flush(&b)
67 payload := b.String()
68
69 want := fmt.Sprintf("%s%s %d", prefix, name, value+delta)
70 if !strings.HasPrefix(payload, want) {
71 t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload)
25 prefix, name := "ghi.", "jkl"
26 label, value := "xyz", "abc" // ignored for Graphite
27 regex := `^` + prefix + name + ` ([0-9\.]+) [0-9]+$`
28 g := New(prefix, log.NewNopLogger())
29 gauge := g.NewGauge(name).With(label, value)
30 valuef := teststat.LastLine(g, regex)
31 if err := teststat.TestGauge(gauge, valuef); err != nil {
32 t.Fatal(err)
7233 }
7334 }
7435
75 func TestEmitterStops(t *testing.T) {
76 e := NewEmitter("foo", "bar", "baz", time.Second, log.NewNopLogger())
77 time.Sleep(100 * time.Millisecond)
78 e.Stop()
36 func TestHistogram(t *testing.T) {
37 // The histogram test is actually like 4 gauge tests.
38 prefix, name := "statsd.", "histogram_test"
39 label, value := "abc", "def" // ignored for Graphite
40 re50 := regexp.MustCompile(prefix + name + `.p50 ([0-9\.]+) [0-9]+`)
41 re90 := regexp.MustCompile(prefix + name + `.p90 ([0-9\.]+) [0-9]+`)
42 re95 := regexp.MustCompile(prefix + name + `.p95 ([0-9\.]+) [0-9]+`)
43 re99 := regexp.MustCompile(prefix + name + `.p99 ([0-9\.]+) [0-9]+`)
44 g := New(prefix, log.NewNopLogger())
45 histogram := g.NewHistogram(name, 50).With(label, value)
46 quantiles := func() (float64, float64, float64, float64) {
47 var buf bytes.Buffer
48 g.WriteTo(&buf)
49 match50 := re50.FindStringSubmatch(buf.String())
50 p50, _ := strconv.ParseFloat(match50[1], 64)
51 match90 := re90.FindStringSubmatch(buf.String())
52 p90, _ := strconv.ParseFloat(match90[1], 64)
53 match95 := re95.FindStringSubmatch(buf.String())
54 p95, _ := strconv.ParseFloat(match95[1], 64)
55 match99 := re99.FindStringSubmatch(buf.String())
56 p99, _ := strconv.ParseFloat(match99[1], 64)
57 return p50, p90, p95, p99
58 }
59 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
60 t.Fatal(err)
61 }
7962 }
0 // Package influx provides an InfluxDB implementation for metrics. The model is
1 // similar to other push-based instrumentation systems. Observations are
2 // aggregated locally and emitted to the Influx server on regular intervals.
3 package influx
4
5 import (
6 "time"
7
8 influxdb "github.com/influxdata/influxdb/client/v2"
9
10 "github.com/go-kit/kit/log"
11 "github.com/go-kit/kit/metrics"
12 "github.com/go-kit/kit/metrics/internal/lv"
13 )
14
15 // Influx is a store for metrics that will be emitted to an Influx database.
16 //
17 // Influx is a general purpose time-series database, and has no native concepts
18 // of counters, gauges, or histograms. Counters are modeled as a timeseries with
19 // one data point per flush, with a "count" field that reflects all adds since
20 // the last flush. Gauges are modeled as a timeseries with one data point per
21 // flush, with a "value" field that reflects the current state of the gauge.
22 // Histograms are modeled as a timeseries with one data point per observation,
23 // with a "value" field that reflects each observation; use e.g. the HISTOGRAM
24 // aggregate function to compute histograms.
25 //
26 // Influx tags are immutable, attached to the Influx object, and given to each
27 // metric at construction. Influx fields are mapped to Go kit label values, and
28 // may be mutated via With functions. Actual metric values are provided as
29 // fields with specific names depending on the metric.
30 //
31 // All observations are collected in memory locally, and flushed on demand.
32 type Influx struct {
33 counters *lv.Space
34 gauges *lv.Space
35 histograms *lv.Space
36 tags map[string]string
37 conf influxdb.BatchPointsConfig
38 logger log.Logger
39 }
40
41 // New returns an Influx, ready to create metrics and collect observations. Tags
42 // are applied to all metrics created from this object. The BatchPointsConfig is
43 // used during flushing.
44 func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
45 return &Influx{
46 counters: lv.NewSpace(),
47 gauges: lv.NewSpace(),
48 histograms: lv.NewSpace(),
49 tags: tags,
50 conf: conf,
51 logger: logger,
52 }
53 }
54
55 // NewCounter returns an Influx counter.
56 func (in *Influx) NewCounter(name string) *Counter {
57 return &Counter{
58 name: name,
59 obs: in.counters.Observe,
60 }
61 }
62
63 // NewGauge returns an Influx gauge.
64 func (in *Influx) NewGauge(name string) *Gauge {
65 return &Gauge{
66 name: name,
67 obs: in.gauges.Observe,
68 }
69 }
70
71 // NewHistogram returns an Influx histogram.
72 func (in *Influx) NewHistogram(name string) *Histogram {
73 return &Histogram{
74 name: name,
75 obs: in.histograms.Observe,
76 }
77 }
78
79 // BatchPointsWriter captures a subset of the influxdb.Client methods necessary
80 // for emitting metrics observations.
81 type BatchPointsWriter interface {
82 Write(influxdb.BatchPoints) error
83 }
84
85 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
86 // time the passed channel fires. This method blocks until the channel is
87 // closed, so clients probably want to run it in its own goroutine. For typical
88 // usage, create a time.Ticker and pass its C channel to this method.
89 func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
90 for range c {
91 if err := in.WriteTo(w); err != nil {
92 in.logger.Log("during", "WriteTo", "err", err)
93 }
94 }
95 }
96
97 // WriteTo flushes the buffered content of the metrics to the writer, in an
98 // Influx BatchPoints format. WriteTo abides best-effort semantics, so
99 // observations are lost if there is a problem with the write. Clients should be
100 // sure to call WriteTo regularly, ideally through the WriteLoop helper method.
101 func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
102 bp, err := influxdb.NewBatchPoints(in.conf)
103 if err != nil {
104 return err
105 }
106
107 now := time.Now()
108
109 in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
110 fields := fieldsFrom(lvs)
111 fields["count"] = sum(values)
112 var p *influxdb.Point