Distribution shall also return quantile counts
Peter Bourgon
7 years ago
18 | 18 | import ( |
19 | 19 | "expvar" |
20 | 20 | "fmt" |
21 | "sort" | |
21 | 22 | "strconv" |
22 | 23 | "sync" |
23 | 24 | "time" |
64 | 65 | func (g *gauge) With(metrics.Field) metrics.Gauge { return g } |
65 | 66 | func (g *gauge) Add(delta float64) { g.v.Add(delta) } |
66 | 67 | func (g *gauge) Set(value float64) { g.v.Set(value) } |
68 | func (g *gauge) Get() float64 { return mustParseFloat64(g.v.String()) } | |
67 | 69 | |
68 | 70 | // PublishCallbackGauge publishes a Gauge as an expvar with the given name, |
69 | 71 | // whose value is determined at collect time by the passed callback function. |
125 | 127 | } |
126 | 128 | } |
127 | 129 | |
128 | func (h *histogram) Distribution() []metrics.Bucket { | |
129 | bars := h.hist.Current.Distribution() | |
130 | func (h *histogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) { | |
131 | bars := h.hist.Merge().Distribution() | |
130 | 132 | buckets := make([]metrics.Bucket, len(bars)) |
131 | 133 | for i, bar := range bars { |
132 | 134 | buckets[i] = metrics.Bucket{ |
135 | 137 | Count: bar.Count, |
136 | 138 | } |
137 | 139 | } |
138 | return buckets | |
140 | quantiles := make([]metrics.Quantile, 0, len(h.gauges)) | |
141 | for quantile, gauge := range h.gauges { | |
142 | quantiles = append(quantiles, metrics.Quantile{ | |
143 | Quantile: quantile, | |
144 | Value: int64(gauge.Get()), | |
145 | }) | |
146 | } | |
147 | sort.Sort(quantileSlice(quantiles)) | |
148 | return buckets, quantiles | |
139 | 149 | } |
140 | 150 | |
141 | 151 | func (h *histogram) rotateLoop(d time.Duration) { |
145 | 155 | h.mu.Unlock() |
146 | 156 | } |
147 | 157 | } |
158 | ||
159 | func mustParseFloat64(s string) float64 { | |
160 | f, err := strconv.ParseFloat(s, 64) | |
161 | if err != nil { | |
162 | panic(err) | |
163 | } | |
164 | return f | |
165 | } | |
166 | ||
167 | type quantileSlice []metrics.Quantile | |
168 | ||
169 | func (a quantileSlice) Len() int { return len(a) } | |
170 | func (a quantileSlice) Less(i, j int) bool { return a[i].Quantile < a[j].Quantile } | |
171 | func (a quantileSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
20 | 20 | With(Field) Gauge |
21 | 21 | Set(value float64) |
22 | 22 | Add(delta float64) |
23 | Get() float64 | |
23 | 24 | } |
24 | 25 | |
25 | 26 | // Histogram tracks the distribution of a stream of values (e.g. the number of |
29 | 30 | Name() string |
30 | 31 | With(Field) Histogram |
31 | 32 | Observe(value int64) |
32 | Distribution() []Bucket | |
33 | Distribution() ([]Bucket, []Quantile) | |
33 | 34 | } |
34 | 35 | |
35 | 36 | // Field is a key/value pair associated with an observation for a specific |
45 | 46 | To int64 |
46 | 47 | Count int64 |
47 | 48 | } |
49 | ||
50 | // Quantile is a pair of quantile (0..100) and its observed maximum value. | |
51 | type Quantile struct { | |
52 | Quantile int // 0..100 | |
53 | Value int64 | |
54 | } |
69 | 69 | } |
70 | 70 | } |
71 | 71 | |
72 | func (g multiGauge) Get() float64 { | |
73 | panic("cannot call Get on a MultiGauge") | |
74 | } | |
75 | ||
72 | 76 | type multiHistogram struct { |
73 | 77 | name string |
74 | 78 | a []Histogram |
101 | 105 | } |
102 | 106 | } |
103 | 107 | |
104 | func (h multiHistogram) Distribution() []Bucket { | |
105 | return []Bucket{} // TODO(pb): can this be statistically valid? | |
108 | func (h multiHistogram) Distribution() ([]Bucket, []Quantile) { | |
109 | // TODO(pb): there may be a way to do this | |
110 | panic("cannot call Distribution on a MultiHistogram") | |
106 | 111 | } |
12 | 12 | |
13 | 13 | // PrintDistribution writes a human-readable graph of the distribution to the |
14 | 14 | // passed writer. |
15 | func PrintDistribution(w io.Writer, name string, buckets []Bucket) { | |
16 | fmt.Fprintf(w, "name: %v\n", name) | |
15 | func PrintDistribution(w io.Writer, h Histogram) { | |
16 | buckets, quantiles := h.Distribution() | |
17 | ||
18 | fmt.Fprintf(w, "name: %v\n", h.Name()) | |
19 | fmt.Fprintf(w, "quantiles: %v\n", quantiles) | |
17 | 20 | |
18 | 21 | var total float64 |
19 | 22 | for _, bucket := range buckets { |
12 | 12 | |
13 | 13 | func TestPrintDistribution(t *testing.T) { |
14 | 14 | var ( |
15 | name = "foobar" | |
16 | 15 | quantiles = []int{50, 90, 95, 99} |
17 | h = expvar.NewHistogram("test_print_distribution", 1, 10, 3, quantiles...) | |
16 | h = expvar.NewHistogram("test_print_distribution", 0, 100, 3, quantiles...) | |
18 | 17 | seed = int64(555) |
19 | 18 | mean = int64(5) |
20 | 19 | stdev = int64(1) |
22 | 21 | teststat.PopulateNormalHistogram(t, h, seed, mean, stdev) |
23 | 22 | |
24 | 23 | var buf bytes.Buffer |
25 | metrics.PrintDistribution(&buf, name, h.Distribution()) | |
24 | metrics.PrintDistribution(&buf, h) | |
26 | 25 | t.Logf("\n%s\n", buf.String()) |
27 | 26 | |
28 | 27 | // Count the number of bar chart characters. |
29 | // We should have roughly 100 in any distribution. | |
28 | // We should have ca. 100 in any distribution with a small-enough stdev. | |
30 | 29 | |
31 | 30 | var n int |
32 | 31 | for _, r := range buf.String() { |
83 | 83 | |
84 | 84 | func (g prometheusGauge) Add(delta float64) { |
85 | 85 | g.GaugeVec.With(prometheus.Labels(g.Pairs)).Add(delta) |
86 | } | |
87 | ||
88 | func (g prometheusGauge) Get() float64 { | |
89 | // TODO(pb): see https://github.com/prometheus/client_golang/issues/58 | |
90 | return 0.0 | |
86 | 91 | } |
87 | 92 | |
88 | 93 | // RegisterCallbackGauge registers a Gauge with Prometheus whose value is |
128 | 133 | s.SummaryVec.With(prometheus.Labels(s.Pairs)).Observe(float64(value)) |
129 | 134 | } |
130 | 135 | |
131 | func (s prometheusSummary) Distribution() []metrics.Bucket { | |
136 | func (s prometheusSummary) Distribution() ([]metrics.Bucket, []metrics.Quantile) { | |
132 | 137 | // TODO(pb): see https://github.com/prometheus/client_golang/issues/58 |
133 | return []metrics.Bucket{} | |
138 | return []metrics.Bucket{}, []metrics.Quantile{} | |
134 | 139 | } |
135 | 140 | |
136 | 141 | type prometheusHistogram struct { |
168 | 173 | h.HistogramVec.With(prometheus.Labels(h.Pairs)).Observe(float64(value)) |
169 | 174 | } |
170 | 175 | |
171 | func (h prometheusHistogram) Distribution() []metrics.Bucket { | |
176 | func (h prometheusHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) { | |
172 | 177 | // TODO(pb): see https://github.com/prometheus/client_golang/issues/58 |
173 | return []metrics.Bucket{} | |
178 | return []metrics.Bucket{}, []metrics.Quantile{} | |
174 | 179 | } |
175 | 180 | |
176 | 181 | func pairsFrom(fieldKeys []string) map[string]string { |
16 | 16 | "fmt" |
17 | 17 | "io" |
18 | 18 | "log" |
19 | "math" | |
19 | 20 | "time" |
21 | ||
22 | "sync/atomic" | |
20 | 23 | |
21 | 24 | "github.com/go-kit/kit/metrics" |
22 | 25 | ) |
53 | 56 | func (c *statsdCounter) Add(delta uint64) { c.c <- fmt.Sprintf("%d|c", delta) } |
54 | 57 | |
55 | 58 | type statsdGauge struct { |
56 | key string | |
57 | g chan string | |
59 | key string | |
60 | lastValue uint64 // math.Float64frombits | |
61 | g chan string | |
58 | 62 | } |
59 | 63 | |
60 | 64 | // NewGauge returns a Gauge that emits values in the statsd protocol to the |
86 | 90 | } |
87 | 91 | |
88 | 92 | func (g *statsdGauge) Set(value float64) { |
93 | atomic.StoreUint64(&g.lastValue, math.Float64bits(value)) | |
89 | 94 | g.g <- fmt.Sprintf("%f|g", value) |
95 | } | |
96 | ||
97 | func (g *statsdGauge) Get() float64 { | |
98 | return math.Float64frombits(atomic.LoadUint64(&g.lastValue)) | |
90 | 99 | } |
91 | 100 | |
92 | 101 | // NewCallbackGauge emits values in the statsd protocol to the passed writer. |
148 | 157 | h.h <- fmt.Sprintf("%d|ms", value) |
149 | 158 | } |
150 | 159 | |
151 | func (h *statsdHistogram) Distribution() []metrics.Bucket { | |
160 | func (h *statsdHistogram) Distribution() ([]metrics.Bucket, []metrics.Quantile) { | |
152 | 161 | // TODO(pb): no way to do this without introducing e.g. codahale/hdrhistogram |
153 | return []metrics.Bucket{} | |
162 | return []metrics.Bucket{}, []metrics.Quantile{} | |
154 | 163 | } |
155 | 164 | |
156 | 165 | var tick = time.Tick |