Codebase list golang-github-beorn7-perks / ac5ec5b
@bradfitz suggestions Blake Mizerany 11 years ago
6 changed file(s) with 327 addition(s) and 353 deletion(s). Raw diff Collapse all Expand all
+0
-59
quantile/buffer.go less more
0 package quantile
1
2 import (
3 "sort"
4 )
5
6 type buffer struct {
7 *stream
8 b Samples
9 }
10
11 func (buf *buffer) Insert(v float64) {
12 buf.insert(Sample{Value: v, Width: 1})
13 }
14
15 func (buf *buffer) insert(sample Sample) {
16 buf.b = append(buf.b, sample)
17 if len(buf.b) == cap(buf.b) {
18 buf.flush()
19 buf.compress()
20 }
21 }
22
23 func (buf *buffer) Query(q float64) float64 {
24 if buf.flushed() {
25 // Fast path when there hasn't been enough data for a flush;
26 // this also yeilds better accuracy for small sets of data.
27 i := float64(len(buf.b)) * q
28 return buf.b[int(i)].Value
29 }
30 buf.flush()
31 return buf.stream.Query(q)
32 }
33
34 func (buf *buffer) Merge(samples Samples) {
35 buf.stream.Merge(samples)
36 }
37
38 func (buf *buffer) Init() {
39 buf.stream.Init()
40 buf.b = buf.b[:0]
41 }
42
43 func (buf *buffer) Samples() Samples {
44 if !buf.flushed() {
45 return buf.b
46 }
47 return buf.stream.Samples()
48 }
49
50 func (buf *buffer) flush() {
51 sort.Sort(buf.b)
52 buf.stream.Merge(buf.b)
53 buf.b = buf.b[:0]
54 }
55
56 func (buf *buffer) flushed() bool {
57 return buf.stream.l.Len() == 0
58 }
+0
-87
quantile/buffer_test.go less more
0 package quantile
1
2 import (
3 "math"
4 "math/rand"
5 "sort"
6 "testing"
7 )
8
9 func TestQuantRandQuery(t *testing.T) {
10 s := New(0.01, 0.5, 0.90, 0.99)
11 a := make([]float64, 0, 1e5)
12 rand.Seed(42)
13 for i := 0; i < cap(a); i++ {
14 v := float64(rand.Int63())
15 s.Insert(v)
16 a = append(a, v)
17 }
18 t.Logf("len: %d", s.Count())
19 sort.Float64s(a)
20 w := getPerc(a, 0.50)
21 if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
22 t.Errorf("perc50: want %v, got %v", w, g)
23 t.Logf("e: %f", math.Abs(w-g)/w)
24 }
25 w = getPerc(a, 0.90)
26 if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
27 t.Errorf("perc90: want %v, got %v", w, g)
28 t.Logf("e: %f", math.Abs(w-g)/w)
29 }
30 w = getPerc(a, 0.99)
31 if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
32 t.Errorf("perc99: want %v, got %v", w, g)
33 t.Logf("e: %f", math.Abs(w-g)/w)
34 }
35 }
36
37 func TestQuantRandMergeQuery(t *testing.T) {
38
39 ch := make(chan float64)
40 done := make(chan Interface)
41 for i := 0; i < 2; i++ {
42 go func() {
43 s := New(0.01, 0.5, 0.90, 0.99)
44 for v := range ch {
45 s.Insert(v)
46 }
47 done <- s
48 }()
49 }
50
51 rand.Seed(42)
52 a := make([]float64, 0, 1e6)
53 for i := 0; i < cap(a); i++ {
54 v := float64(rand.Int63())
55 a = append(a, v)
56 ch <- v
57 }
58 close(ch)
59
60 s := <-done
61 o := <-done
62 s.Merge(o.Samples())
63
64 t.Logf("len: %d", s.Count())
65 sort.Float64s(a)
66 w := getPerc(a, 0.50)
67 if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
68 t.Errorf("perc50: want %v, got %v", w, g)
69 t.Logf("e: %f", math.Abs(w-g)/w)
70 }
71 w = getPerc(a, 0.90)
72 if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
73 t.Errorf("perc90: want %v, got %v", w, g)
74 t.Logf("e: %f", math.Abs(w-g)/w)
75 }
76 w = getPerc(a, 0.99)
77 if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
78 t.Errorf("perc99: want %v, got %v", w, g)
79 t.Logf("e: %f", math.Abs(w-g)/w)
80 }
81 }
82
// getPerc returns the element of x at index int(len(x)*p); x is expected to
// have been sorted by the caller. The index is clamped to the valid range so
// that p == 1 yields the last element instead of indexing past the end. It
// returns NaN for an empty slice.
func getPerc(x []float64, p float64) float64 {
	if len(x) == 0 {
		return math.NaN()
	}
	k := int(float64(len(x)) * p)
	if k >= len(x) {
		k = len(x) - 1
	}
	if k < 0 {
		k = 0
	}
	return x[k]
}
+0
-184
quantile/quantile.go less more
0 // The quantile package implements Effective Computation of Biased Quantiles
1 // over Data Streams http://www.cs.rutgers.edu/~muthu/bquant.pdf
2 //
3 // This package is useful for calculating targeted quantiles for large datasets
4 // within low memory and CPU bounds. This means you're trading a small amount of
5 // accuracy in rank selection for efficiency.
6 //
7 // NOTE: Multiple streams can be merged before a Query, allowing clients to be distributed across threads.
8 package quantile
9
10 import (
11 "container/list"
12 "math"
13 )
14
15 type Interface interface {
16 // Query returns the calculated qth percentiles value. Calling Query
17 // with q not in the set quantiles given to New will have non-deterministic
18 // results.
19 Query(q float64) float64
20
21 // Insert inserts v into the stream.
22 Insert(v float64)
23
24 // Merge merges samples into the underlying streams samples. This is handy when
25 // merging multiple streams from seperate threads.
26 Merge(samples Samples)
27
28 // Samples returns the streams held samples.
29 Samples() Samples
30
31 // Count returns the total number of samples observed in the stream
32 // since initialization.
33 Count() int
34
35 // Init initializes or clears the list.
36 Init()
37
38 // Min returns the minimum value observed in the list.
39 Min() float64
40
41 // Max returns the maximum value observed in the list.
42 Max() float64
43 }
44
// stream is the summary structure: a linked list of *Sample values kept in
// ascending Value order, plus the parameters used to compute the allowed
// error at a given rank.
type stream struct {
	// e is the error passed to New.
	e float64
	// q holds the targeted quantiles passed to New.
	q []float64
	// n is the accumulated width of all samples merged so far.
	n float64
	// l is the list of *Sample values.
	l *list.List
	// max is the largest value recorded while merging.
	max float64
}
52
53 // New returns an initialized stream for targeted quantiles using error e. e is usually 0.01.
54 func New(e float64, quantiles ...float64) Interface {
55 x := &stream{e: e, q: quantiles, l: list.New()}
56 return &buffer{x, make(Samples, 0, 500)}
57 }
58
59 func (qt *stream) Init() {
60 qt.l.Init()
61 qt.n = 0
62 }
63
64 func (qt *stream) ƒ(r float64) float64 {
65 var m float64 = math.MaxFloat64
66 var f float64
67 for _, q := range qt.q {
68 if q*qt.n <= r {
69 f = (2 * qt.e * r) / q
70 } else {
71 f = (2 * qt.e * (qt.n - r)) / (1 - q)
72 }
73 m = math.Min(m, f)
74 }
75 return m
76 }
77
78 func (qt *stream) Insert(v float64) {
79 fn := qt.mergeFunc()
80 fn(v, 1)
81 }
82
83 func (qt *stream) Merge(samples Samples) {
84 fn := qt.mergeFunc()
85 for _, s := range samples {
86 fn(s.Value, s.Width)
87 }
88 }
89
90 func (qt *stream) mergeFunc() func(v, w float64) {
91 // NOTE: I used a goto over defer because it bought me a few extra
92 // nanoseconds. I know. I know.
93 var r float64
94 e := qt.l.Front()
95 return func(v, w float64) {
96 if v > qt.max {
97 qt.max = v
98 }
99
100 for ; e != nil; e = e.Next() {
101 c := e.Value.(*Sample)
102 if c.Value > v {
103 s := &Sample{v, w, math.Floor(qt.ƒ(r)) - 1}
104 qt.l.InsertBefore(s, e)
105 goto inserted
106 }
107 r += c.Width
108 }
109 qt.l.PushBack(&Sample{v, w, 0})
110 inserted:
111 qt.n += w
112 }
113 }
114
115 func (qt *stream) Count() int {
116 return int(qt.n)
117 }
118
119 func (qt *stream) Query(q float64) float64 {
120 e := qt.l.Front()
121 t := math.Ceil(q * qt.n)
122 t += math.Ceil(qt.ƒ(t) / 2)
123 p := e.Value.(*Sample)
124 e = e.Next()
125 r := float64(0)
126 for e != nil {
127 c := e.Value.(*Sample)
128 if r+c.Width+c.Delta > t {
129 return p.Value
130 }
131 r += p.Width
132 p = c
133 e = e.Next()
134 }
135 return p.Value
136 }
137
138 func (qt *stream) compress() {
139 if qt.l.Len() < 2 {
140 return
141 }
142 e := qt.l.Back()
143 x := e.Value.(*Sample)
144 r := qt.n - 1 - x.Width
145 e = e.Prev()
146 for e != nil {
147 c := e.Value.(*Sample)
148 if c.Width+x.Width+x.Delta <= qt.ƒ(r) {
149 x.Width += c.Width
150 o := e
151 e = e.Prev()
152 qt.l.Remove(o)
153 } else {
154 x = c
155 e = e.Prev()
156 }
157 r -= c.Width
158 }
159 }
160
161 func (qt *stream) Samples() Samples {
162 samples := make(Samples, 0, qt.l.Len())
163 for e := qt.l.Front(); e != nil; e = e.Next() {
164 samples = append(samples, *e.Value.(*Sample))
165 }
166 return samples
167 }
168
169 // Min returns the mininmul value observed in the stream.
170 func (qt *stream) Min() float64 {
171 if e := qt.l.Front(); e != nil {
172 return e.Value.(*Sample).Value
173 }
174 return math.NaN()
175 }
176
177 // Max returns the maximum value observed in the stream within the error epsilon.
178 func (qt *stream) Max() float64 {
179 if qt.l.Len() > 0 {
180 return qt.max
181 }
182 return math.NaN()
183 }
+0
-23
quantile/sample.go less more
0 package quantile
1
// Sample holds an observed value and meta information for compression. JSON
// tags have been added for convenience.
type Sample struct {
	Value float64 `json:",string"`
	Width float64 `json:",string"`
	Delta float64 `json:",string"`
}

// Samples is a slice of Sample whose Len, Less and Swap methods order it by
// ascending Value.
type Samples []Sample

// Len returns the number of samples in a.
func (a Samples) Len() int {
	count := len(a)
	return count
}

// Less reports whether the sample at index i has a smaller Value than the
// sample at index j.
func (a Samples) Less(i, j int) bool {
	lhs, rhs := a[i].Value, a[j].Value
	return lhs < rhs
}

// Swap exchanges the samples at indexes i and j.
func (a Samples) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}
0 // The quantile package implements Effective Computation of Biased Quantiles
1 // over Data Streams http://www.cs.rutgers.edu/~muthu/bquant.pdf
2 //
3 // This package is useful for calculating targeted quantiles for large datasets
4 // within low memory and CPU bounds. This means you're trading a small amount of
5 // accuracy in rank selection for efficiency.
6 //
7 // NOTE: Multiple streams can be merged before a Query, allowing clients to be distributed across threads.
8 package quantile
9
10 import (
11 "container/list"
12 "math"
13 "sort"
14 )
15
// Sample holds an observed value and meta information for compression. JSON
// tags have been added for convenience.
type Sample struct {
	Value float64 `json:",string"`
	Width float64 `json:",string"`
	Delta float64 `json:",string"`
}

// Samples is a slice of Sample whose Len, Less and Swap methods order it by
// ascending Value.
type Samples []Sample

// Len returns the number of samples in a.
func (a Samples) Len() int {
	count := len(a)
	return count
}

// Less reports whether the sample at index i has a smaller Value than the
// sample at index j.
func (a Samples) Less(i, j int) bool {
	lhs, rhs := a[i].Value, a[j].Value
	return lhs < rhs
}

// Swap exchanges the samples at indexes i and j.
func (a Samples) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}
37
38 type Stream struct {
39 *stream
40 b Samples
41 }
42
43 // New returns an initialized stream for targeted quantiles using error e. e is usually 0.01.
44 func New(e float64, quantiles ...float64) *Stream {
45 x := &stream{e: e, q: quantiles, l: list.New()}
46 return &Stream{x, make(Samples, 0, 500)}
47 }
48
49 // Insert inserts v into the stream.
50 func (s *Stream) Insert(v float64) {
51 s.insert(Sample{Value: v, Width: 1})
52 }
53
54 func (s *Stream) insert(sample Sample) {
55 s.b = append(s.b, sample)
56 if len(s.b) == cap(s.b) {
57 s.flush()
58 s.compress()
59 }
60 }
61
62 // Query returns the calculated qth percentiles value. Calling Query with q not
63 // in the set quantiles given to New will have non-deterministic results.
64 func (s *Stream) Query(q float64) float64 {
65 if s.flushed() {
66 // Fast path when there hasn't been enough data for a flush;
67 // this also yeilds better accuracy for small sets of data.
68 i := float64(len(s.b)) * q
69 return s.b[int(i)].Value
70 }
71 s.flush()
72 return s.stream.query(q)
73 }
74
75 // Merge merges samples into the underlying streams samples. This is handy when
76 // merging multiple streams from seperate threads.
77 func (s *Stream) Merge(samples Samples) {
78 s.stream.merge(samples)
79 }
80
81 // Init initializes or clears the list.
82 func (s *Stream) Init() {
83 s.stream.Init()
84 s.b = s.b[:0]
85 }
86
87 // Samples returns the streams held samples.
88 func (s *Stream) Samples() Samples {
89 if !s.flushed() {
90 return s.b
91 }
92 return s.stream.samples()
93 }
94
95 func (s *Stream) flush() {
96 sort.Sort(s.b)
97 s.stream.merge(s.b)
98 s.b = s.b[:0]
99 }
100
101 func (s *Stream) flushed() bool {
102 return s.stream.l.Len() == 0
103 }
104
// stream is the summary structure: a linked list of *Sample values kept in
// ascending Value order, plus the parameters used to compute the allowed
// error at a given rank.
type stream struct {
	// e is the error passed to New.
	e float64
	// q holds the targeted quantiles passed to New.
	q []float64
	// n is the accumulated width of all samples merged so far.
	n float64
	// l is the list of *Sample values.
	l *list.List
	// max is the largest value recorded while merging.
	max float64
}
112
113 func (s *stream) Init() {
114 s.l.Init()
115 s.n = 0
116 }
117
118 func (s *stream) ƒ(r float64) float64 {
119 var m float64 = math.MaxFloat64
120 var f float64
121 for _, q := range s.q {
122 if q*s.n <= r {
123 f = (2 * s.e * r) / q
124 } else {
125 f = (2 * s.e * (s.n - r)) / (1 - q)
126 }
127 m = math.Min(m, f)
128 }
129 return m
130 }
131
132 func (s *stream) insert(v float64) {
133 fn := s.mergeFunc()
134 fn(v, 1)
135 }
136
137 func (s *stream) merge(samples Samples) {
138 fn := s.mergeFunc()
139 for _, s := range samples {
140 fn(s.Value, s.Width)
141 }
142 }
143
144 func (s *stream) mergeFunc() func(v, w float64) {
145 // NOTE: I used a goto over defer because it bought me a few extra
146 // nanoseconds. I know. I know.
147 var r float64
148 e := s.l.Front()
149 return func(v, w float64) {
150 if v > s.max {
151 s.max = v
152 }
153
154 for ; e != nil; e = e.Next() {
155 c := e.Value.(*Sample)
156 if c.Value > v {
157 sm := &Sample{v, w, math.Floor(s.ƒ(r)) - 1}
158 s.l.InsertBefore(sm, e)
159 goto inserted
160 }
161 r += c.Width
162 }
163 s.l.PushBack(&Sample{v, w, 0})
164 inserted:
165 s.n += w
166 }
167 }
168
169 // Count returns the total number of samples observed in the stream
170 // since initialization.
171 func (s *stream) Count() int {
172 return int(s.n)
173 }
174
175 func (s *stream) query(q float64) float64 {
176 e := s.l.Front()
177 t := math.Ceil(q * s.n)
178 t += math.Ceil(s.ƒ(t) / 2)
179 p := e.Value.(*Sample)
180 e = e.Next()
181 r := float64(0)
182 for e != nil {
183 c := e.Value.(*Sample)
184 if r+c.Width+c.Delta > t {
185 return p.Value
186 }
187 r += p.Width
188 p = c
189 e = e.Next()
190 }
191 return p.Value
192 }
193
194 func (s *stream) compress() {
195 if s.l.Len() < 2 {
196 return
197 }
198 e := s.l.Back()
199 x := e.Value.(*Sample)
200 r := s.n - 1 - x.Width
201 e = e.Prev()
202 for e != nil {
203 c := e.Value.(*Sample)
204 if c.Width+x.Width+x.Delta <= s.ƒ(r) {
205 x.Width += c.Width
206 o := e
207 e = e.Prev()
208 s.l.Remove(o)
209 } else {
210 x = c
211 e = e.Prev()
212 }
213 r -= c.Width
214 }
215 }
216
217 func (s *stream) samples() Samples {
218 samples := make(Samples, 0, s.l.Len())
219 for e := s.l.Front(); e != nil; e = e.Next() {
220 samples = append(samples, *e.Value.(*Sample))
221 }
222 return samples
223 }
224
225 // Min returns the mininmul value observed in the stream.
226 func (s *stream) Min() float64 {
227 if e := s.l.Front(); e != nil {
228 return e.Value.(*Sample).Value
229 }
230 return math.NaN()
231 }
232
233 // Max returns the maximum value observed in the stream within the error epsilon.
234 func (s *stream) Max() float64 {
235 if s.l.Len() > 0 {
236 return s.max
237 }
238 return math.NaN()
239 }
0 package quantile
1
2 import (
3 "math"
4 "math/rand"
5 "sort"
6 "testing"
7 )
8
9 func TestQuantRandQuery(t *testing.T) {
10 s := New(0.01, 0.5, 0.90, 0.99)
11 a := make([]float64, 0, 1e5)
12 rand.Seed(42)
13 for i := 0; i < cap(a); i++ {
14 v := float64(rand.Int63())
15 s.Insert(v)
16 a = append(a, v)
17 }
18 t.Logf("len: %d", s.Count())
19 sort.Float64s(a)
20 w := getPerc(a, 0.50)
21 if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
22 t.Errorf("perc50: want %v, got %v", w, g)
23 t.Logf("e: %f", math.Abs(w-g)/w)
24 }
25 w = getPerc(a, 0.90)
26 if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
27 t.Errorf("perc90: want %v, got %v", w, g)
28 t.Logf("e: %f", math.Abs(w-g)/w)
29 }
30 w = getPerc(a, 0.99)
31 if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
32 t.Errorf("perc99: want %v, got %v", w, g)
33 t.Logf("e: %f", math.Abs(w-g)/w)
34 }
35 }
36
37 func TestQuantRandMergeQuery(t *testing.T) {
38
39 ch := make(chan float64)
40 done := make(chan *Stream)
41 for i := 0; i < 2; i++ {
42 go func() {
43 s := New(0.01, 0.5, 0.90, 0.99)
44 for v := range ch {
45 s.Insert(v)
46 }
47 done <- s
48 }()
49 }
50
51 rand.Seed(42)
52 a := make([]float64, 0, 1e6)
53 for i := 0; i < cap(a); i++ {
54 v := float64(rand.Int63())
55 a = append(a, v)
56 ch <- v
57 }
58 close(ch)
59
60 s := <-done
61 o := <-done
62 s.Merge(o.Samples())
63
64 t.Logf("len: %d", s.Count())
65 sort.Float64s(a)
66 w := getPerc(a, 0.50)
67 if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
68 t.Errorf("perc50: want %v, got %v", w, g)
69 t.Logf("e: %f", math.Abs(w-g)/w)
70 }
71 w = getPerc(a, 0.90)
72 if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
73 t.Errorf("perc90: want %v, got %v", w, g)
74 t.Logf("e: %f", math.Abs(w-g)/w)
75 }
76 w = getPerc(a, 0.99)
77 if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
78 t.Errorf("perc99: want %v, got %v", w, g)
79 t.Logf("e: %f", math.Abs(w-g)/w)
80 }
81 }
82
// getPerc returns the element of x at index int(len(x)*p); x is expected to
// have been sorted by the caller. The index is clamped to the valid range so
// that p == 1 yields the last element instead of indexing past the end. It
// returns NaN for an empty slice.
func getPerc(x []float64, p float64) float64 {
	if len(x) == 0 {
		return math.NaN()
	}
	k := int(float64(len(x)) * p)
	if k >= len(x) {
		k = len(x) - 1
	}
	if k < 0 {
		k = 0
	}
	return x[k]
}