Merge pull request #19 from peterbourgon/gauge-callbacks
metrics: add callback-style constructors for Gauges
Peter Bourgon
9 years ago
18 | 18 | import ( |
19 | 19 | "expvar" |
20 | 20 | "fmt" |
21 | "strconv" | |
21 | 22 | "sync" |
22 | 23 | "time" |
23 | 24 | |
37 | 38 | } |
38 | 39 | |
39 | 40 | func (c *counter) With(metrics.Field) metrics.Counter { return c } |
40 | ||
41 | func (c *counter) Add(delta uint64) { c.v.Add(int64(delta)) } | |
41 | func (c *counter) Add(delta uint64) { c.v.Add(int64(delta)) } | |
42 | 42 | |
43 | 43 | type gauge struct { |
44 | 44 | v *expvar.Float |
45 | 45 | } |
46 | 46 | |
47 | // NewGauge returns a new Gauge backed by an expvar with the given name. | |
48 | // Fields are ignored. | |
47 | // NewGauge returns a new Gauge backed by an expvar with the given name. It | |
48 | // should be updated manually; for a callback-based approach, see | |
49 | // NewCallbackGauge. Fields are ignored. | |
49 | 50 | func NewGauge(name string) metrics.Gauge { |
50 | 51 | return &gauge{expvar.NewFloat(name)} |
51 | 52 | } |
55 | 56 | func (g *gauge) Add(delta float64) { g.v.Add(delta) } |
56 | 57 | |
57 | 58 | func (g *gauge) Set(value float64) { g.v.Set(value) } |
59 | ||
60 | // PublishCallbackGauge publishes a Gauge as an expvar with the given name, | |
61 | // whose value is determined at collect time by the passed callback function. | |
62 | // The callback determines the value, and fields are ignored, so | |
63 | // PublishCallbackGauge returns nothing. | |
64 | func PublishCallbackGauge(name string, callback func() float64) { | |
65 | expvar.Publish(name, callbackGauge(callback)) | |
66 | } | |
67 | ||
68 | type callbackGauge func() float64 | |
69 | ||
70 | func (g callbackGauge) String() string { return strconv.FormatFloat(g(), 'g', -1, 64) } | |
58 | 71 | |
59 | 72 | type histogram struct { |
60 | 73 | mu sync.Mutex |
0 | 0 | package expvar_test |
1 | 1 | |
2 | 2 | import ( |
3 | stdexpvar "expvar" | |
4 | "fmt" | |
3 | 5 | "testing" |
4 | 6 | |
5 | 7 | "github.com/peterbourgon/gokit/metrics/expvar" |
15 | 17 | teststat.PopulateNormalHistogram(t, h, seed, mean, stdev) |
16 | 18 | teststat.AssertExpvarNormalHistogram(t, metricName, mean, stdev, quantiles) |
17 | 19 | } |
20 | ||
21 | func TestCallbackGauge(t *testing.T) { | |
22 | value := 42.43 | |
23 | metricName := "foo" | |
24 | expvar.PublishCallbackGauge(metricName, func() float64 { return value }) | |
25 | if want, have := fmt.Sprint(value), stdexpvar.Get(metricName).String(); want != have { | |
26 | t.Errorf("want %q, have %q", want, have) | |
27 | } | |
28 | } |
110 | 110 | g.GaugeVec.With(prometheus.Labels(g.Pairs)).Add(delta) |
111 | 111 | } |
112 | 112 | |
113 | // RegisterCallbackGauge registers a Gauge with Prometheus whose value is | |
114 | // determined at collect time by the passed callback function. The callback | |
115 | // determines the value, and fields are ignored, so RegisterCallbackGauge | |
116 | // returns nothing. | |
117 | func RegisterCallbackGauge(namespace, subsystem, name, help string, callback func() float64) { | |
118 | RegisterCallbackGaugeWithLabels(namespace, subsystem, name, help, prometheus.Labels{}, callback) | |
119 | } | |
120 | ||
121 | // RegisterCallbackGaugeWithLabels is the same as RegisterCallbackGauge, but | |
122 | // attaches a set of const label pairs to the metric. | |
123 | func RegisterCallbackGaugeWithLabels(namespace, subsystem, name, help string, constLabels prometheus.Labels, callback func() float64) { | |
124 | prometheus.MustRegister(prometheus.NewGaugeFunc( | |
125 | prometheus.GaugeOpts{ | |
126 | Namespace: namespace, | |
127 | Subsystem: subsystem, | |
128 | Name: name, | |
129 | Help: help, | |
130 | ConstLabels: constLabels, | |
131 | }, | |
132 | callback, | |
133 | )) | |
134 | } | |
135 | ||
113 | 136 | type prometheusHistogram struct { |
114 | 137 | *prometheus.SummaryVec |
115 | 138 | Pairs map[string]string |
65 | 65 | } |
66 | 66 | } |
67 | 67 | |
68 | func TestPrometheusCallbackGauge(t *testing.T) { | |
69 | value := 123.456 | |
70 | cb := func() float64 { return value } | |
71 | prometheus.RegisterCallbackGauge("test", "prometheus_gauge", "bazbaz", "Help string.", cb) | |
72 | if want, have := strings.Join([]string{ | |
73 | `# HELP test_prometheus_gauge_bazbaz Help string.`, | |
74 | `# TYPE test_prometheus_gauge_bazbaz gauge`, | |
75 | `test_prometheus_gauge_bazbaz 123.456`, | |
76 | }, "\n"), teststat.ScrapePrometheus(t); !strings.Contains(have, want) { | |
77 | t.Errorf("metric stanza not found or incorrect\n%s", have) | |
78 | } | |
79 | } | |
80 | ||
68 | 81 | func TestPrometheusHistogram(t *testing.T) { |
69 | 82 | h := prometheus.NewHistogram("test", "prometheus_histogram", "foobar", "Qwerty asdf.", []string{}) |
70 | 83 |
29 | 29 | type statsdCounter chan string |
30 | 30 | |
31 | 31 | // NewCounter returns a Counter that emits observations in the statsd protocol |
32 | // to the passed writer. Observations are buffered for the reporting interval | |
33 | // or until the buffer exceeds a max packet size, whichever comes first. | |
34 | // Fields are ignored. | |
32 | // to the passed writer. Observations are buffered for the report interval or | |
33 | // until the buffer exceeds a max packet size, whichever comes first. Fields | |
34 | // are ignored. | |
35 | 35 | // |
36 | 36 | // TODO: support for sampling. |
37 | func NewCounter(w io.Writer, key string, interval time.Duration) metrics.Counter { | |
37 | func NewCounter(w io.Writer, key string, reportInterval time.Duration) metrics.Counter { | |
38 | 38 | c := make(chan string) |
39 | go fwd(w, key, interval, c) | |
39 | go fwd(w, key, reportInterval, c) | |
40 | 40 | return statsdCounter(c) |
41 | 41 | } |
42 | 42 | |
47 | 47 | type statsdGauge chan string |
48 | 48 | |
49 | 49 | // NewGauge returns a Gauge that emits values in the statsd protocol to the |
50 | // passed writer. Values are buffered for the reporting interval or until the | |
50 | // passed writer. Values are buffered for the report interval or until the | |
51 | 51 | // buffer exceeds a max packet size, whichever comes first. Fields are |
52 | 52 | // ignored. |
53 | 53 | // |
54 | 54 | // TODO: support for sampling. |
55 | func NewGauge(w io.Writer, key string, interval time.Duration) metrics.Gauge { | |
55 | func NewGauge(w io.Writer, key string, reportInterval time.Duration) metrics.Gauge { | |
56 | 56 | g := make(chan string) |
57 | go fwd(w, key, interval, g) | |
57 | go fwd(w, key, reportInterval, g) | |
58 | 58 | return statsdGauge(g) |
59 | 59 | } |
60 | 60 | |
71 | 71 | |
72 | 72 | func (g statsdGauge) Set(value float64) { |
73 | 73 | g <- fmt.Sprintf("%f|g", value) |
74 | } | |
75 | ||
76 | // NewCallbackGauge emits values in the statsd protocol to the passed writer. | |
77 | // It collects values every scrape interval from the callback. Values are | |
78 | // buffered for the report interval or until the buffer exceeds a max packet | |
79 | // size, whichever comes first. The report and scrape intervals may be the | |
80 | // same. The callback determines the value, and fields are ignored, so | |
81 | // NewCallbackGauge returns nothing. | |
82 | func NewCallbackGauge(w io.Writer, key string, reportInterval, scrapeInterval time.Duration, callback func() float64) { | |
83 | go fwd(w, key, reportInterval, emitEvery(scrapeInterval, callback)) | |
84 | } | |
85 | ||
86 | func emitEvery(d time.Duration, callback func() float64) <-chan string { | |
87 | c := make(chan string) | |
88 | go func() { | |
89 | for range tick(d) { | |
90 | c <- fmt.Sprintf("%f|g", callback()) | |
91 | } | |
92 | }() | |
93 | return c | |
74 | 94 | } |
75 | 95 | |
76 | 96 | type statsdHistogram chan string |
92 | 112 | // NewTimeHistogram(statsdHistogram, time.Millisecond) |
93 | 113 | // |
94 | 114 | // TODO: support for sampling. |
95 | func NewHistogram(w io.Writer, key string, interval time.Duration) metrics.Histogram { | |
115 | func NewHistogram(w io.Writer, key string, reportInterval time.Duration) metrics.Histogram { | |
96 | 116 | h := make(chan string) |
97 | go fwd(w, key, interval, h) | |
117 | go fwd(w, key, reportInterval, h) | |
98 | 118 | return statsdHistogram(h) |
99 | 119 | } |
100 | 120 | |
106 | 126 | |
107 | 127 | var tick = time.Tick |
108 | 128 | |
109 | func fwd(w io.Writer, key string, interval time.Duration, c chan string) { | |
129 | func fwd(w io.Writer, key string, reportInterval time.Duration, c <-chan string) { | |
110 | 130 | buf := &bytes.Buffer{} |
111 | tick := tick(interval) | |
131 | tick := tick(reportInterval) | |
112 | 132 | for { |
113 | 133 | select { |
114 | 134 | case s := <-c: |
3 | 3 | |
4 | 4 | import ( |
5 | 5 | "bytes" |
6 | "fmt" | |
6 | 7 | "runtime" |
7 | 8 | "testing" |
8 | 9 | "time" |
37 | 38 | buf := &bytes.Buffer{} |
38 | 39 | g := NewGauge(buf, "test_statsd_gauge", time.Second) |
39 | 40 | |
40 | g.Add(1) // send command | |
41 | delta := 1.0 | |
42 | g.Add(delta) // send command | |
41 | 43 | runtime.Gosched() // yield to buffer write |
42 | 44 | ch <- time.Now() // signal flush |
43 | 45 | runtime.Gosched() // yield to flush |
44 | if want, have := "test_statsd_gauge:+1.000000|g\n", buf.String(); want != have { | |
46 | if want, have := fmt.Sprintf("test_statsd_gauge:+%f|g\n", delta), buf.String(); want != have { | |
45 | 47 | t.Errorf("want %q, have %q", want, have) |
46 | 48 | } |
47 | 49 | |
48 | 50 | buf.Reset() |
49 | 51 | |
50 | g.Add(-2) | |
52 | delta = -2.0 | |
53 | g.Add(delta) | |
51 | 54 | runtime.Gosched() |
52 | 55 | ch <- time.Now() |
53 | 56 | runtime.Gosched() |
54 | if want, have := "test_statsd_gauge:-2.000000|g\n", buf.String(); want != have { | |
57 | if want, have := fmt.Sprintf("test_statsd_gauge:%f|g\n", delta), buf.String(); want != have { | |
55 | 58 | t.Errorf("want %q, have %q", want, have) |
56 | 59 | } |
57 | 60 | |
58 | 61 | buf.Reset() |
59 | 62 | |
60 | g.Set(3) | |
63 | value := 3.0 | |
64 | g.Set(value) | |
61 | 65 | runtime.Gosched() |
62 | 66 | ch <- time.Now() |
63 | 67 | runtime.Gosched() |
64 | if want, have := "test_statsd_gauge:3.000000|g\n", buf.String(); want != have { | |
68 | if want, have := fmt.Sprintf("test_statsd_gauge:%f|g\n", value), buf.String(); want != have { | |
69 | t.Errorf("want %q, have %q", want, have) | |
70 | } | |
71 | } | |
72 | ||
73 | func TestCallbackGauge(t *testing.T) { | |
74 | ch := make(chan time.Time) | |
75 | tick = func(time.Duration) <-chan time.Time { return ch } | |
76 | defer func() { tick = time.Tick }() | |
77 | ||
78 | buf := &bytes.Buffer{} | |
79 | value := 55.55 | |
80 | cb := func() float64 { return value } | |
81 | NewCallbackGauge(buf, "test_statsd_callback_gauge", time.Second, time.Nanosecond, cb) | |
82 | ||
83 | ch <- time.Now() // signal emitter | |
84 | runtime.Gosched() // yield to emitter | |
85 | ch <- time.Now() // signal flush | |
86 | runtime.Gosched() // yield to flush | |
87 | ||
88 | if want, have := fmt.Sprintf("test_statsd_callback_gauge:%f|g\n", value), buf.String(); want != have { | |
65 | 89 | t.Errorf("want %q, have %q", want, have) |
66 | 90 | } |
67 | 91 | } |