Codebase list golang-github-beorn7-perks / f15ca8f
Merge pull request #1 from beorn7/fix/quantiles Fix the quantile implementation. Björn Rabenstein 9 years ago
3 changed file(s) with 198 addition(s) and 117 deletion(s). Raw diff Collapse all Expand all
66 func BenchmarkInsertTargeted(b *testing.B) {
77 b.ReportAllocs()
88
9 s := NewTargeted(0.01, 0.5, 0.9, 0.99)
9 s := NewTargeted(Targets)
1010 b.ResetTimer()
1111 for i := float64(0); i < float64(b.N); i++ {
1212 s.Insert(i)
1414 }
1515
1616 func BenchmarkInsertTargetedSmallEpsilon(b *testing.B) {
17 s := NewTargeted(0.01, 0.5, 0.9, 0.99)
18 s.SetEpsilon(0.0001)
17 s := NewTargeted(TargetsSmallEpsilon)
1918 b.ResetTimer()
2019 for i := float64(0); i < float64(b.N); i++ {
2120 s.Insert(i)
2322 }
2423
2524 func BenchmarkInsertBiased(b *testing.B) {
26 s := NewBiased()
25 s := NewLowBiased(0.01)
2726 b.ResetTimer()
2827 for i := float64(0); i < float64(b.N); i++ {
2928 s.Insert(i)
3130 }
3231
3332 func BenchmarkInsertBiasedSmallEpsilon(b *testing.B) {
34 s := NewBiased()
35 s.SetEpsilon(0.0001)
33 s := NewLowBiased(0.0001)
3634 b.ResetTimer()
3735 for i := float64(0); i < float64(b.N); i++ {
3836 s.Insert(i)
4038 }
4139
4240 func BenchmarkQuery(b *testing.B) {
43 s := NewTargeted(0.01, 0.5, 0.9, 0.99)
41 s := NewTargeted(Targets)
4442 for i := float64(0); i < 1e6; i++ {
4543 s.Insert(i)
4644 }
5250 }
5351
5452 func BenchmarkQuerySmallEpsilon(b *testing.B) {
55 s := NewTargeted(0.01, 0.5, 0.9, 0.99)
56 s.SetEpsilon(0.0001)
53 s := NewTargeted(TargetsSmallEpsilon)
5754 for i := float64(0); i < 1e6; i++ {
5855 s.Insert(i)
5956 }
3535
3636 type invariant func(s *stream, r float64) float64
3737
38 // NewBiased returns an initialized Stream for high-biased quantiles (e.g.
39 // 50th, 90th, 99th) not known a priori with finer error guarantees for the
40 // higher ranks of the data distribution.
41 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
42 func NewBiased() *Stream {
38 // NewLowBiased returns an initialized Stream for low-biased quantiles
39 // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
40 // error guarantees can still be given even for the lower ranks of the data
41 // distribution.
42 //
43 // The provided epsilon is a relative error, i.e. the true quantile of a value
44 // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
45 //
46 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
47 // properties.
48 func NewLowBiased(epsilon float64) *Stream {
4349 ƒ := func(s *stream, r float64) float64 {
44 return 2 * s.epsilon * r
50 return 2 * epsilon * r
51 }
52 return newStream(ƒ)
53 }
54
55 // NewHighBiased returns an initialized Stream for high-biased quantiles
56 // (e.g. 0.99, 0.9, 0.5) where the needed quantiles are not known a priori, but
57 // error guarantees can still be given even for the higher ranks of the data
58 // distribution.
59 //
60 // The provided epsilon is a relative error, i.e. the true quantile of a value
61 // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
62 //
63 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
64 // properties.
65 func NewHighBiased(epsilon float64) *Stream {
66 ƒ := func(s *stream, r float64) float64 {
67 return 2 * epsilon * (s.n - r)
4568 }
4669 return newStream(ƒ)
4770 }
4871
4972 // NewTargeted returns an initialized Stream concerned with a particular set of
5073 // quantile values that are supplied a priori. Knowing these a priori reduces
51 // space and computation time.
74 // space and computation time. The targets map maps the desired quantiles to
75 // their absolute errors, i.e. the true quantile of a value returned by a query
76 // is guaranteed to be within (Quantile±Epsilon).
77 //
5278 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
53 func NewTargeted(quantiles ...float64) *Stream {
79 func NewTargeted(targets map[float64]float64) *Stream {
5480 ƒ := func(s *stream, r float64) float64 {
55 var m float64 = math.MaxFloat64
81 var m = math.MaxFloat64
5682 var f float64
57 for _, q := range quantiles {
58 if q*s.n <= r {
59 f = (2 * s.epsilon * r) / q
83 for quantile, epsilon := range targets {
84 if quantile*s.n <= r {
85 f = (2 * epsilon * r) / quantile
6086 } else {
61 f = (2 * s.epsilon * (s.n - r)) / (1 - q)
87 f = (2 * epsilon * (s.n - r)) / (1 - quantile)
6288 }
6389 if f < m {
6490 m = f
78104 }
79105
80106 func newStream(ƒ invariant) *Stream {
81 const defaultEpsilon = 0.01
82 x := &stream{epsilon: defaultEpsilon, ƒ: ƒ}
107 x := &stream{ƒ: ƒ}
83108 return &Stream{x, make(Samples, 0, 500), true}
84109 }
85110
93118 s.sorted = false
94119 if len(s.b) == cap(s.b) {
95120 s.flush()
96 s.compress()
97121 }
98122 }
99123
121145
122146 // Merge merges samples into the underlying stream's samples. This is handy when
123147 // merging multiple streams from separate threads, database shards, etc.
148 //
149 // ATTENTION: This method is broken and does not yield correct results. The
150 // underlying algorithm is not capable of merging streams correctly.
124151 func (s *Stream) Merge(samples Samples) {
125152 sort.Sort(samples)
126153 s.stream.merge(samples)
138165 return s.b
139166 }
140167 s.flush()
141 s.compress()
142168 return s.stream.samples()
143169 }
144170
166192 }
167193
168194 type stream struct {
169 epsilon float64
170 n float64
171 l []Sample
172 ƒ invariant
173 }
174
175 // SetEpsilon sets the error epsilon for the Stream. The default epsilon is
176 // 0.01 and is usually satisfactory. If needed, this must be called before all
177 // Inserts.
178 // To learn more, see: http://www.cs.rutgers.edu/~muthu/bquant.pdf
179 func (s *stream) SetEpsilon(epsilon float64) {
180 s.epsilon = epsilon
195 n float64
196 l []Sample
197 ƒ invariant
181198 }
182199
183200 func (s *stream) reset() {
190207 }
191208
192209 func (s *stream) merge(samples Samples) {
210 // TODO(beorn7): This tries to merge not only individual samples, but
211 // whole summaries. The paper doesn't mention merging summaries at
212 // all. Unit tests show that the merging is inaccurate. Find out how to
213 // do merges properly.
193214 var r float64
194215 i := 0
195216 for _, sample := range samples {
199220 // Insert at position i.
200221 s.l = append(s.l, Sample{})
201222 copy(s.l[i+1:], s.l[i:])
202 s.l[i] = Sample{sample.Value, sample.Width, math.Floor(s.ƒ(s, r)) - 1}
223 s.l[i] = Sample{
224 sample.Value,
225 sample.Width,
226 math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
227 // TODO(beorn7): How to calculate delta correctly?
228 }
203229 i++
204230 goto inserted
205231 }
209235 i++
210236 inserted:
211237 s.n += sample.Width
212 }
238 r += sample.Width
239 }
240 s.compress()
213241 }
214242
215243 func (s *stream) count() int {
220248 t := math.Ceil(q * s.n)
221249 t += math.Ceil(s.ƒ(s, t) / 2)
222250 p := s.l[0]
223 r := float64(0)
251 var r float64
224252 for _, c := range s.l[1:] {
253 r += p.Width
225254 if r+c.Width+c.Delta > t {
226255 return p.Value
227256 }
228 r += p.Width
229257 p = c
230258 }
231259 return p.Value
00 package quantile
11
22 import (
3 "math"
34 "math/rand"
45 "sort"
56 "testing"
67 )
78
8 func TestQuantRandQuery(t *testing.T) {
9 s := NewTargeted(0.5, 0.90, 0.99)
10 a := make([]float64, 0, 1e5)
11 rand.Seed(42)
9 var (
10 Targets = map[float64]float64{
11 0.01: 0.001,
12 0.10: 0.01,
13 0.50: 0.05,
14 0.90: 0.01,
15 0.99: 0.001,
16 }
17 TargetsSmallEpsilon = map[float64]float64{
18 0.01: 0.0001,
19 0.10: 0.001,
20 0.50: 0.005,
21 0.90: 0.001,
22 0.99: 0.0001,
23 }
24 LowQuantiles = []float64{0.01, 0.1, 0.5}
25 HighQuantiles = []float64{0.99, 0.9, 0.5}
26 )
27
28 const RelativeEpsilon = 0.01
29
30 func verifyPercsWithAbsoluteEpsilon(t *testing.T, a []float64, s *Stream) {
31 sort.Float64s(a)
32 for quantile, epsilon := range Targets {
33 n := float64(len(a))
34 k := int(quantile * n)
35 lower := int((quantile - epsilon) * n)
36 if lower < 1 {
37 lower = 1
38 }
39 upper := int(math.Ceil((quantile + epsilon) * n))
40 if upper > len(a) {
41 upper = len(a)
42 }
43 w, min, max := a[k-1], a[lower-1], a[upper-1]
44 if g := s.Query(quantile); g < min || g > max {
45 t.Errorf("q=%f: want %v [%f,%f], got %v", quantile, w, min, max, g)
46 }
47 }
48 }
49
50 func verifyLowPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
51 sort.Float64s(a)
52 for _, qu := range LowQuantiles {
53 n := float64(len(a))
54 k := int(qu * n)
55
56 lowerRank := int((1 - RelativeEpsilon) * qu * n)
57 upperRank := int(math.Ceil((1 + RelativeEpsilon) * qu * n))
58 w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
59 if g := s.Query(qu); g < min || g > max {
60 t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
61 }
62 }
63 }
64
65 func verifyHighPercsWithRelativeEpsilon(t *testing.T, a []float64, s *Stream) {
66 sort.Float64s(a)
67 for _, qu := range HighQuantiles {
68 n := float64(len(a))
69 k := int(qu * n)
70
71 lowerRank := int((1 - (1+RelativeEpsilon)*(1-qu)) * n)
72 upperRank := int(math.Ceil((1 - (1-RelativeEpsilon)*(1-qu)) * n))
73 w, min, max := a[k-1], a[lowerRank-1], a[upperRank-1]
74 if g := s.Query(qu); g < min || g > max {
75 t.Errorf("q=%f: want %v [%f,%f], got %v", qu, w, min, max, g)
76 }
77 }
78 }
79
80 func populateStream(s *Stream) []float64 {
81 a := make([]float64, 0, 1e5+100)
1282 for i := 0; i < cap(a); i++ {
1383 v := rand.NormFloat64()
84 // Add 5% asymmetric outliers.
85 if i%20 == 0 {
86 v = v*v + 1
87 }
1488 s.Insert(v)
1589 a = append(a, v)
1690 }
17 t.Logf("len: %d", s.Count())
18 sort.Float64s(a)
19 w, min, max := getPerc(a, 0.50)
20 if g := s.Query(0.50); g < min || g > max {
21 t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g)
22 }
23 w, min, max = getPerc(a, 0.90)
24 if g := s.Query(0.90); g < min || g > max {
25 t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g)
26 }
27 w, min, max = getPerc(a, 0.99)
28 if g := s.Query(0.99); g < min || g > max {
29 t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g)
30 }
91 return a
3192 }
3293
33 func TestQuantRandMergeQuery(t *testing.T) {
34 ch := make(chan float64)
35 done := make(chan *Stream)
36 for i := 0; i < 2; i++ {
37 go func() {
38 s := NewTargeted(0.5, 0.90, 0.99)
39 for v := range ch {
40 s.Insert(v)
41 }
42 done <- s
43 }()
44 }
94 func TestTargetedQuery(t *testing.T) {
95 rand.Seed(42)
96 s := NewTargeted(Targets)
97 a := populateStream(s)
98 verifyPercsWithAbsoluteEpsilon(t, a, s)
99 }
45100
101 func TestLowBiasedQuery(t *testing.T) {
46102 rand.Seed(42)
47 a := make([]float64, 0, 1e6)
48 for i := 0; i < cap(a); i++ {
49 v := rand.NormFloat64()
50 a = append(a, v)
51 ch <- v
52 }
53 close(ch)
103 s := NewLowBiased(RelativeEpsilon)
104 a := populateStream(s)
105 verifyLowPercsWithRelativeEpsilon(t, a, s)
106 }
54107
55 s := <-done
56 o := <-done
57 s.Merge(o.Samples())
108 func TestHighBiasedQuery(t *testing.T) {
109 rand.Seed(42)
110 s := NewHighBiased(RelativeEpsilon)
111 a := populateStream(s)
112 verifyHighPercsWithRelativeEpsilon(t, a, s)
113 }
58114
59 t.Logf("len: %d", s.Count())
60 sort.Float64s(a)
61 w, min, max := getPerc(a, 0.50)
62 if g := s.Query(0.50); g < min || g > max {
63 t.Errorf("perc50: want %v [%f,%f], got %v", w, min, max, g)
64 }
65 w, min, max = getPerc(a, 0.90)
66 if g := s.Query(0.90); g < min || g > max {
67 t.Errorf("perc90: want %v [%f,%f], got %v", w, min, max, g)
68 }
69 w, min, max = getPerc(a, 0.99)
70 if g := s.Query(0.99); g < min || g > max {
71 t.Errorf("perc99: want %v [%f,%f], got %v", w, min, max, g)
72 }
115 func TestTargetedMerge(t *testing.T) {
116 rand.Seed(42)
117 s1 := NewTargeted(Targets)
118 s2 := NewTargeted(Targets)
119 a := populateStream(s1)
120 a = append(a, populateStream(s2)...)
121 s1.Merge(s2.Samples())
122 verifyPercsWithAbsoluteEpsilon(t, a, s1)
123 }
124
125 func TestLowBiasedMerge(t *testing.T) {
126 rand.Seed(42)
127 s1 := NewLowBiased(RelativeEpsilon)
128 s2 := NewLowBiased(RelativeEpsilon)
129 a := populateStream(s1)
130 a = append(a, populateStream(s2)...)
131 s1.Merge(s2.Samples())
132 verifyLowPercsWithRelativeEpsilon(t, a, s2)
133 }
134
135 func TestHighBiasedMerge(t *testing.T) {
136 rand.Seed(42)
137 s1 := NewHighBiased(RelativeEpsilon)
138 s2 := NewHighBiased(RelativeEpsilon)
139 a := populateStream(s1)
140 a = append(a, populateStream(s2)...)
141 s1.Merge(s2.Samples())
142 verifyHighPercsWithRelativeEpsilon(t, a, s2)
73143 }
74144
75145 func TestUncompressed(t *testing.T) {
76 tests := []float64{0.50, 0.90, 0.95, 0.99}
77 q := NewTargeted(tests...)
146 q := NewTargeted(Targets)
78147 for i := 100; i > 0; i-- {
79148 q.Insert(float64(i))
80149 }
82151 t.Errorf("want count 100, got %d", g)
83152 }
84153 // Before compression, Query should have 100% accuracy.
85 for _, v := range tests {
86 w := v * 100
87 if g := q.Query(v); g != w {
154 for quantile := range Targets {
155 w := quantile * 100
156 if g := q.Query(quantile); g != w {
88157 t.Errorf("want %f, got %f", w, g)
89158 }
90159 }
91160 }
92161
93162 func TestUncompressedSamples(t *testing.T) {
94 q := NewTargeted(0.99)
163 q := NewTargeted(map[float64]float64{0.99: 0.001})
95164 for i := 1; i <= 100; i++ {
96165 q.Insert(float64(i))
97166 }
101170 }
102171
103172 func TestUncompressedOne(t *testing.T) {
104 q := NewTargeted(0.90)
173 q := NewTargeted(map[float64]float64{0.99: 0.01})
105174 q.Insert(3.14)
106175 if g := q.Query(0.90); g != 3.14 {
107176 t.Error("want PI, got", g)
109178 }
110179
111180 func TestDefaults(t *testing.T) {
112 if g := NewTargeted(0.99).Query(0.99); g != 0 {
181 if g := NewTargeted(map[float64]float64{0.99: 0.001}).Query(0.99); g != 0 {
113182 t.Errorf("want 0, got %f", g)
114183 }
115184 }
116
117 func getPerc(x []float64, p float64) (want, min, max float64) {
118 k := int(float64(len(x)) * p)
119 lower := int(float64(len(x)) * (p - 0.04))
120 if lower < 0 {
121 lower = 0
122 }
123 upper := int(float64(len(x))*(p+0.04)) + 1
124 if upper >= len(x) {
125 upper = len(x) - 1
126 }
127 return x[k], x[lower], x[upper]
128 }