Codebase list golang-github-go-kit-kit / bcc4b77
Merge pull request #588 from jfhamlin/fix/dogstatsd Track gauge values and support default tags for dogstatsd Peter Bourgon authored 6 years ago GitHub committed 6 years ago
2 changed file(s) with 129 addition(s) and 54 deletion(s). Raw diff Collapse all Expand all
1313 "fmt"
1414 "io"
1515 "strings"
16 "sync"
17 "sync/atomic"
1618 "time"
1719
1820 "github.com/go-kit/kit/log"
1921 "github.com/go-kit/kit/metrics"
22 "github.com/go-kit/kit/metrics/generic"
2023 "github.com/go-kit/kit/metrics/internal/lv"
2124 "github.com/go-kit/kit/metrics/internal/ratemap"
2225 "github.com/go-kit/kit/util/conn"
3336 // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
3437 // To send to a DogStatsD server, use the SendLoop helper method.
3538 type Dogstatsd struct {
39 mtx sync.RWMutex
3640 prefix string
3741 rates *ratemap.RateMap
3842 counters *lv.Space
39 gauges *lv.Space
43 gauges map[string]*gaugeNode
4044 timings *lv.Space
4145 histograms *lv.Space
4246 logger log.Logger
47 lvs lv.LabelValues
4348 }
4449
4550 // New returns a Dogstatsd object that may be used to create metrics. Prefix is
4651 // applied to all created metrics. Callers must ensure that regular calls to
4752 // WriteTo are performed, either manually or with one of the helper methods.
48 func New(prefix string, logger log.Logger) *Dogstatsd {
53 func New(prefix string, logger log.Logger, lvs ...string) *Dogstatsd {
54 if len(lvs)%2 != 0 {
55 panic("odd number of LabelValues; programmer error!")
56 }
4957 return &Dogstatsd{
5058 prefix: prefix,
5159 rates: ratemap.New(),
5260 counters: lv.NewSpace(),
53 gauges: lv.NewSpace(),
61 gauges: map[string]*gaugeNode{},
5462 timings: lv.NewSpace(),
5563 histograms: lv.NewSpace(),
5664 logger: logger,
65 lvs: lvs,
5766 }
5867 }
5968
6069 // NewCounter returns a counter, sending observations to this Dogstatsd object.
6170 func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
62 d.rates.Set(d.prefix+name, sampleRate)
71 d.rates.Set(name, sampleRate)
6372 return &Counter{
64 name: d.prefix + name,
73 name: name,
6574 obs: d.counters.Observe,
6675 }
6776 }
6877
6978 // NewGauge returns a gauge, sending observations to this Dogstatsd object.
7079 func (d *Dogstatsd) NewGauge(name string) *Gauge {
71 return &Gauge{
72 name: d.prefix + name,
73 obs: d.gauges.Observe,
74 add: d.gauges.Add,
75 }
80 d.mtx.Lock()
81 n, ok := d.gauges[name]
82 if !ok {
83 n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), ddog: d}}
84 d.gauges[name] = n
85 }
86 d.mtx.Unlock()
87 return n.gauge
7688 }
7789
7890 // NewTiming returns a histogram whose observations are interpreted as
7991 // millisecond durations, and are forwarded to this Dogstatsd object.
8092 func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
81 d.rates.Set(d.prefix+name, sampleRate)
93 d.rates.Set(name, sampleRate)
8294 return &Timing{
83 name: d.prefix + name,
95 name: name,
8496 obs: d.timings.Observe,
8597 }
8698 }
88100 // NewHistogram returns a histogram whose observations are of an unspecified
89101 // unit, and are forwarded to this Dogstatsd object.
90102 func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
91 d.rates.Set(d.prefix+name, sampleRate)
103 d.rates.Set(name, sampleRate)
92104 return &Histogram{
93 name: d.prefix + name,
105 name: name,
94106 obs: d.histograms.Observe,
95107 }
96108 }
124136 var n int
125137
126138 d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
127 n, err = fmt.Fprintf(w, "%s:%f|c%s%s\n", name, sum(values), sampling(d.rates.Get(name)), tagValues(lvs))
139 n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs))
128140 if err != nil {
129141 return false
130142 }
135147 return count, err
136148 }
137149
138 d.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
139 n, err = fmt.Fprintf(w, "%s:%f|g%s\n", name, last(values), tagValues(lvs))
140 if err != nil {
141 return false
142 }
143 count += int64(n)
150 d.mtx.RLock()
151 for _, root := range d.gauges {
152 root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
153 n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs))
154 if err != nil {
155 return false
156 }
157 count += int64(n)
158 return true
159 })
160 }
161 d.mtx.RUnlock()
162
163 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
164 sampleRate := d.rates.Get(name)
165 for _, value := range values {
166 n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
167 if err != nil {
168 return false
169 }
170 count += int64(n)
171 }
144172 return true
145173 })
146174 if err != nil {
147175 return count, err
148176 }
149177
150 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
178 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
151179 sampleRate := d.rates.Get(name)
152180 for _, value := range values {
153 n, err = fmt.Fprintf(w, "%s:%f|ms%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
181 n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
154182 if err != nil {
155183 return false
156184 }
162190 return count, err
163191 }
164192
165 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
166 sampleRate := d.rates.Get(name)
167 for _, value := range values {
168 n, err = fmt.Fprintf(w, "%s:%f|h%s%s\n", name, value, sampling(sampleRate), tagValues(lvs))
169 if err != nil {
170 return false
171 }
172 count += int64(n)
173 }
174 return true
175 })
176 if err != nil {
177 return count, err
178 }
179
180193 return count, err
181194 }
182195
200213 return sv
201214 }
202215
203 func tagValues(labelValues []string) string {
216 func (d *Dogstatsd) tagValues(labelValues []string) string {
204217 if len(labelValues) == 0 {
205218 return ""
206219 }
207220 if len(labelValues)%2 != 0 {
208221 panic("tagValues received a labelValues with an odd number of strings")
209222 }
210 pairs := make([]string, 0, len(labelValues)/2)
223 pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
224 for i := 0; i < len(d.lvs); i += 2 {
225 pairs = append(pairs, d.lvs[i]+":"+d.lvs[i+1])
226 }
211227 for i := 0; i < len(labelValues); i += 2 {
212228 pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
213229 }
241257 // Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd
242258 // object, and aggregated (the last observation selected) per timeseries.
243259 type Gauge struct {
244 name string
245 lvs lv.LabelValues
246 obs observeFunc
247 add observeFunc
260 g *generic.Gauge
261 ddog *Dogstatsd
262 set int32
248263 }
249264
250265 // With implements metrics.Gauge.
251266 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
252 return &Gauge{
253 name: g.name,
254 lvs: g.lvs.With(labelValues...),
255 obs: g.obs,
256 add: g.add,
257 }
267 g.ddog.mtx.RLock()
268 node := g.ddog.gauges[g.g.Name]
269 g.ddog.mtx.RUnlock()
270
271 ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), ddog: g.ddog}
272 return node.addGauge(ga, ga.g.LabelValues())
258273 }
259274
260275 // Set implements metrics.Gauge.
261276 func (g *Gauge) Set(value float64) {
262 g.obs(g.name, g.lvs, value)
277 g.g.Set(value)
278 g.touch()
263279 }
264280
265281 // Add implements metrics.Gauge.
266282 func (g *Gauge) Add(delta float64) {
267 g.add(g.name, g.lvs, delta)
283 g.g.Add(delta)
284 g.touch()
268285 }
269286
270287 // Timing is a DogStatsD timing, or metrics.Histogram. Observations are
311328 func (h *Histogram) Observe(value float64) {
312329 h.obs(h.name, h.lvs, value)
313330 }
331
332 type pair struct{ label, value string }
333
334 type gaugeNode struct {
335 mtx sync.RWMutex
336 gauge *Gauge
337 children map[pair]*gaugeNode
338 }
339
340 func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
341 n.mtx.Lock()
342 defer n.mtx.Unlock()
343 if len(lvs) == 0 {
344 if n.gauge == nil {
345 n.gauge = g
346 }
347 return n.gauge
348 }
349 if len(lvs) < 2 {
350 panic("too few LabelValues; programmer error!")
351 }
352 head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
353 if n.children == nil {
354 n.children = map[pair]*gaugeNode{}
355 }
356 child, ok := n.children[head]
357 if !ok {
358 child = &gaugeNode{}
359 n.children[head] = child
360 }
361 return child.addGauge(g, tail)
362 }
363
364 func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
365 n.mtx.RLock()
366 defer n.mtx.RUnlock()
367 if n.gauge != nil {
368 value, ok := n.gauge.read()
369 if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
370 return false
371 }
372 }
373 for _, child := range n.children {
374 if !child.walk(fn) {
375 return false
376 }
377 }
378 return true
379 }
380
381 func (g *Gauge) touch() {
382 atomic.StoreInt32(&(g.set), 1)
383 }
384
385 func (g *Gauge) read() (float64, bool) {
386 set := atomic.SwapInt32(&(g.set), 0)
387 return g.g.Value(), set != 0
388 }
2828 func TestGauge(t *testing.T) {
2929 prefix, name := "ghi.", "jkl"
3030 label, value := "xyz", "abc"
31 regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#` + label + `:` + value + `$`
32 d := New(prefix, log.NewNopLogger())
31 regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#hostname:foohost,` + label + `:` + value + `$`
32 d := New(prefix, log.NewNopLogger(), "hostname", "foohost")
3333 gauge := d.NewGauge(name).With(label, value)
3434 valuef := teststat.LastLine(d, regex)
3535 if err := teststat.TestGauge(gauge, valuef); err != nil {