Codebase list golang-github-beorn7-perks / 61af1a0
init Blake Mizerany 11 years ago
6 changed file(s) with 356 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 *.test
0 package quantile
1
2 import (
3 "testing"
4 )
5
6 func BenchmarkInsert(b *testing.B) {
7 b.StopTimer()
8 s := New(0.01, 0.5, 0.9, 0.99)
9 b.StartTimer()
10 for i := float64(0); i < float64(b.N); i++ {
11 s.Insert(i)
12 }
13 }
0 package quantile
1
2 import (
3 "sort"
4 )
5
6 type buffer struct {
7 *stream
8 b Samples
9 }
10
11 func (buf *buffer) Insert(v float64) {
12 buf.insert(Sample{Value: v, Width: 1})
13 }
14
15 func (buf *buffer) insert(sample Sample) {
16 buf.b = append(buf.b, sample)
17 if len(buf.b) == cap(buf.b) {
18 buf.flush()
19 buf.compress()
20 }
21 }
22
23 func (buf *buffer) Query(q float64) float64 {
24 if buf.flushed() {
25 // Fast path when there hasn't been enough data for a flush;
26 // this also yeilds better accuracy for small sets of data.
27 i := float64(len(buf.b)) * q
28 return buf.b[int(i)].Value
29 }
30 buf.flush()
31 return buf.stream.Query(q)
32 }
33
34 func (buf *buffer) Merge(samples Samples) {
35 buf.stream.Merge(samples)
36 }
37
38 func (buf *buffer) Init() {
39 buf.stream.Init()
40 buf.b = buf.b[:0]
41 }
42
43 func (buf *buffer) Samples() Samples {
44 if !buf.flushed() {
45 return buf.b
46 }
47 return buf.stream.Samples()
48 }
49
50 func (buf *buffer) flush() {
51 sort.Sort(buf.b)
52 buf.stream.Merge(buf.b)
53 buf.b = buf.b[:0]
54 }
55
56 func (buf *buffer) flushed() bool {
57 return buf.stream.l.Len() == 0
58 }
0 // The quantile package implements Effective Computation of Biased Quantiles over Data Streams
1 // http://www.cs.rutgers.edu/~muthu/bquant.pdf
2 package quantile
3
4 import (
5 "container/list"
6 "math"
7 )
8
9 type Interface interface {
10 // Query returns the calculated qth percentiles value.
11 Query(q float64) float64
12
13 // Insert inserts v into the list.
14 Insert(v float64)
15
16 // Merge merges samples into the list. This handy when
17 // merging multiple streams from seperate threads.
18 Merge(samples Samples)
19
20 // Samples returns a copy of the list of samples kept from the data
21 // stream.
22 Samples() Samples
23
24 // Count returns the total number of samples observed in the stream
25 // since initialization.
26 Count() int
27
28 // Init initializes or clears the list.
29 Init()
30
31 // Min returns the minimum value observed in the list.
32 Min() float64
33
34 // Max returns the maximum value observed in the list.
35 Max() float64
36 }
37
38 type stream struct {
39 e float64
40 q []float64
41 n float64
42 l *list.List
43 }
44
45 // New returns an initialized stream.
46 func New(e float64, quantiles ...float64) Interface {
47 x := &stream{e: e, q: quantiles, l: list.New()}
48 return &buffer{x, make(Samples, 0, 500)}
49 }
50
51 func (qt *stream) Init() {
52 qt.l.Init()
53 qt.n = 0
54 }
55
56 func (qt *stream) ƒ(r float64) float64 {
57 var m float64 = math.MaxFloat64
58 var f float64
59 for _, q := range qt.q {
60 if q*qt.n <= r {
61 f = (2 * qt.e * r) / q
62 } else {
63 f = (2 * qt.e * (qt.n - r)) / (1 - q)
64 }
65 m = math.Min(m, f)
66 }
67 return m
68 }
69
70 func (qt *stream) Insert(v float64) {
71 fn := qt.mergeFunc()
72 fn(v, 1)
73 }
74
75 func (qt *stream) Merge(samples Samples) {
76 fn := qt.mergeFunc()
77 for _, s := range samples {
78 fn(s.Value, s.Width)
79 }
80 }
81
82 func (qt *stream) mergeFunc() func(v, w float64) {
83 // NOTE: I used a goto over defer because it bought me a few extra
84 // nanoseconds. I know. I know.
85 var r float64
86 e := qt.l.Front()
87 return func(v, w float64) {
88 for ; e != nil; e = e.Next() {
89 c := e.Value.(*Sample)
90 if c.Value > v {
91 s := &Sample{v, w, math.Floor(qt.ƒ(r)) - 1}
92 qt.l.InsertBefore(s, e)
93 goto inserted
94 }
95 r += c.Width
96 }
97 qt.l.PushBack(&Sample{v, w, 0})
98 inserted:
99 qt.n += w
100 }
101 }
102
103 func (qt *stream) Count() int {
104 return int(qt.n)
105 }
106
107 func (qt *stream) Query(q float64) float64 {
108 e := qt.l.Front()
109 t := math.Ceil(q * qt.n)
110 t += math.Ceil(qt.ƒ(t) / 2)
111 p := e.Value.(*Sample)
112 e = e.Next()
113 r := float64(0)
114 for e != nil {
115 c := e.Value.(*Sample)
116 if r+c.Width+c.Delta > t {
117 return p.Value
118 }
119 r += p.Width
120 p = c
121 e = e.Next()
122 }
123 return p.Value
124 }
125
126 func (qt *stream) compress() {
127 if qt.l.Len() < 2 {
128 return
129 }
130 e := qt.l.Back()
131 x := e.Value.(*Sample)
132 r := qt.n - 1 - x.Width
133 e = e.Prev()
134 for e != nil {
135 c := e.Value.(*Sample)
136 if c.Width+x.Width+x.Delta <= qt.ƒ(r) {
137 x.Width += c.Width
138 o := e
139 e = e.Prev()
140 qt.l.Remove(o)
141 } else {
142 x = c
143 e = e.Prev()
144 }
145 r -= c.Width
146 }
147 }
148
149 func (qt *stream) Samples() Samples {
150 samples := make(Samples, 0, qt.l.Len())
151 for e := qt.l.Front(); e != nil; e = e.Next() {
152 samples = append(samples, *e.Value.(*Sample))
153 }
154 return samples
155 }
156
157 // Min returns the mininmul value observed in the stream.
158 func (qt *stream) Min() float64 {
159 if e := qt.l.Front(); e != nil {
160 return e.Value.(*Sample).Value
161 }
162 return math.NaN()
163 }
164
165 // Max returns the maximum value observed in the stream within the error epsilon.
166 func (qt *stream) Max() float64 {
167 if e := qt.l.Back(); e != nil {
168 return e.Value.(*Sample).Value
169 }
170 return math.NaN()
171 }
0 package quantile
1
2 import (
3 "math"
4 "math/rand"
5 "sort"
6 "testing"
7 )
8
9 func TestQuantRandQuery(t *testing.T) {
10 s := New(0.01, 0.5, 0.90, 0.99)
11 a := make([]float64, 0, 1e5)
12 rand.Seed(42)
13 for i := 0; i < cap(a); i++ {
14 v := float64(rand.Int63())
15 s.Insert(v)
16 a = append(a, v)
17 }
18 t.Logf("len: %d", s.Count())
19 sort.Float64s(a)
20 w := getPerc(a, 0.50)
21 if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
22 t.Errorf("perc50: want %v, got %v", w, g)
23 t.Logf("e: %f", math.Abs(w-g)/w)
24 }
25 w = getPerc(a, 0.90)
26 if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
27 t.Errorf("perc90: want %v, got %v", w, g)
28 t.Logf("e: %f", math.Abs(w-g)/w)
29 }
30 w = getPerc(a, 0.99)
31 if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
32 t.Errorf("perc99: want %v, got %v", w, g)
33 t.Logf("e: %f", math.Abs(w-g)/w)
34 }
35 }
36
37 func TestQuantRandMergeQuery(t *testing.T) {
38
39 ch := make(chan float64)
40 done := make(chan Interface)
41 for i := 0; i < 2; i++ {
42 go func() {
43 s := New(0.01, 0.5, 0.90, 0.99)
44 for v := range ch {
45 s.Insert(v)
46 }
47 done <- s
48 }()
49 }
50
51 rand.Seed(42)
52 a := make([]float64, 0, 1e6)
53 for i := 0; i < cap(a); i++ {
54 v := float64(rand.Int63())
55 a = append(a, v)
56 ch <- v
57 }
58 close(ch)
59
60 s := <-done
61 o := <-done
62 s.Merge(o.Samples())
63
64 t.Logf("len: %d", s.Count())
65 sort.Float64s(a)
66 w := getPerc(a, 0.50)
67 if g := s.Query(0.50); math.Abs(w-g)/w > 0.03 {
68 t.Errorf("perc50: want %v, got %v", w, g)
69 t.Logf("e: %f", math.Abs(w-g)/w)
70 }
71 w = getPerc(a, 0.90)
72 if g := s.Query(0.90); math.Abs(w-g)/w > 0.03 {
73 t.Errorf("perc90: want %v, got %v", w, g)
74 t.Logf("e: %f", math.Abs(w-g)/w)
75 }
76 w = getPerc(a, 0.99)
77 if g := s.Query(0.99); math.Abs(w-g)/w > 0.03 {
78 t.Errorf("perc99: want %v, got %v", w, g)
79 t.Logf("e: %f", math.Abs(w-g)/w)
80 }
81 }
82
83 func getPerc(x []float64, p float64) float64 {
84 k := int(float64(len(x)) * p)
85 return x[k]
86 }
0 package quantile
1
2 // Sample holds an observed value and meta information for compression. JSON
3 // tags have been added for convenience.
4 type Sample struct {
5 Value float64 `json:",string"`
6 Width float64 `json:",string"`
7 Delta float64 `json:",string"`
8 }
9
10 type Samples []Sample
11
12 func (a Samples) Len() int {
13 return len(a)
14 }
15
16 func (a Samples) Less(i, j int) bool {
17 return a[i].Value < a[j].Value
18 }
19
20 func (a Samples) Swap(i, j int) {
21 a[i], a[j] = a[j], a[i]
22 }