Package list golang-github-go-kit-kit / ac7fd0a
influxd metrics fixed kpacha 5 years ago
2 changed file(s) with 117 addition(s) and 31 deletion(s). Raw diff Collapse all Expand all
99
1010 "github.com/go-kit/kit/log"
1111 "github.com/go-kit/kit/metrics"
12 "github.com/go-kit/kit/metrics/generic"
1213 "github.com/go-kit/kit/metrics/internal/lv"
1314 )
1415
107108 now := time.Now()
108109
109110 in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
110 fields := fieldsFrom(lvs)
111 fields["count"] = sum(values)
111 tags := mergeTags(in.tags, lvs)
112112 var p *influxdb.Point
113 p, err = influxdb.NewPoint(name, in.tags, fields, now)
113 fields := map[string]interface{}{"count": sum(values)}
114 p, err = influxdb.NewPoint(name, tags, fields, now)
114115 if err != nil {
115116 return false
116117 }
122123 }
123124
124125 in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
125 fields := fieldsFrom(lvs)
126 fields["value"] = last(values)
126 tags := mergeTags(in.tags, lvs)
127127 var p *influxdb.Point
128 p, err = influxdb.NewPoint(name, in.tags, fields, now)
128 fields := map[string]interface{}{"value": last(values)}
129 p, err = influxdb.NewPoint(name, tags, fields, now)
129130 if err != nil {
130131 return false
131132 }
137138 }
138139
139140 in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
140 fields := fieldsFrom(lvs)
141 ps := make([]*influxdb.Point, len(values))
142 for i, v := range values {
143 fields["value"] = v // overwrite each time
144 ps[i], err = influxdb.NewPoint(name, in.tags, fields, now)
145 if err != nil {
146 return false
147 }
148 }
149 bp.AddPoints(ps)
141 histogram := generic.NewHistogram(name, 50)
142 tags := mergeTags(in.tags, lvs)
143 var p *influxdb.Point
144 for _, v := range values {
145 histogram.Observe(v)
146 }
147 fields := map[string]interface{}{
148 "p50": histogram.Quantile(0.50),
149 "p90": histogram.Quantile(0.90),
150 "p95": histogram.Quantile(0.95),
151 "p99": histogram.Quantile(0.99),
152 }
153 p, err = influxdb.NewPoint(name, tags, fields, now)
154 if err != nil {
155 return false
156 }
157 bp.AddPoint(p)
150158 return true
151159 })
152160 if err != nil {
156164 return w.Write(bp)
157165 }
158166
159 func fieldsFrom(labelValues []string) map[string]interface{} {
167 func mergeTags(tags map[string]string, labelValues []string) map[string]string {
160168 if len(labelValues)%2 != 0 {
161 panic("fieldsFrom received a labelValues with an odd number of strings")
162 }
163 fields := make(map[string]interface{}, len(labelValues)/2)
169 panic("mergeTags received a labelValues with an odd number of strings")
170 }
164171 for i := 0; i < len(labelValues); i += 2 {
165 fields[labelValues[i]] = labelValues[i+1]
166 }
167 return fields
172 tags[labelValues[i]] = labelValues[i+1]
173 }
174 return tags
168175 }
169176
170177 func sum(a []float64) float64 {
1010 influxdb "github.com/influxdata/influxdb/client/v2"
1111
1212 "github.com/go-kit/kit/log"
13 "github.com/go-kit/kit/metrics/generic"
1413 "github.com/go-kit/kit/metrics/teststat"
1514 )
1615
3029 }
3130 }
3231
32 func TestCounter_multipleTags(t *testing.T) {
33 in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
34 counter := in.NewCounter("influx_counter")
35 counter.Add(10)
36 counter.With("error", "true").Add(1)
37 counter.With("error", "false").Add(2)
38 counter.Add(50)
39
40 client := &bufWriter{}
41 in.WriteTo(client)
42
43 expectedLines := []string{
44 `influx_counter,a=b count=60 [0-9]+`,
45 `influx_counter,a=b,error=true count=1 [0-9]+`,
46 `influx_counter,a=b,error=false count=2 [0-9]+`,
47 }
48
49 if err := validateMessage(expectedLines, client.buf.String()); err != nil {
50 t.Error(err.Error())
51 }
52 }
53
3354 func TestGauge(t *testing.T) {
3455 in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
3556 re := regexp.MustCompile(`influx_gauge,foo=alpha value=([0-9\.]+) [0-9]+`)
4667 }
4768 }
4869
70 func TestGauge_multipleTags(t *testing.T) {
71 in := New(map[string]string{"a": "b"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
72 gauge := in.NewGauge("influx_gauge")
73 gauge.Set(10)
74 gauge.With("error", "true").Set(2)
75 gauge.With("error", "true").Set(1)
76 gauge.With("error", "false").Set(2)
77 gauge.Set(50)
78
79 client := &bufWriter{}
80 in.WriteTo(client)
81
82 expectedLines := []string{
83 `influx_gauge,a=b value=50 [0-9]+`,
84 `influx_gauge,a=b,error=true value=1 [0-9]+`,
85 `influx_gauge,a=b,error=false value=2 [0-9]+`,
86 }
87
88 if err := validateMessage(expectedLines, client.buf.String()); err != nil {
89 t.Error(err.Error())
90 }
91 }
92
4993 func TestHistogram(t *testing.T) {
5094 in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
51 re := regexp.MustCompile(`influx_histogram,foo=alpha bar="beta",value=([0-9\.]+) [0-9]+`)
95 re := regexp.MustCompile(`influx_histogram,bar=beta,foo=alpha p50=([0-9\.]+),p90=([0-9\.]+),p95=([0-9\.]+),p99=([0-9\.]+) [0-9]+`)
5296 histogram := in.NewHistogram("influx_histogram").With("bar", "beta")
5397 quantiles := func() (float64, float64, float64, float64) {
5498 w := &bufWriter{}
5599 in.WriteTo(w)
56 h := generic.NewHistogram("h", 50)
57 matches := re.FindAllStringSubmatch(w.buf.String(), -1)
58 for _, match := range matches {
59 f, _ := strconv.ParseFloat(match[1], 64)
60 h.Observe(f)
100 match := re.FindStringSubmatch(w.buf.String())
101 if len(match) != 5 {
102 t.Errorf("These are not the quantiles you're looking for: %v\n", match)
61103 }
62 return h.Quantile(0.50), h.Quantile(0.90), h.Quantile(0.95), h.Quantile(0.99)
104 var result [4]float64
105 for i, q := range match[1:] {
106 result[i], _ = strconv.ParseFloat(q, 64)
107 }
108 return result[0], result[1], result[2], result[3]
63109 }
64110 if err := teststat.TestHistogram(histogram, quantiles, 0.01); err != nil {
65111 t.Fatal(err)
112 }
113 }
114
115 func TestHistogram_multipleTags(t *testing.T) {
116 in := New(map[string]string{"foo": "alpha"}, influxdb.BatchPointsConfig{}, log.NewNopLogger())
117 histogram := in.NewHistogram("influx_histogram")
118 histogram.Observe(float64(10))
119 histogram.With("error", "true").Observe(float64(1))
120 histogram.With("error", "false").Observe(float64(2))
121 histogram.Observe(float64(50))
122
123 client := &bufWriter{}
124 in.WriteTo(client)
125
126 expectedLines := []string{
127 `influx_histogram,foo=alpha p50=10,p90=50,p95=50,p99=50 [0-9]+`,
128 `influx_histogram,error=true,foo=alpha p50=1,p90=1,p95=1,p99=1 [0-9]+`,
129 `influx_histogram,error=false,foo=alpha p50=2,p90=2,p95=2,p99=2 [0-9]+`,
130 }
131
132 if err := validateMessage(expectedLines, client.buf.String()); err != nil {
133 t.Error(err.Error())
66134 }
67135 }
68136
90158 }
91159 return nil
92160 }
161
162 func validateMessage(expected []string, msg string) error {
163 for _, pattern := range expected {
164 re := regexp.MustCompile(pattern)
165 match := re.FindStringSubmatch(msg)
166 if len(match) != 1 {
167 return fmt.Errorf("Pattern not found! {%s} %s\n", pattern, msg)
168 }
169 }
170 return nil
171 }