diff --git a/metrics/dogstatsd/dogstatsd.go b/metrics/dogstatsd/dogstatsd.go index 13e0b4f..ba784b7 100644 --- a/metrics/dogstatsd/dogstatsd.go +++ b/metrics/dogstatsd/dogstatsd.go @@ -14,10 +14,13 @@ "fmt" "io" "strings" + "sync" + "sync/atomic" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/generic" "github.com/go-kit/kit/metrics/internal/lv" "github.com/go-kit/kit/metrics/internal/ratemap" "github.com/go-kit/kit/util/conn" @@ -34,54 +37,63 @@ // To regularly report metrics to an io.Writer, use the WriteLoop helper method. // To send to a DogStatsD server, use the SendLoop helper method. type Dogstatsd struct { + mtx sync.RWMutex prefix string rates *ratemap.RateMap counters *lv.Space - gauges *lv.Space + gauges map[string]*gaugeNode timings *lv.Space histograms *lv.Space logger log.Logger + lvs lv.LabelValues } // New returns a Dogstatsd object that may be used to create metrics. Prefix is // applied to all created metrics. Callers must ensure that regular calls to // WriteTo are performed, either manually or with one of the helper methods. -func New(prefix string, logger log.Logger) *Dogstatsd { +func New(prefix string, logger log.Logger, lvs ...string) *Dogstatsd { + if len(lvs)%2 != 0 { + panic("odd number of LabelValues; programmer error!") + } return &Dogstatsd{ prefix: prefix, rates: ratemap.New(), counters: lv.NewSpace(), - gauges: lv.NewSpace(), + gauges: map[string]*gaugeNode{}, timings: lv.NewSpace(), histograms: lv.NewSpace(), logger: logger, + lvs: lvs, } } // NewCounter returns a counter, sending observations to this Dogstatsd object. func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter { - d.rates.Set(d.prefix+name, sampleRate) + d.rates.Set(name, sampleRate) return &Counter{ - name: d.prefix + name, + name: name, obs: d.counters.Observe, } } // NewGauge returns a gauge, sending observations to this Dogstatsd object. func (d *Dogstatsd) NewGauge(name string) *Gauge { - return &Gauge{ - name: d.prefix + name, - obs: d.gauges.Observe, - add: d.gauges.Add, - } + d.mtx.Lock() + n, ok := d.gauges[name] + if !ok { + n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), ddog: d}} + d.gauges[name] = n + } + d.mtx.Unlock() + return n.gauge } // NewTiming returns a histogram whose observations are interpreted as // millisecond durations, and are forwarded to this Dogstatsd object. func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing { - d.rates.Set(d.prefix+name, sampleRate) + d.rates.Set(name, sampleRate) return &Timing{ - name: d.prefix + name, + name: name, obs: d.timings.Observe, } } @@ -89,9 +101,9 @@ // NewHistogram returns a histogram whose observations are of an unspecified // unit, and are forwarded to this Dogstatsd object. func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram { - d.rates.Set(d.prefix+name, sampleRate) + d.rates.Set(name, sampleRate) return &Histogram{ - name: d.prefix + name, + name: name, obs: d.histograms.Observe, } } @@ -125,7 +137,7 @@ var n int d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { - n, err = fmt.Fprintf(w, "%s:%f|c%s%s\n", name, sum(values), sampling(d.rates.Get(name)), tagValues(lvs)) + n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs)) if err != nil { return false } @@ -136,22 +148,38 @@ return count, err } - d.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { - n, err = fmt.Fprintf(w, "%s:%f|g%s\n", name, last(values), tagValues(lvs)) - if err != nil { - return false - } - count += int64(n) + d.mtx.RLock() + for _, root := range d.gauges { + root.walk(func(name string, lvs lv.LabelValues, value float64) bool { + n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs)) + if err != nil { + return false + } + count += int64(n) + return true + }) + } + d.mtx.RUnlock() + + d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + sampleRate := d.rates.Get(name) + for _, value := range values { + n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs)) + if err != nil { + return false + } + count += int64(n) + } return true }) if err != nil { return count, err } - d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { + d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { sampleRate := d.rates.Get(name) for _, value := range values { - n, err = fmt.Fprintf(w, "%s:%f|ms%s%s\n", name, value, sampling(sampleRate), tagValues(lvs)) + n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs)) if err != nil { return false } @@ -163,21 +191,6 @@ return count, err } - d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool { - sampleRate := d.rates.Get(name) - for _, value := range values { - n, err = fmt.Fprintf(w, "%s:%f|h%s%s\n", name, value, sampling(sampleRate), tagValues(lvs)) - if err != nil { - return false - } - count += int64(n) - } - return true - }) - if err != nil { - return count, err - } - return count, err } @@ -201,14 +214,17 @@ return sv } -func tagValues(labelValues []string) string { +func (d *Dogstatsd) tagValues(labelValues []string) string { if len(labelValues) == 0 { return "" } if len(labelValues)%2 != 0 { panic("tagValues received a labelValues with an odd number of strings") } - pairs := make([]string, 0, len(labelValues)/2) + pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2) + for i := 0; i < len(d.lvs); i += 2 { + pairs = append(pairs, d.lvs[i]+":"+d.lvs[i+1]) + } for i := 0; i < len(labelValues); i += 2 { pairs = append(pairs, labelValues[i]+":"+labelValues[i+1]) } @@ -242,30 +258,31 @@ // Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd // object, and aggregated (the last observation selected) per timeseries. type Gauge struct { - name string - lvs lv.LabelValues - obs observeFunc - add observeFunc + g *generic.Gauge + ddog *Dogstatsd + set int32 } // With implements metrics.Gauge. func (g *Gauge) With(labelValues ...string) metrics.Gauge { - return &Gauge{ - name: g.name, - lvs: g.lvs.With(labelValues...), - obs: g.obs, - add: g.add, - } + g.ddog.mtx.RLock() + node := g.ddog.gauges[g.g.Name] + g.ddog.mtx.RUnlock() + + ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), ddog: g.ddog} + return node.addGauge(ga, ga.g.LabelValues()) } // Set implements metrics.Gauge. func (g *Gauge) Set(value float64) { - g.obs(g.name, g.lvs, value) + g.g.Set(value) + g.touch() } // Add implements metrics.Gauge. func (g *Gauge) Add(delta float64) { - g.add(g.name, g.lvs, delta) + g.g.Add(delta) + g.touch() } // Timing is a DogStatsD timing, or metrics.Histogram. Observations are @@ -312,3 +329,61 @@ func (h *Histogram) Observe(value float64) { h.obs(h.name, h.lvs, value) } + +type pair struct{ label, value string } + +type gaugeNode struct { + mtx sync.RWMutex + gauge *Gauge + children map[pair]*gaugeNode +} + +func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge { + n.mtx.Lock() + defer n.mtx.Unlock() + if len(lvs) == 0 { + if n.gauge == nil { + n.gauge = g + } + return n.gauge + } + if len(lvs) < 2 { + panic("too few LabelValues; programmer error!") + } + head, tail := pair{lvs[0], lvs[1]}, lvs[2:] + if n.children == nil { + n.children = map[pair]*gaugeNode{} + } + child, ok := n.children[head] + if !ok { + child = &gaugeNode{} + n.children[head] = child + } + return child.addGauge(g, tail) +} + +func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool { + n.mtx.RLock() + defer n.mtx.RUnlock() + if n.gauge != nil { + value, ok := n.gauge.read() + if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) { + return false + } + } + for _, child := range n.children { + if !child.walk(fn) { + return false + } + } + return true +} + +func (g *Gauge) touch() { + atomic.StoreInt32(&(g.set), 1) +} + +func (g *Gauge) read() (float64, bool) { + set := atomic.SwapInt32(&(g.set), 0) + return g.g.Value(), set != 0 +} diff --git a/metrics/dogstatsd/dogstatsd_test.go b/metrics/dogstatsd/dogstatsd_test.go index 2485cad..ef6a5a4 100644 --- a/metrics/dogstatsd/dogstatsd_test.go +++ b/metrics/dogstatsd/dogstatsd_test.go @@ -29,8 +29,8 @@ func TestGauge(t *testing.T) { prefix, name := "ghi.", "jkl" label, value := "xyz", "abc" - regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#` + label + `:` + value + `$` - d := New(prefix, log.NewNopLogger()) + regex := `^` + prefix + name + `:([0-9\.]+)\|g\|#hostname:foohost,` + label + `:` + value + `$` + d := New(prefix, log.NewNopLogger(), "hostname", "foohost") gauge := d.NewGauge(name).With(label, value) valuef := teststat.LastLine(d, regex) if err := teststat.TestGauge(gauge, valuef); err != nil {