Codebase list golang-github-go-kit-kit / 53fb81e
allowing tcp or udp, cleaning up loggers and panics and style changes JP Robinson 7 years ago
3 changed file(s) with 141 addition(s) and 124 deletion(s). Raw diff Collapse all Expand all
0 // Package graphite implements a graphite backend for package metrics.
0 // Package graphite implements a Graphite backend for package metrics. Metrics
1 // will be emitted to a Graphite server in the plaintext protocol
2 // (http://graphite.readthedocs.io/en/latest/feeding-carbon.html#the-plaintext-protocol)
3 // which looks like:
4 // "<metric path> <metric value> <metric timestamp>"
15 //
26 // The current implementation ignores fields.
37 package graphite
610 "bufio"
711 "fmt"
812 "io"
9 "log"
1013 "math"
1114 "net"
1215 "sort"
1518 "time"
1619
1720 "github.com/codahale/hdrhistogram"
21 "github.com/go-kit/kit/log"
1822 "github.com/go-kit/kit/metrics"
1923 )
2024
2125 // Emitter will keep track of all metrics and, once started,
2226 // will emit the metrics via the Flush method to the given address.
2327 type Emitter interface {
24 NewCounter(string) metrics.Counter
25 NewHistogram(string, int64, int64, int, ...int) metrics.Histogram
26 NewTimeHistogram(string, time.Duration, int64, int64, int, ...int) metrics.TimeHistogram
27 NewGauge(string) metrics.Gauge
28
29 Start(time.Duration)
30 Flush() error
28 NewCounter(name string) metrics.Counter
29 NewHistogram(name string, min int64, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error)
30 NewTimeHistogram(name string, unit time.Duration, min int64, max int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error)
31 NewGauge(name string) metrics.Gauge
32
33 Start(reportInvterval time.Duration) error
34 Flush()
3135 }
3236
3337 type emitter struct {
34 addr *net.TCPAddr
3538 prefix string
3639
37 metricMu *sync.Mutex
40 addr string
41 tcp bool
42 conn net.Conn
43
44 mtx sync.Mutex
3845 counters []*counter
3946 histograms []*windowedHistogram
4047 gauges []*gauge
48
49 logger log.Logger
4150 }
4251
4352 // NewEmitter will return an Emitter that will prefix all
4453 // metrics names with the given prefix. Once started, it will attempt to create
45 // a TCP connection with the given address and periodically post metrics to the
46 // connection in a Graphite-compatible format.
47 func NewEmitter(addr *net.TCPAddr, prefix string) Emitter {
48 e := &emitter{
49 addr, prefix, &sync.Mutex{},
50 []*counter{}, []*windowedHistogram{}, []*gauge{},
51 }
52
53 return e
54 // a TCP or a UDP connection with the given address and periodically post
55 // metrics to the connection in the Graphite plaintext protocol.
56 // If the provided `tcp` parameter is false, a UDP connection will be used.
57 func NewEmitter(addr string, tcp bool, metricsPrefix string, logger log.Logger) Emitter {
58 return &emitter{
59 addr: addr,
60 tcp: tcp,
61 prefix: metricsPrefix,
62 logger: logger,
63 }
5464 }
5565
5666 // NewCounter returns a Counter whose value will be periodically emitted in
5767 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
5868 func (e *emitter) NewCounter(name string) metrics.Counter {
5969 // only one flush at a time
60 e.metricMu.Lock()
61 defer e.metricMu.Unlock()
6270 c := &counter{name, 0}
71 e.mtx.Lock()
6372 e.counters = append(e.counters, c)
73 e.mtx.Unlock()
6474 return c
6575 }
6676
7383 //
7484 // The values of this histogram will be periodically emitted in a Graphite-compatible
7585 // format once the Emitter is started. Fields are ignored.
76 func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.Histogram {
77 // only one flush at a time
78 e.metricMu.Lock()
79 defer e.metricMu.Unlock()
80
86 func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
8187 gauges := map[int]metrics.Gauge{}
8288 for _, quantile := range quantiles {
8389 if quantile <= 0 || quantile >= 100 {
84 panic(fmt.Sprintf("invalid quantile %d", quantile))
90 return nil, fmt.Errorf("invalid quantile %d", quantile)
8591 }
8692 gauges[quantile] = e.gauge(fmt.Sprintf("%s_p%02d", name, quantile))
8793 }
88 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges)
94 h := newWindowedHistogram(name, minValue, maxValue, sigfigs, gauges, e.logger)
95
96 e.mtx.Lock()
8997 e.histograms = append(e.histograms, h)
90 return h
98 e.mtx.Unlock()
99 return h, nil
91100 }
92101
93102 // NewTimeHistogram returns a TimeHistogram wrapper around the windowed
94103 // HDR histrogram provided by this package.
95 func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) metrics.TimeHistogram {
96 h := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
97 return metrics.NewTimeHistogram(unit, h)
104 func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) {
105 h, err := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
106 if err != nil {
107 return nil, err
108 }
109 return metrics.NewTimeHistogram(unit, h), nil
98110 }
99111
100112 // NewGauge returns a Gauge whose value will be periodically emitted in
101113 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
102114 func (e *emitter) NewGauge(name string) metrics.Gauge {
103 // only one flush at a time
104 e.metricMu.Lock()
105 defer e.metricMu.Unlock()
115 e.mtx.Lock()
116 defer e.mtx.Unlock()
106117 return e.gauge(name)
107118 }
108119
112123 return g
113124 }
114125
126 func (e *emitter) dial() error {
127 if e.tcp {
128 tAddr, err := net.ResolveTCPAddr("tcp", e.addr)
129 if err != nil {
130 return err
131 }
132 e.conn, err = net.DialTCP("tcp", nil, tAddr)
133 if err != nil {
134 return err
135 }
136 } else {
137 uAddr, err := net.ResolveUDPAddr("udp", e.addr)
138 if err != nil {
139 return err
140 }
141 e.conn, err = net.DialUDP("udp", nil, uAddr)
142 if err != nil {
143 return err
144 }
145 }
146 return nil
147 }
148
115149 // Start will kick off a background goroutine to
116150 // call Flush once every interval.
117 func (e *emitter) Start(interval time.Duration) {
151 func (e *emitter) Start(interval time.Duration) error {
152 err := e.dial()
153 if err != nil {
154 return err
155 }
118156 go func() {
119 t := time.Tick(interval)
120 for range t {
121 err := e.Flush()
122 if err != nil {
123 log.Print("error: could not dial graphite host: ", err)
124 continue
125 }
157 for range time.Tick(interval) {
158 e.Flush()
126159 }
127160 }()
161 return nil
128162 }
129163
130164 // Flush will attempt to create a connection with the given address
131 // and write the current metrics to it in a Graphite-compatible format.
165 // and write the current metrics to it in the Graphite plaintext protocol.
132166 //
133167 // Users can call this method on process shutdown to ensure
134168 // the current metrics are pushed to Graphite.
135 func (e *emitter) Flush() error {
136 // open connection
137 conn, err := net.DialTCP("tcp", nil, e.addr)
138 if err != nil {
139 return err
140 }
141
142 // flush stats to connection
143 e.flush(conn)
144
145 // close connection
146 conn.Close()
147 return nil
148 }
169 func (e *emitter) Flush() { e.flush(e.conn) }
149170
150171 func (e *emitter) flush(conn io.Writer) {
151172 // only one flush at a time
152 e.metricMu.Lock()
153 defer e.metricMu.Unlock()
173 e.mtx.Lock()
174 defer e.mtx.Unlock()
154175
155176 // buffer the writer and make sure to flush it
156177 w := bufio.NewWriter(conn)
157178 defer w.Flush()
158179
159 now := time.Now().Unix()
160
161180 // emit counter stats
162181 for _, c := range e.counters {
163 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, now)
182 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, c.Name(), c.count, time.Now().Unix())
164183 }
165184
166185 // emit histogram specific stats
167186 for _, h := range e.histograms {
168187 hist := h.hist.Merge()
188 now := time.Now().Unix()
169189 fmt.Fprintf(w, "%s.%s.count %d %d\n", e.prefix, h.Name(), hist.TotalCount(), now)
170190 fmt.Fprintf(w, "%s.%s.min %d %d\n", e.prefix, h.Name(), hist.Min(), now)
171191 fmt.Fprintf(w, "%s.%s.max %d %d\n", e.prefix, h.Name(), hist.Max(), now)
175195
176196 // emit gauge stats (which can include some histogram quantiles)
177197 for _, g := range e.gauges {
178 fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), now)
198 fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), time.Now().Unix())
179199 }
180200 }
181201
218238 }
219239
220240 type windowedHistogram struct {
221 mu sync.Mutex
241 mtx sync.Mutex
222242 hist *hdrhistogram.WindowedHistogram
223243
224244 name string
225245 gauges map[int]metrics.Gauge
246 logger log.Logger
226247 }
227248
228249 // newWindowedHistogram is taken from http://github.com/codahale/metrics. It returns a
231252 // The histogram exposes metrics for each passed quantile as gauges. Users are expected
232253 // to provide their own set of Gauges for quantiles to make this Histogram work across multiple
233254 // metrics providers.
234 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge) *windowedHistogram {
255 func newWindowedHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles map[int]metrics.Gauge, logger log.Logger) *windowedHistogram {
235256 h := &windowedHistogram{
236257 hist: hdrhistogram.NewWindowed(5, minValue, maxValue, sigfigs),
237258 name: name,
238259 gauges: quantiles,
260 logger: logger,
239261 }
240262 go h.rotateLoop(1 * time.Minute)
241263 return h
245267 func (h *windowedHistogram) With(metrics.Field) metrics.Histogram { return h }
246268
247269 func (h *windowedHistogram) Observe(value int64) {
248 h.mu.Lock()
270 h.mtx.Lock()
249271 err := h.hist.Current.RecordValue(value)
250 h.mu.Unlock()
272 h.mtx.Unlock()
251273
252274 if err != nil {
253 panic(err.Error())
275 h.logger.Log("err", err, "msg", "unable to record histogram value")
276 return
254277 }
255278
256279 for q, gauge := range h.gauges {
281304
282305 func (h *windowedHistogram) rotateLoop(d time.Duration) {
283306 for range time.Tick(d) {
284 h.mu.Lock()
307 h.mtx.Lock()
285308 h.hist.Rotate()
286 h.mu.Unlock()
309 h.mtx.Unlock()
287310 }
288311 }
289312
1111
1212 func TestHistogramQuantiles(t *testing.T) {
1313 prefix := "prefix"
14 e := NewEmitter(nil, prefix)
14 e := NewEmitter("", true, prefix, nil)
1515 var (
1616 name = "test_histogram_quantiles"
1717 quantiles = []int{50, 90, 95, 99}
18 h = e.NewHistogram(name, 0, 100, 3, quantiles...).With(metrics.Field{Key: "ignored", Value: "field"})
1918 )
19 h, err := e.NewHistogram(name, 0, 100, 3, quantiles...)
20 if err != nil {
21 t.Fatalf("unable to create test histogram: ", err)
22 }
23 h = h.With(metrics.Field{Key: "ignored", Value: "field"})
2024 const seed, mean, stdev int64 = 424242, 50, 10
2125 teststat.PopulateNormalHistogram(t, h, seed, mean, stdev)
2226
3135 prefix = "prefix"
3236 name = "m"
3337 value = 123
34 e = NewEmitter(nil, prefix)
38 e = NewEmitter("", true, prefix, nil).(*emitter)
3539 b bytes.Buffer
3640 )
3741 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
38 e.(*emitter).flush(&b)
42 e.flush(&b)
3943 want := fmt.Sprintf("%s.%s.count %d", prefix, name, value)
4044 payload := b.String()
4145 if !strings.HasPrefix(payload, want) {
4953 name = "xyz"
5054 value = 54321
5155 delta = 12345
52 e = NewEmitter(nil, prefix)
56 e = NewEmitter("", true, prefix, nil)
5357 b bytes.Buffer
5458 g = e.NewGauge(name).With(metrics.Field{Key: "ignored", Value: "field"})
5559 )
6569 t.Errorf("gauge %s want\n%s, have\n%s", name, want, payload)
6670 }
6771 }
68
69 func TestInvalidQuantile(t *testing.T) {
70 e := NewEmitter(nil, "")
71 defer func() {
72 if err := recover(); err == nil {
73 t.Errorf("expected panic, got none")
74 } else {
75 t.Logf("got expected panic: %v", err)
76 }
77 }()
78 e.NewHistogram("foo", 0.0, 100.0, 3, 50, 90, 95, 99, 101)
79 }
1010 // AssertGraphiteNormalHistogram ensures the expvar Histogram referenced by
1111 // metricName abides a normal distribution.
1212 func AssertGraphiteNormalHistogram(t *testing.T, prefix, metricName string, mean, stdev int64, quantiles []int, gPayload string) {
13 const tolerance int = 2
14
1513 // check for hdr histo data
16 wants := map[string]int64{"count": 1234, "min": 15, "max": 83, "std-dev": stdev, "mean": mean}
14 wants := map[string]int64{"count": 1234, "min": 15, "max": 83}
1715 for key, want := range wants {
1816 re := regexp.MustCompile(fmt.Sprintf("%s.%s.%s (\\d*)", prefix, metricName, key))
19 if res := re.FindAllStringSubmatch(gPayload, 1); res != nil {
20 if len(res[0]) == 1 {
21 t.Errorf("bad regex found, please check the test scenario")
22 continue
23 }
17 res := re.FindAllStringSubmatch(gPayload, 1)
18 if res == nil {
19 t.Error("did not find metrics log for", key, "in \n", gPayload)
20 continue
21 }
2422
25 have, err := strconv.ParseInt(res[0][1], 10, 64)
26 if err != nil {
27 t.Fatal(err)
28 }
23 if len(res[0]) == 1 {
24 t.Fatalf("%q: bad regex, please check the test scenario", key)
25 }
2926
30 if int(math.Abs(float64(want-have))) > tolerance {
31 t.Errorf("key %s: want %d, have %d", key, want, have)
32 }
33 } else {
34 t.Error("did not find metrics log for", key, "in \n", gPayload)
27 have, err := strconv.ParseInt(res[0][1], 10, 64)
28 if err != nil {
29 t.Fatal(err)
30 }
31
32 if want != have {
33 t.Errorf("key %s: want %d, have %d", key, want, have)
3534 }
3635 }
3736
37 const tolerance int = 2
38 wants = map[string]int64{".std-dev": stdev, ".mean": mean}
39 for _, quantile := range quantiles {
40 wants[fmt.Sprintf("_p%02d", quantile)] = normalValueAtQuantile(mean, stdev, quantile)
41 }
3842 // check for quantile gauges
39 for _, quantile := range quantiles {
40 want := normalValueAtQuantile(mean, stdev, quantile)
41
42 re := regexp.MustCompile(fmt.Sprintf("%s.%s_p%02d (\\d*\\.\\d*)", prefix, metricName, quantile))
43 if res := re.FindAllStringSubmatch(gPayload, 1); res != nil {
44 if len(res[0]) == 1 {
45 t.Errorf("bad regex found, please check the test scenario")
46 continue
47 }
48 have, err := strconv.ParseFloat(res[0][1], 64)
49 if err != nil {
50 t.Fatal(err)
51 }
52 if int(math.Abs(float64(want)-have)) > tolerance {
53 t.Errorf("quantile %d: want %.2f, have %.2f", quantile, want, have)
54 }
55 } else {
56 t.Errorf("did not find metrics log for %d", quantile)
43 for key, want := range wants {
44 re := regexp.MustCompile(fmt.Sprintf("%s.%s%s (\\d*\\.\\d*)", prefix, metricName, key))
45 res := re.FindAllStringSubmatch(gPayload, 1)
46 if res == nil {
47 t.Errorf("did not find metrics log for %s", key)
48 continue
5749 }
5850
51 if len(res[0]) == 1 {
52 t.Fatalf("%q: bad regex found, please check the test scenario", key)
53 }
54 have, err := strconv.ParseFloat(res[0][1], 64)
55 if err != nil {
56 t.Fatal(err)
57 }
58 if int(math.Abs(float64(want)-have)) > tolerance {
59 t.Errorf("key %s: want %.2f, have %.2f", key, want, have)
60 }
5961 }
6062 }