golang-github-influxdata-tdigest @ 8b4ac83
New upstream version 0.0~git20180711.a7d76c6 (Alexandre Viau)
14 changed files with 1893 additions and 0 deletions.
0 /test/*.dat*
0 Apache License
1 Version 2.0, January 2004
2 http://www.apache.org/licenses/
3
4 TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
5
6 1. Definitions.
7
8 "License" shall mean the terms and conditions for use, reproduction,
9 and distribution as defined by Sections 1 through 9 of this document.
10
11 "Licensor" shall mean the copyright owner or entity authorized by
12 the copyright owner that is granting the License.
13
14 "Legal Entity" shall mean the union of the acting entity and all
15 other entities that control, are controlled by, or are under common
16 control with that entity. For the purposes of this definition,
17 "control" means (i) the power, direct or indirect, to cause the
18 direction or management of such entity, whether by contract or
19 otherwise, or (ii) ownership of fifty percent (50%) or more of the
20 outstanding shares, or (iii) beneficial ownership of such entity.
21
22 "You" (or "Your") shall mean an individual or Legal Entity
23 exercising permissions granted by this License.
24
25 "Source" form shall mean the preferred form for making modifications,
26 including but not limited to software source code, documentation
27 source, and configuration files.
28
29 "Object" form shall mean any form resulting from mechanical
30 transformation or translation of a Source form, including but
31 not limited to compiled object code, generated documentation,
32 and conversions to other media types.
33
34 "Work" shall mean the work of authorship, whether in Source or
35 Object form, made available under the License, as indicated by a
36 copyright notice that is included in or attached to the work
37 (an example is provided in the Appendix below).
38
39 "Derivative Works" shall mean any work, whether in Source or Object
40 form, that is based on (or derived from) the Work and for which the
41 editorial revisions, annotations, elaborations, or other modifications
42 represent, as a whole, an original work of authorship. For the purposes
43 of this License, Derivative Works shall not include works that remain
44 separable from, or merely link (or bind by name) to the interfaces of,
45 the Work and Derivative Works thereof.
46
47 "Contribution" shall mean any work of authorship, including
48 the original version of the Work and any modifications or additions
49 to that Work or Derivative Works thereof, that is intentionally
50 submitted to Licensor for inclusion in the Work by the copyright owner
51 or by an individual or Legal Entity authorized to submit on behalf of
52 the copyright owner. For the purposes of this definition, "submitted"
53 means any form of electronic, verbal, or written communication sent
54 to the Licensor or its representatives, including but not limited to
55 communication on electronic mailing lists, source code control systems,
56 and issue tracking systems that are managed by, or on behalf of, the
57 Licensor for the purpose of discussing and improving the Work, but
58 excluding communication that is conspicuously marked or otherwise
59 designated in writing by the copyright owner as "Not a Contribution."
60
61 "Contributor" shall mean Licensor and any individual or Legal Entity
62 on behalf of whom a Contribution has been received by Licensor and
63 subsequently incorporated within the Work.
64
65 2. Grant of Copyright License. Subject to the terms and conditions of
66 this License, each Contributor hereby grants to You a perpetual,
67 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
68 copyright license to reproduce, prepare Derivative Works of,
69 publicly display, publicly perform, sublicense, and distribute the
70 Work and such Derivative Works in Source or Object form.
71
72 3. Grant of Patent License. Subject to the terms and conditions of
73 this License, each Contributor hereby grants to You a perpetual,
74 worldwide, non-exclusive, no-charge, royalty-free, irrevocable
75 (except as stated in this section) patent license to make, have made,
76 use, offer to sell, sell, import, and otherwise transfer the Work,
77 where such license applies only to those patent claims licensable
78 by such Contributor that are necessarily infringed by their
79 Contribution(s) alone or by combination of their Contribution(s)
80 with the Work to which such Contribution(s) was submitted. If You
81 institute patent litigation against any entity (including a
82 cross-claim or counterclaim in a lawsuit) alleging that the Work
83 or a Contribution incorporated within the Work constitutes direct
84 or contributory patent infringement, then any patent licenses
85 granted to You under this License for that Work shall terminate
86 as of the date such litigation is filed.
87
88 4. Redistribution. You may reproduce and distribute copies of the
89 Work or Derivative Works thereof in any medium, with or without
90 modifications, and in Source or Object form, provided that You
91 meet the following conditions:
92
93 (a) You must give any other recipients of the Work or
94 Derivative Works a copy of this License; and
95
96 (b) You must cause any modified files to carry prominent notices
97 stating that You changed the files; and
98
99 (c) You must retain, in the Source form of any Derivative Works
100 that You distribute, all copyright, patent, trademark, and
101 attribution notices from the Source form of the Work,
102 excluding those notices that do not pertain to any part of
103 the Derivative Works; and
104
105 (d) If the Work includes a "NOTICE" text file as part of its
106 distribution, then any Derivative Works that You distribute must
107 include a readable copy of the attribution notices contained
108 within such NOTICE file, excluding those notices that do not
109 pertain to any part of the Derivative Works, in at least one
110 of the following places: within a NOTICE text file distributed
111 as part of the Derivative Works; within the Source form or
112 documentation, if provided along with the Derivative Works; or,
113 within a display generated by the Derivative Works, if and
114 wherever such third-party notices normally appear. The contents
115 of the NOTICE file are for informational purposes only and
116 do not modify the License. You may add Your own attribution
117 notices within Derivative Works that You distribute, alongside
118 or as an addendum to the NOTICE text from the Work, provided
119 that such additional attribution notices cannot be construed
120 as modifying the License.
121
122 You may add Your own copyright statement to Your modifications and
123 may provide additional or different license terms and conditions
124 for use, reproduction, or distribution of Your modifications, or
125 for any such Derivative Works as a whole, provided Your use,
126 reproduction, and distribution of the Work otherwise complies with
127 the conditions stated in this License.
128
129 5. Submission of Contributions. Unless You explicitly state otherwise,
130 any Contribution intentionally submitted for inclusion in the Work
131 by You to the Licensor shall be under the terms and conditions of
132 this License, without any additional terms or conditions.
133 Notwithstanding the above, nothing herein shall supersede or modify
134 the terms of any separate license agreement you may have executed
135 with Licensor regarding such Contributions.
136
137 6. Trademarks. This License does not grant permission to use the trade
138 names, trademarks, service marks, or product names of the Licensor,
139 except as required for reasonable and customary use in describing the
140 origin of the Work and reproducing the content of the NOTICE file.
141
142 7. Disclaimer of Warranty. Unless required by applicable law or
143 agreed to in writing, Licensor provides the Work (and each
144 Contributor provides its Contributions) on an "AS IS" BASIS,
145 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
146 implied, including, without limitation, any warranties or conditions
147 of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
148 PARTICULAR PURPOSE. You are solely responsible for determining the
149 appropriateness of using or redistributing the Work and assume any
150 risks associated with Your exercise of permissions under this License.
151
152 8. Limitation of Liability. In no event and under no legal theory,
153 whether in tort (including negligence), contract, or otherwise,
154 unless required by applicable law (such as deliberate and grossly
155 negligent acts) or agreed to in writing, shall any Contributor be
156 liable to You for damages, including any direct, indirect, special,
157 incidental, or consequential damages of any character arising as a
158 result of this License or out of the use or inability to use the
159 Work (including but not limited to damages for loss of goodwill,
160 work stoppage, computer failure or malfunction, or any and all
161 other commercial damages or losses), even if such Contributor
162 has been advised of the possibility of such damages.
163
164 9. Accepting Warranty or Additional Liability. While redistributing
165 the Work or Derivative Works thereof, You may choose to offer,
166 and charge a fee for, acceptance of support, warranty, indemnity,
167 or other liability obligations and/or rights consistent with this
168 License. However, in accepting such obligations, You may act only
169 on Your own behalf and on Your sole responsibility, not on behalf
170 of any other Contributor, and only if You agree to indemnify,
171 defend, and hold each Contributor harmless for any liability
172 incurred by, or claims asserted against, such Contributor by reason
173 of your accepting any such warranty or additional liability.
174
175 END OF TERMS AND CONDITIONS
176
177 APPENDIX: How to apply the Apache License to your work.
178
179 To apply the Apache License to your work, attach the following
180 boilerplate notice, with the fields enclosed by brackets "{}"
181 replaced with your own identifying information. (Don't include
182 the brackets!) The text should be enclosed in the appropriate
183 comment syntax for the file format. We also recommend that a
184 file or class name and description of purpose be included on the
185 same "printed page" as the copyright notice for easier
186 identification within third-party archives.
187
188 Copyright 2018 InfluxData Inc.
189
190 Licensed under the Apache License, Version 2.0 (the "License");
191 you may not use this file except in compliance with the License.
192 You may obtain a copy of the License at
193
194 http://www.apache.org/licenses/LICENSE-2.0
195
196 Unless required by applicable law or agreed to in writing, software
197 distributed under the License is distributed on an "AS IS" BASIS,
198 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
199 See the License for the specific language governing permissions and
200 limitations under the License.
201
0 # tdigest
1
2 This is an implementation of Ted Dunning's [t-digest](https://github.com/tdunning/t-digest/) in Go.
3
4 The implementation is based on [Derrick Burns' C++ implementation](https://github.com/derrickburns/tdigest).
5
6 ## Example
7
8 ```go
9 package main
10
11 import (
12 "log"
13
14 "github.com/influxdata/tdigest"
15 )
16
17 func main() {
18 td := tdigest.NewWithCompression(1000)
19 for _, x := range []float64{1, 2, 3, 4, 5, 5, 4, 3, 2, 1} {
20 td.Add(x, 1)
21 }
22
23 // Compute Quantiles
24 log.Println("50th", td.Quantile(0.5))
25 log.Println("75th", td.Quantile(0.75))
26 log.Println("90th", td.Quantile(0.9))
27 log.Println("99th", td.Quantile(0.99))
28
29 // Compute CDFs
30 log.Println("CDF(1) = ", td.CDF(1))
31 log.Println("CDF(2) = ", td.CDF(2))
32 log.Println("CDF(3) = ", td.CDF(3))
33 log.Println("CDF(4) = ", td.CDF(4))
34 log.Println("CDF(5) = ", td.CDF(5))
35 }
36 ```
37
38 ## TODO
39
40 Only the methods for a single TDigest have been implemented.
41 The methods to merge two or more existing t-digests into a single t-digest have yet to be implemented; a sketch of one possible shape for such a merge is shown below.
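
A hedged sketch of what such a merge could look like (illustrative only, not part of this version of the package; because it touches the unexported centroid lists it would have to live inside the `tdigest` package itself, and the `Merge` name is hypothetical):

```go
// Merge is a hypothetical sketch, not part of this version of the package.
// It folds every centroid of other into t, reusing the existing
// process/AddCentroid machinery.
func (t *TDigest) Merge(other *TDigest) {
	other.process() // compress other's pending centroids first
	for _, c := range other.processed {
		t.AddCentroid(c)
	}
}
```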
0 package tdigest
1
2 import (
3 "fmt"
4 "sort"
5 )
6
7 // ErrWeightLessThanZero is returned when a centroid weight is negative and cannot be processed.
8 const ErrWeightLessThanZero = Error("centroid weight cannot be less than zero")
9
10 // Error is a domain error encountered while processing tdigests
11 type Error string
12
13 func (e Error) Error() string {
14 return string(e)
15 }
16
17 // Centroid represents the average position of all points in a shape.
18 type Centroid struct {
19 Mean float64
20 Weight float64
21 }
22
23 func (c *Centroid) String() string {
24 return fmt.Sprintf("{mean: %f weight: %f}", c.Mean, c.Weight)
25 }
26
27 // Add averages the two centroids together and updates this centroid
28 func (c *Centroid) Add(r Centroid) error {
29 if r.Weight < 0 {
30 return ErrWeightLessThanZero
31 }
32 if c.Weight != 0 {
33 c.Weight += r.Weight
34 c.Mean += r.Weight * (r.Mean - c.Mean) / c.Weight
35 } else {
36 c.Weight = r.Weight
37 c.Mean = r.Mean
38 }
39 return nil
40 }
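// For example (mirroring the "weight order of magnitude" test case): adding
// Centroid{Mean: 10, Weight: 10} to Centroid{Mean: 1, Weight: 1} leaves a centroid
// with Weight 11 and Mean 1 + 10*(10-1)/11 ≈ 9.1818, the weighted mean of the two.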
41
42 // CentroidList is sorted by the Mean of the centroid, ascending.
43 type CentroidList []Centroid
44
45 func (l *CentroidList) Clear() {
46 *l = (*l)[0:0]
47 }
48
49 func (l CentroidList) Len() int { return len(l) }
50 func (l CentroidList) Less(i, j int) bool { return l[i].Mean < l[j].Mean }
51 func (l CentroidList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
52
53 // NewCentroidList creates a CentroidList from the given centroids, sorted by mean in ascending order
54 func NewCentroidList(centroids []Centroid) CentroidList {
55 l := CentroidList(centroids)
56 sort.Sort(l)
57 return l
58 }
0 package tdigest_test
1
2 import (
3 "testing"
4
5 "github.com/google/go-cmp/cmp"
6 "github.com/influxdata/tdigest"
7 )
8
9 func TestCentroid_Add(t *testing.T) {
10 tests := []struct {
11 name string
12 c tdigest.Centroid
13 r tdigest.Centroid
14 want tdigest.Centroid
15 wantErr bool
16 errStr string
17 }{
18 {
19 name: "error when weight is negative",
20 r: tdigest.Centroid{
21 Weight: -1.0,
22 },
23 wantErr: true,
24 errStr: "centroid weight cannot be less than zero",
25 },
26 {
27 name: "zero weight",
28 c: tdigest.Centroid{
29 Weight: 0.0,
30 Mean: 1.0,
31 },
32 r: tdigest.Centroid{
33 Weight: 1.0,
34 Mean: 2.0,
35 },
36 want: tdigest.Centroid{
37 Weight: 1.0,
38 Mean: 2.0,
39 },
40 },
41 {
42 name: "weight order of magnitude",
43 c: tdigest.Centroid{
44 Weight: 1,
45 Mean: 1,
46 },
47 r: tdigest.Centroid{
48 Weight: 10,
49 Mean: 10,
50 },
51 want: tdigest.Centroid{
52 Weight: 11,
53 Mean: 9.181818181818182,
54 },
55 },
56 }
57 for _, tt := range tests {
58 t.Run(tt.name, func(t *testing.T) {
59 c := &tt.c
60 if err := c.Add(tt.r); (err != nil) != tt.wantErr {
61 t.Errorf("Centroid.Add() error = %v, wantErr %v", err, tt.wantErr)
62 } else if tt.wantErr && err.Error() != tt.errStr {
63 t.Errorf("Centroid.Add() error.Error() = %s, errStr %v", err.Error(), tt.errStr)
64 }
65 if !cmp.Equal(tt.c, tt.want) {
66 t.Errorf("unexpected centroid -want/+got\n%s", cmp.Diff(tt.want, tt.c))
67 }
68 })
69 }
70 }
71
72 func TestNewCentroidList(t *testing.T) {
73 tests := []struct {
74 name string
75 centroids []tdigest.Centroid
76 want tdigest.CentroidList
77 }{
78 {
79 name: "empty list",
80 },
81 {
82 name: "priority should be by mean ascending",
83 centroids: []tdigest.Centroid{
84 {
85 Mean: 2.0,
86 },
87 {
88 Mean: 1.0,
89 },
90 },
91 want: tdigest.CentroidList{
92 {
93 Mean: 1.0,
94 },
95 {
96 Mean: 2.0,
97 },
98 },
99 },
100 {
101 name: "single element should be identity",
102 centroids: []tdigest.Centroid{
103 {
104 Mean: 1.0,
105 },
106 },
107 want: tdigest.CentroidList{
108 {
109 Mean: 1.0,
110 },
111 },
112 },
113 }
114 for _, tt := range tests {
115 t.Run(tt.name, func(t *testing.T) {
116 if got := tdigest.NewCentroidList(tt.centroids); !cmp.Equal(tt.want, got) {
117 t.Errorf("NewCentroidList() = -want/+got %s", cmp.Diff(tt.want, got))
118 }
119 })
120 }
121 }
0 package tdigest
1
2 import (
3 "math"
4 "sort"
5 )
6
7 type TDigest struct {
8 Compression float64
9
10 maxProcessed int
11 maxUnprocessed int
12 processed CentroidList
13 unprocessed CentroidList
14 cumulative []float64
15 processedWeight float64
16 unprocessedWeight float64
17 min float64
18 max float64
19 }
20
21 func New() *TDigest {
22 return NewWithCompression(1000)
23 }
24 func NewWithCompression(c float64) *TDigest {
25 t := &TDigest{
26 Compression: c,
27 }
28 t.maxProcessed = processedSize(0, t.Compression)
29 t.maxUnprocessed = unprocessedSize(0, t.Compression)
30 t.processed = make([]Centroid, 0, t.maxProcessed)
31 t.unprocessed = make([]Centroid, 0, t.maxUnprocessed+1)
32 t.min = math.MaxFloat64
33 t.max = -math.MaxFloat64
34 return t
35 }
36
37 func (t *TDigest) Add(x, w float64) {
38 if math.IsNaN(x) {
39 return
40 }
41 t.AddCentroid(Centroid{Mean: x, Weight: w})
42 }
43
44 func (t *TDigest) AddCentroidList(c CentroidList) {
45 l := c.Len()
46 for i := 0; i < l; i++ {
47 diff := l - i
48 room := t.maxUnprocessed - t.unprocessed.Len()
49 mid := i + diff
50 if room < diff {
51 mid = i + room
52 }
53 for i < mid {
54 t.AddCentroid(c[i])
55 i++
56 }
57 }
58 }
59
60 func (t *TDigest) AddCentroid(c Centroid) {
61 t.unprocessed = append(t.unprocessed, c)
62 t.unprocessedWeight += c.Weight
63
64 if t.processed.Len() > t.maxProcessed ||
65 t.unprocessed.Len() > t.maxUnprocessed {
66 t.process()
67 }
68 }
69
70 func (t *TDigest) process() {
71 if t.unprocessed.Len() > 0 ||
72 t.processed.Len() > t.maxProcessed {
73
74 // Append all processed centroids to the unprocessed list and sort
75 t.unprocessed = append(t.unprocessed, t.processed...)
76 sort.Sort(&t.unprocessed)
77
78 // Reset processed list with first centroid
79 t.processed.Clear()
80 t.processed = append(t.processed, t.unprocessed[0])
81
82 t.processedWeight += t.unprocessedWeight
83 t.unprocessedWeight = 0
84 soFar := t.unprocessed[0].Weight
85 limit := t.processedWeight * t.integratedQ(1.0)
86 for _, centroid := range t.unprocessed[1:] {
87 projected := soFar + centroid.Weight
88 if projected <= limit {
89 soFar = projected
90 (&t.processed[t.processed.Len()-1]).Add(centroid)
91 } else {
92 k1 := t.integratedLocation(soFar / t.processedWeight)
93 limit = t.processedWeight * t.integratedQ(k1+1.0)
94 soFar += centroid.Weight
95 t.processed = append(t.processed, centroid)
96 }
97 }
98 t.min = math.Min(t.min, t.processed[0].Mean)
99 t.max = math.Max(t.max, t.processed[t.processed.Len()-1].Mean)
100 t.updateCumulative()
101 t.unprocessed.Clear()
102 }
103 }
104
105 func (t *TDigest) updateCumulative() {
106 t.cumulative = make([]float64, t.processed.Len()+1)
107 prev := 0.0
108 for i, centroid := range t.processed {
109 cur := centroid.Weight
110 t.cumulative[i] = prev + cur/2.0
111 prev = prev + cur
112 }
113 t.cumulative[t.processed.Len()] = prev
114 }
115
116 func (t *TDigest) Quantile(q float64) float64 {
117 t.process()
118 if q < 0 || q > 1 || t.processed.Len() == 0 {
119 return math.NaN()
120 }
121 if t.processed.Len() == 1 {
122 return t.processed[0].Mean
123 }
124 index := q * t.processedWeight
125 if index <= t.processed[0].Weight/2.0 {
126 return t.min + 2.0*index/t.processed[0].Weight*(t.processed[0].Mean-t.min)
127 }
128
129 lower := sort.Search(len(t.cumulative), func(i int) bool {
130 return t.cumulative[i] >= index
131 })
132
133 if lower+1 != len(t.cumulative) {
134 z1 := index - t.cumulative[lower-1]
135 z2 := t.cumulative[lower] - index
136 return weightedAverage(t.processed[lower-1].Mean, z2, t.processed[lower].Mean, z1)
137 }
138
139 z1 := index - t.processedWeight - t.processed[lower-1].Weight/2.0
140 z2 := (t.processed[lower-1].Weight / 2.0) - z1
141 return weightedAverage(t.processed[t.processed.Len()-1].Mean, z1, t.max, z2)
142 }
143
144 func (t *TDigest) CDF(x float64) float64 {
145 t.process()
146 switch t.processed.Len() {
147 case 0:
148 return 0.0
149 case 1:
150 width := t.max - t.min
151 if x <= t.min {
152 return 0.0
153 }
154 if x >= t.max {
155 return 1.0
156 }
157 if (x - t.min) <= width {
158 // min and max are too close together to do any viable interpolation
159 return 0.5
160 }
161 return (x - t.min) / width
162 }
163
164 if x <= t.min {
165 return 0.0
166 }
167 if x >= t.max {
168 return 1.0
169 }
170 m0 := t.processed[0].Mean
171 // Left Tail
172 if x <= m0 {
173 if m0-t.min > 0 {
174 return (x - t.min) / (m0 - t.min) * t.processed[0].Weight / t.processedWeight / 2.0
175 }
176 return 0.0
177 }
178 // Right Tail
179 mn := t.processed[t.processed.Len()-1].Mean
180 if x >= mn {
181 if t.max-mn > 0.0 {
182 return 1.0 - (t.max-x)/(t.max-mn)*t.processed[t.processed.Len()-1].Weight/t.processedWeight/2.0
183 }
184 return 1.0
185 }
186
187 upper := sort.Search(t.processed.Len(), func(i int) bool {
188 return t.processed[i].Mean > x
189 })
190
191 z1 := x - t.processed[upper-1].Mean
192 z2 := t.processed[upper].Mean - x
193 return weightedAverage(t.cumulative[upper-1], z2, t.cumulative[upper], z1) / t.processedWeight
194 }
195
196 func (t *TDigest) integratedQ(k float64) float64 {
197 return (math.Sin(math.Min(k, t.Compression)*math.Pi/t.Compression-math.Pi/2.0) + 1.0) / 2.0
198 }
199
200 func (t *TDigest) integratedLocation(q float64) float64 {
201 return t.Compression * (math.Asin(2.0*q-1.0) + math.Pi/2.0) / math.Pi
202 }
203
204 func weightedAverage(x1, w1, x2, w2 float64) float64 {
205 if x1 <= x2 {
206 return weightedAverageSorted(x1, w1, x2, w2)
207 }
208 return weightedAverageSorted(x2, w2, x1, w1)
209 }
210
211 func weightedAverageSorted(x1, w1, x2, w2 float64) float64 {
212 x := (x1*w1 + x2*w2) / (w1 + w2)
213 return math.Max(x1, math.Min(x, x2))
214 }
215
216 func processedSize(size int, compression float64) int {
217 if size == 0 {
218 return int(2 * math.Ceil(compression))
219 }
220 return size
221 }
222
223 func unprocessedSize(size int, compression float64) int {
224 if size == 0 {
225 return int(8 * math.Ceil(compression))
226 }
227 return size
228 }
0 package tdigest_test
1
2 import (
3 "math/rand"
4 "testing"
5
6 "github.com/gonum/stat/distuv"
7 "github.com/influxdata/tdigest"
8 )
9
10 const (
11 N = 1e6
12 Mu = 10
13 Sigma = 3
14
15 seed = 42
16 )
17
18 // NormalData is a slice of N random values that are normally distributed with mean Mu and standard deviation Sigma.
19 var NormalData []float64
20 var UniformData []float64
21
22 var NormalDigest *tdigest.TDigest
23 var UniformDigest *tdigest.TDigest
24
25 func init() {
26 dist := distuv.Normal{
27 Mu: Mu,
28 Sigma: Sigma,
29 Source: rand.New(rand.NewSource(seed)),
30 }
31 uniform := rand.New(rand.NewSource(seed))
32
33 UniformData = make([]float64, N)
34 UniformDigest = tdigest.NewWithCompression(1000)
35
36 NormalData = make([]float64, N)
37 NormalDigest = tdigest.NewWithCompression(1000)
38
39 for i := range NormalData {
40 NormalData[i] = dist.Rand()
41 NormalDigest.Add(NormalData[i], 1)
42
43 UniformData[i] = uniform.Float64() * 100
44 UniformDigest.Add(UniformData[i], 1)
45 }
46 }
47
48 func TestTdigest_Quantile(t *testing.T) {
49 tests := []struct {
50 name string
51 data []float64
52 digest *tdigest.TDigest
53 quantile float64
54 want float64
55 }{
56 {
57 name: "increasing",
58 quantile: 0.5,
59 data: []float64{1, 2, 3, 4, 5},
60 want: 3,
61 },
62 {
63 name: "data in decreasing order",
64 quantile: 0.25,
65 data: []float64{555.349107, 432.842597},
66 want: 432.842597,
67 },
68 {
69 name: "small",
70 quantile: 0.5,
71 data: []float64{1, 2, 3, 4, 5, 5, 4, 3, 2, 1},
72 want: 3,
73 },
74 {
75 name: "small 99 (max)",
76 quantile: 0.99,
77 data: []float64{1, 2, 3, 4, 5, 5, 4, 3, 2, 1},
78 want: 5,
79 },
80 {
81 name: "normal 50",
82 quantile: 0.5,
83 digest: NormalDigest,
84 want: 9.997821231634168,
85 },
86 {
87 name: "normal 90",
88 quantile: 0.9,
89 digest: NormalDigest,
90 want: 13.843815760607427,
91 },
92 {
93 name: "uniform 50",
94 quantile: 0.5,
95 digest: UniformDigest,
96 want: 50.02682856274754,
97 },
98 {
99 name: "uniform 90",
100 quantile: 0.9,
101 digest: UniformDigest,
102 want: 90.02117754660424,
103 },
104 {
105 name: "uniform 99",
106 quantile: 0.99,
107 digest: UniformDigest,
108 want: 99.00246731511771,
109 },
110 {
111 name: "uniform 99.9",
112 quantile: 0.999,
113 digest: UniformDigest,
114 want: 99.90178495422307,
115 },
116 }
117 for _, tt := range tests {
118 t.Run(tt.name, func(t *testing.T) {
119 td := tt.digest
120 if td == nil {
121 td = tdigest.NewWithCompression(1000)
122 for _, x := range tt.data {
123 td.Add(x, 1)
124 }
125 }
126 got := td.Quantile(tt.quantile)
127 if got != tt.want {
128 t.Errorf("unexpected quantile %f, got %g want %g", tt.quantile, got, tt.want)
129 }
130 })
131 }
132 }
133
134 func TestTdigest_CDFs(t *testing.T) {
135 tests := []struct {
136 name string
137 data []float64
138 digest *tdigest.TDigest
139 cdf float64
140 want float64
141 }{
142 {
143 name: "increasing",
144 cdf: 3,
145 data: []float64{1, 2, 3, 4, 5},
146 want: 0.5,
147 },
148 {
149 name: "small",
150 cdf: 4,
151 data: []float64{1, 2, 3, 4, 5, 5, 4, 3, 2, 1},
152 want: 0.75,
153 },
154 {
155 name: "small max",
156 cdf: 5,
157 data: []float64{1, 2, 3, 4, 5, 5, 4, 3, 2, 1},
158 want: 1,
159 },
160 {
161 name: "normal mean",
162 cdf: 10,
163 data: NormalData,
164 want: 0.500298235578106,
165 },
166 {
167 name: "normal high",
168 cdf: -100,
169 data: NormalData,
170 want: 0,
171 },
172 {
173 name: "normal low",
174 cdf: 110,
175 data: NormalData,
176 want: 1,
177 },
178 {
179 name: "uniform 50",
180 cdf: 50,
181 data: UniformData,
182 want: 0.49972989818712815,
183 },
184 {
185 name: "uniform min",
186 cdf: 0,
187 data: UniformData,
188 want: 0,
189 },
190 {
191 name: "uniform max",
192 cdf: 100,
193 data: UniformData,
194 want: 1,
195 },
196 {
197 name: "uniform 10",
198 cdf: 10,
199 data: UniformData,
200 want: 0.099715527526992,
201 },
202 {
203 name: "uniform 90",
204 cdf: 90,
205 data: UniformData,
206 want: 0.8997838903965611,
207 },
208 }
209 for _, tt := range tests {
210 t.Run(tt.name, func(t *testing.T) {
211 td := tt.digest
212 if td == nil {
213 td = tdigest.NewWithCompression(1000)
214 for _, x := range tt.data {
215 td.Add(x, 1)
216 }
217 }
218 got := td.CDF(tt.cdf)
219 if got != tt.want {
220 t.Errorf("unexpected CDF %f, got %g want %g", tt.cdf, got, tt.want)
221 }
222 })
223 }
224 }
225
226 var quantiles = []float64{0.1, 0.5, 0.9, 0.99, 0.999}
227
228 func BenchmarkTDigest_Add(b *testing.B) {
229 for n := 0; n < b.N; n++ {
230 td := tdigest.NewWithCompression(1000)
231 for _, x := range NormalData {
232 td.Add(x, 1)
233 }
234 }
235 }
236 func BenchmarkTDigest_Quantile(b *testing.B) {
237 td := tdigest.NewWithCompression(1000)
238 for _, x := range NormalData {
239 td.Add(x, 1)
240 }
241 b.ResetTimer()
242 var x float64
243 for n := 0; n < b.N; n++ {
244 for _, q := range quantiles {
245 x += td.Quantile(q)
246 }
247 }
248 }
0 # Testing
1
2 This directory contains two programs, `main.go` and `main.cpp`, which both read three input files, compute various quantiles and CDFs, and write out their results.
3 The purpose of these programs is to show that the Go implementation produces results consistent with the C++ implementation.
4
5 The tests can be run using `test.sh`.
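
Both a Go toolchain and `g++` must be on the `PATH`, since `test.sh` generates the data files with Go, builds and runs the C++ comparison program, and then validates the two sets of results against each other:

```sh
# test.sh changes into its own directory first, so it can be invoked from
# anywhere, for example from the repository root:
./test/test.sh
```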
6
0 package main
1
2 import (
3 "math/rand"
4 "os"
5 "strconv"
6
7 "github.com/gonum/stat/distuv"
8 )
9
10 const (
11 N = 1e6
12 Mu = 10
13 Sigma = 3
14
15 seed = 42
16 )
17
18 func main() {
19 // Generate uniform and normal data
20 uniform := rand.New(rand.NewSource(seed))
21 dist := distuv.Normal{
22 Mu: Mu,
23 Sigma: Sigma,
24 Source: rand.New(rand.NewSource(seed)),
25 }
26
27 uniformData := make([]float64, N)
28 normalData := make([]float64, N)
29 for i := range normalData {
30 normalData[i] = dist.Rand()
31 uniformData[i] = uniform.Float64() * 100
32 }
33
34 smallData := []float64{1, 2, 3, 4, 5, 5, 4, 3, 2, 1}
35
36 writeData("uniform.dat", uniformData)
37 writeData("normal.dat", normalData)
38 writeData("small.dat", smallData)
39 }
40
41 func writeData(name string, data []float64) {
42 f, err := os.Create(name)
43 if err != nil {
44 panic(err)
45 }
46 defer f.Close()
47
48 buf := make([]byte, 0, 64)
49 for _, x := range data {
50 buf = strconv.AppendFloat(buf, x, 'f', -1, 64)
51 _, err := f.Write(buf)
52 if err != nil {
53 panic(err)
54 }
55 _, err = f.Write([]byte{'\n'})
56 if err != nil {
57 panic(err)
58 }
59 buf = buf[0:0]
60 }
61 }
0 // +build ignore
1
2 #include "tdigest.h"
3 #include <iostream>
4 #include <string>
5 #include <sstream>
6 #include <fstream>
7 #include <vector>
8 #include <iomanip>
9
10 using namespace tdigest;
11
12 double quantiles[7] = {
13 0.1,
14 0.2,
15 0.5,
16 0.75,
17 0.9,
18 0.99,
19 0.999,
20 };
21
22
23 std::string dataFiles[3] = {"small.dat", "uniform.dat", "normal.dat"};
24 double cdfs[3][5] = {
25 // small.dat
26 {0, 1, 4, 5, 6},
27 // uniform.dat
28 {-1, 0, 50, 100, 101},
29 // normal.dat
30 {-100, 7, 10, 13, 110},
31 };
32
33
34 std::vector<double> loadData(std::string name) {
35 std::ifstream f (name);
36 std::vector<double> data;
37
38 f >> std::setprecision(std::numeric_limits<long double>::digits10 + 1);
39 double x;
40 while (f >> x) {
41 data.push_back(x);
42 }
43 return data;
44 }
45
46 TDigest* createTDigest(std::vector<double> data){
47 TDigest* td = new TDigest(1000);
48 for (auto x : data) {
49 td->add(x);
50 }
51 return td;
52 }
53
54 std::vector<double> computeQuantiles(TDigest* td){
55 std::vector<double> results;
56 for (int i = 0; i < 7; i++) {
57 double q = td->quantile(quantiles[i]);
58 results.push_back(q);
59 }
60 return results;
61 }
62
63 std::vector<double> computeCDFs(TDigest* td, double cdfs[5]) {
64 std::vector<double> results;
65 for (int i = 0; i < 5; i++) {
66 double p = td->cdf(cdfs[i]);
67 results.push_back(p);
68 }
69
70 return results;
71 }
72
73 void writeResults(std::string name, std::vector<double> results){
74 std::ofstream f (name);
75
76 f << std::setprecision(std::numeric_limits<long double>::digits10 + 1);
77 for (auto x : results) {
78 f << x << std::endl;
79 }
80 }
81
82 int main() {
83 for (int i = 0; i < 3; i++) {
84 std::vector<double> data = loadData(dataFiles[i]);
85 TDigest* td = createTDigest(data);
86 auto results = computeQuantiles(td);
87 writeResults(dataFiles[i] + ".cpp.quantiles", results);
88 results = computeCDFs(td, cdfs[i]);
89 writeResults(dataFiles[i] + ".cpp.cdfs", results);
90 }
91 return 0;
92 }
0 package main
1
2 import (
3 "bufio"
4 "os"
5 "strconv"
6
7 "github.com/influxdata/tdigest"
8 )
9
10 var quantiles = []float64{
11 0.1,
12 0.2,
13 0.5,
14 0.75,
15 0.9,
16 0.99,
17 0.999,
18 }
19
20 var cdfs = map[string][]float64{
21 "small.dat": []float64{0, 1, 4, 5, 6},
22 "uniform.dat": []float64{-1, 0, 50, 100, 101},
23 "normal.dat": []float64{-100, 7, 10, 13, 110},
24 }
25
26 var dataFiles = []string{
27 "small.dat",
28 "uniform.dat",
29 "normal.dat",
30 }
31
32 func main() {
33 for _, f := range dataFiles {
34 data := loadData(f)
35 td := createTdigest(data)
36 results := computeQuantiles(td, quantiles)
37 writeResults(f+".go.quantiles", results)
38 results = computeCDFs(td, cdfs[f])
39 writeResults(f+".go.cdfs", results)
40 }
41 }
42
43 func loadData(name string) []float64 {
44 f, err := os.Open(name)
45 if err != nil {
46 panic(err)
47 }
48 defer f.Close()
49 s := bufio.NewScanner(f)
50 var data []float64
51 for s.Scan() {
52 x, err := strconv.ParseFloat(s.Text(), 64)
53 if err != nil {
54 panic(err)
55 }
56 data = append(data, x)
57 }
58 return data
59 }
60
61 func createTdigest(data []float64) *tdigest.TDigest {
62 td := tdigest.NewWithCompression(1000)
63 for _, x := range data {
64 td.Add(x, 1)
65 }
66 return td
67 }
68
69 func computeQuantiles(td *tdigest.TDigest, quantiles []float64) (r []float64) {
70 for _, q := range quantiles {
71 r = append(r, td.Quantile(q))
72 }
73 return
74 }
75
76 func computeCDFs(td *tdigest.TDigest, cdfs []float64) (r []float64) {
77 for _, x := range cdfs {
78 r = append(r, td.CDF(x))
79 }
80 return
81 }
82
83 func writeResults(name string, results []float64) {
84 f, err := os.Create(name)
85 if err != nil {
86 panic(err)
87 }
88 defer f.Close()
89 buf := make([]byte, 0, 64)
90 for _, x := range results {
91 buf = strconv.AppendFloat(buf, x, 'f', -1, 64)
92 _, err := f.Write(buf)
93 if err != nil {
94 panic(err)
95 }
96 _, err = f.Write([]byte{'\n'})
97 if err != nil {
98 panic(err)
99 }
100 buf = buf[0:0]
101 }
102 }
0 /*
1 * Licensed to Derrick R. Burns under one or more
2 * contributor license agreements. See the NOTICES file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef TDIGEST2_TDIGEST_H_
18 #define TDIGEST2_TDIGEST_H_
19
20 #include <algorithm>
21 #include <cfloat>
22 #include <cmath>
23 #include <queue>
24 #include <utility>
25 #include <vector>
26 #include <iostream>
27
28 // Modified from the original to remove all external dependencies.
29 #define DLOG(l) std::cerr
30 #define LOG(l) std::cerr
31
32 #define CHECK_LE(x1, x2)
33 #define CHECK_GT(x1, x2)
34 #define CHECK_GE(x1, x2)
35
36 namespace tdigest {
37
38 using Value = double;
39 using Weight = double;
40 using Index = size_t;
41
42 const size_t kHighWater = 40000;
43
44 class Centroid {
45 public:
46 Centroid() : Centroid(0.0, 0.0) {}
47
48 Centroid(Value mean, Weight weight) : mean_(mean), weight_(weight) {}
49
50 inline Value mean() const noexcept { return mean_; }
51
52 inline Weight weight() const noexcept { return weight_; }
53
54 inline void add(const Centroid& c) {
55 CHECK_GT(c.weight_, 0);
56 if( weight_ != 0.0 ) {
57 weight_ += c.weight_;
58 mean_ += c.weight_ * (c.mean_ - mean_) / weight_;
59 } else {
60 weight_ = c.weight_;
61 mean_ = c.mean_;
62 }
63 }
64
65 private:
66 Value mean_ = 0;
67 Weight weight_ = 0;
68 };
69
70 struct CentroidList {
71 CentroidList(const std::vector<Centroid>& s) : iter(s.cbegin()), end(s.cend()) {}
72 std::vector<Centroid>::const_iterator iter;
73 std::vector<Centroid>::const_iterator end;
74
75 bool advance() { return ++iter != end; }
76 };
77
78 class CentroidListComparator {
79 public:
80 CentroidListComparator() {}
81
82 bool operator()(const CentroidList& left, const CentroidList& right) const {
83 return left.iter->mean() > right.iter->mean();
84 }
85 };
86
87 using CentroidListQueue = std::priority_queue<CentroidList, std::vector<CentroidList>, CentroidListComparator>;
88
89 struct CentroidComparator {
90 bool operator()(const Centroid& a, const Centroid& b) const { return a.mean() < b.mean(); }
91 };
92
93 class TDigest {
94 class TDigestComparator {
95 public:
96 TDigestComparator() {}
97
98 bool operator()(const TDigest* left, const TDigest* right) const { return left->totalSize() > right->totalSize(); }
99 };
100
101 using TDigestQueue = std::priority_queue<const TDigest*, std::vector<const TDigest*>, TDigestComparator>;
102
103 public:
104 TDigest() : TDigest(1000) {}
105
106 explicit TDigest(Value compression) : TDigest(compression, 0) {}
107
108 TDigest(Value compression, Index bufferSize) : TDigest(compression, bufferSize, 0) {}
109
110 TDigest(Value compression, Index unmergedSize, Index mergedSize)
111 : compression_(compression),
112 maxProcessed_(processedSize(mergedSize, compression)),
113 maxUnprocessed_(unprocessedSize(unmergedSize, compression)) {
114 processed_.reserve(maxProcessed_);
115 unprocessed_.reserve(maxUnprocessed_ + 1);
116 }
117
118 TDigest(std::vector<Centroid>&& processed, std::vector<Centroid>&& unprocessed, Value compression,
119 Index unmergedSize, Index mergedSize)
120 : TDigest(compression, unmergedSize, mergedSize) {
121 processed_ = std::move(processed);
122 unprocessed_ = std::move(unprocessed);
123
124 processedWeight_ = weight(processed_);
125 unprocessedWeight_ = weight(unprocessed_);
126 if( processed_.size() > 0 ) {
127 min_ = std::min(min_, processed_[0].mean());
128 max_ = std::max(max_, (processed_.cend() - 1)->mean());
129 }
130 updateCumulative();
131 }
132
133 static Weight weight(std::vector<Centroid>& centroids) noexcept {
134 Weight w = 0.0;
135 for (auto centroid : centroids) {
136 w += centroid.weight();
137 }
138 return w;
139 }
140
141 TDigest& operator=(TDigest&& o) {
142 compression_ = o.compression_;
143 maxProcessed_ = o.maxProcessed_;
144 maxUnprocessed_ = o.maxUnprocessed_;
145 processedWeight_ = o.processedWeight_;
146 unprocessedWeight_ = o.unprocessedWeight_;
147 processed_ = std::move(o.processed_);
148 unprocessed_ = std::move(o.unprocessed_);
149 cumulative_ = std::move(o.cumulative_);
150 min_ = o.min_;
151 max_ = o.max_;
152 return *this;
153 }
154
155 TDigest(TDigest&& o)
156 : TDigest(std::move(o.processed_), std::move(o.unprocessed_), o.compression_, o.maxUnprocessed_,
157 o.maxProcessed_) {}
158
159 static inline Index processedSize(Index size, Value compression) noexcept {
160 return (size == 0) ? static_cast<Index>(2 * std::ceil(compression)) : size;
161 }
162
163 static inline Index unprocessedSize(Index size, Value compression) noexcept {
164 return (size == 0) ? static_cast<Index>(8 * std::ceil(compression)) : size;
165 }
166
167 // merge in another t-digest
168 inline void merge(const TDigest* other) {
169 std::vector<const TDigest*> others{other};
170 add(others.cbegin(), others.cend());
171 }
172
173 const std::vector<Centroid>& processed() const { return processed_; }
174
175 const std::vector<Centroid>& unprocessed() const { return unprocessed_; }
176
177 Index maxUnprocessed() const { return maxUnprocessed_; }
178
179 Index maxProcessed() const { return maxProcessed_; }
180
181 inline void add(std::vector<const TDigest*> digests) { add(digests.cbegin(), digests.cend()); }
182
183 // merge in a vector of tdigests in the most efficient manner possible
184 // in constant space
185 // works for any value of kHighWater
186 void add(std::vector<const TDigest*>::const_iterator iter, std::vector<const TDigest*>::const_iterator end) {
187 if (iter != end) {
188 auto size = std::distance(iter, end);
189 TDigestQueue pq(TDigestComparator{});
190 for (; iter != end; iter++) {
191 pq.push((*iter));
192 }
193 std::vector<const TDigest*> batch;
194 batch.reserve(size);
195
196 size_t totalSize = 0;
197 while (!pq.empty()) {
198 auto td = pq.top();
199 batch.push_back(td);
200 pq.pop();
201 totalSize += td->totalSize();
202 if (totalSize >= kHighWater || pq.empty()) {
203 mergeProcessed(batch);
204 mergeUnprocessed(batch);
205 processIfNecessary();
206 batch.clear();
207 totalSize = 0;
208 }
209 }
210 updateCumulative();
211 }
212 }
213
214 Weight processedWeight() const { return processedWeight_; }
215
216 Weight unprocessedWeight() const { return unprocessedWeight_; }
217
218 bool haveUnprocessed() const { return unprocessed_.size() > 0; }
219
220 size_t totalSize() const { return processed_.size() + unprocessed_.size(); }
221
222 long totalWeight() const { return static_cast<long>(processedWeight_ + unprocessedWeight_); }
223
224 // return the cdf on the t-digest
225 Value cdf(Value x) {
226 if (haveUnprocessed() || isDirty()) process();
227 return cdfProcessed(x);
228 }
229
230 bool isDirty() { return processed_.size() > maxProcessed_ || unprocessed_.size() > maxUnprocessed_; }
231
232 // return the cdf on the processed values
233 Value cdfProcessed(Value x) const {
234 DLOG(INFO) << "cdf value " << x;
235 DLOG(INFO) << "processed size " << processed_.size();
236 if (processed_.size() == 0) {
237 // no data to examine
238 DLOG(INFO) << "no processed values";
239
240 return 0.0;
241 } else if (processed_.size() == 1) {
242 DLOG(INFO) << "one processed value "
243 << " min_ " << min_ << " max_ " << max_;
244 // exactly one centroid, should have max_==min_
245 auto width = max_ - min_;
246 if (x < min_) {
247 return 0.0;
248 } else if (x > max_) {
249 return 1.0;
250 } else if (x - min_ <= width) {
251 // min_ and max_ are too close together to do any viable interpolation
252 return 0.5;
253 } else {
254 // interpolate if somehow we have weight > 0 and max_ != min_
255 return (x - min_) / (max_ - min_);
256 }
257 } else {
258 auto n = processed_.size();
259 if (x <= min_) {
260 DLOG(INFO) << "below min_ "
261 << " min_ " << min_ << " x " << x;
262 return 0;
263 }
264
265 if (x >= max_) {
266 DLOG(INFO) << "above max_ "
267 << " max_ " << max_ << " x " << x;
268 return 1;
269 }
270
271 // check for the left tail
272 if (x <= mean(0)) {
273 DLOG(INFO) << "left tail "
274 << " min_ " << min_ << " mean(0) " << mean(0) << " x " << x;
275
276 // note that this is different than mean(0) > min_ ... this guarantees interpolation works
277 if (mean(0) - min_ > 0) {
278 return (x - min_) / (mean(0) - min_) * weight(0) / processedWeight_ / 2.0;
279 } else {
280 return 0;
281 }
282 }
283
284 // and the right tail
285 if (x >= mean(n - 1)) {
286 DLOG(INFO) << "right tail"
287 << " max_ " << max_ << " mean(n - 1) " << mean(n - 1) << " x " << x;
288
289 if (max_ - mean(n - 1) > 0) {
290 return 1.0 - (max_ - x) / (max_ - mean(n - 1)) * weight(n - 1) / processedWeight_ / 2.0;
291 } else {
292 return 1;
293 }
294 }
295
296 CentroidComparator cc;
297 auto iter = std::upper_bound(processed_.cbegin(), processed_.cend(), Centroid(x, 0), cc);
298
299 auto i = std::distance(processed_.cbegin(), iter);
300 auto z1 = x - (iter - 1)->mean();
301 auto z2 = (iter)->mean() - x;
302 CHECK_LE(0.0, z1);
303 CHECK_LE(0.0, z2);
304 DLOG(INFO) << "middle "
305 << " z1 " << z1 << " z2 " << z2 << " x " << x;
306
307 return weightedAverage(cumulative_[i - 1], z2, cumulative_[i], z1) / processedWeight_;
308 }
309 }
310
311 // this returns a quantile on the t-digest
312 Value quantile(Value q) {
313 if (haveUnprocessed() || isDirty()) process();
314 return quantileProcessed(q);
315 }
316
317 // this returns a quantile on the currently processed values without changing the t-digest
318 // the value will not represent the unprocessed values
319 Value quantileProcessed(Value q) const {
320 if (q < 0 || q > 1) {
321 LOG(ERROR) << "q should be in [0,1], got " << q;
322 return NAN;
323 }
324
325 if (processed_.size() == 0) {
326 // no centroids means no data, no way to get a quantile
327 return NAN;
328 } else if (processed_.size() == 1) {
329 // with one data point, all quantiles lead to Rome
330
331 return mean(0);
332 }
333
334 // we know that there are at least two centroids now
335 auto n = processed_.size();
336
337 // if values were stored in a sorted array, index would be the offset we are interested in
338 const auto index = q * processedWeight_;
339
340 // at the boundaries, we return min_ or max_
341 if (index < weight(0) / 2.0) {
342 CHECK_GT(weight(0), 0);
343 return min_ + 2.0 * index / weight(0) * (mean(0) - min_);
344 }
345
346 auto iter = std::lower_bound(cumulative_.cbegin(), cumulative_.cend(), index);
347
348 if (iter + 1 != cumulative_.cend()) {
349 auto i = std::distance(cumulative_.cbegin(), iter);
350 auto z1 = index - *(iter - 1);
351 auto z2 = *(iter)-index;
352 // LOG(INFO) << "z2 " << z2 << " index " << index << " z1 " << z1;
353 return weightedAverage(mean(i - 1), z2, mean(i), z1);
354 }
355
356 CHECK_LE(index, processedWeight_);
357 CHECK_GE(index, processedWeight_ - weight(n - 1) / 2.0);
358
359 auto z1 = index - processedWeight_ - weight(n - 1) / 2.0;
360 auto z2 = weight(n - 1) / 2 - z1;
361 return weightedAverage(mean(n - 1), z1, max_, z2);
362 }
363
364 Value compression() const { return compression_; }
365
366 void add(Value x) { add(x, 1); }
367
368 inline void compress() { process(); }
369
370 // add a single centroid to the unprocessed vector, processing previously unprocessed centroids if our limit has
371 // been reached.
372 inline bool add(Value x, Weight w) {
373 if (std::isnan(x)) {
374 return false;
375 }
376 unprocessed_.push_back(Centroid(x, w));
377 unprocessedWeight_ += w;
378 processIfNecessary();
379 return true;
380 }
381
382 inline void add(std::vector<Centroid>::const_iterator iter, std::vector<Centroid>::const_iterator end) {
383 while (iter != end) {
384 const size_t diff = std::distance(iter, end);
385 const size_t room = maxUnprocessed_ - unprocessed_.size();
386 auto mid = iter + std::min(diff, room);
387 while (iter != mid) unprocessed_.push_back(*(iter++));
388 if (unprocessed_.size() >= maxUnprocessed_) {
389 process();
390 }
391 }
392 }
393
394 private:
395 Value compression_;
396
397 Value min_ = std::numeric_limits<Value>::max();
398
399 Value max_ = std::numeric_limits<Value>::min();
400
401 Index maxProcessed_;
402
403 Index maxUnprocessed_;
404
405 Value processedWeight_ = 0.0;
406
407 Value unprocessedWeight_ = 0.0;
408
409 std::vector<Centroid> processed_;
410
411 std::vector<Centroid> unprocessed_;
412
413 std::vector<Weight> cumulative_;
414
415 // return mean of i-th centroid
416 inline Value mean(int i) const noexcept { return processed_[i].mean(); }
417
418 // return weight of i-th centroid
419 inline Weight weight(int i) const noexcept { return processed_[i].weight(); }
420
421 // append all unprocessed centroids into current unprocessed vector
422 void mergeUnprocessed(const std::vector<const TDigest*>& tdigests) {
423 if (tdigests.size() == 0) return;
424
425 size_t total = unprocessed_.size();
426 for (auto& td : tdigests) {
427 total += td->unprocessed_.size();
428 }
429
430 unprocessed_.reserve(total);
431 for (auto& td : tdigests) {
432 unprocessed_.insert(unprocessed_.end(), td->unprocessed_.cbegin(), td->unprocessed_.cend());
433 unprocessedWeight_ += td->unprocessedWeight_;
434 }
435 }
436
437 // merge all processed centroids together into a single sorted vector
438 void mergeProcessed(const std::vector<const TDigest*>& tdigests) {
439 if (tdigests.size() == 0) return;
440
441 size_t total = 0;
442 CentroidListQueue pq(CentroidListComparator{});
443 for (auto& td : tdigests) {
444 auto& sorted = td->processed_;
445 auto size = sorted.size();
446 if (size > 0) {
447 pq.push(CentroidList(sorted));
448 total += size;
449 processedWeight_ += td->processedWeight_;
450 }
451 }
452 if (total == 0) return;
453
454 if (processed_.size() > 0) {
455 pq.push(CentroidList(processed_));
456 total += processed_.size();
457 }
458
459 std::vector<Centroid> sorted;
460 LOG(INFO) << "total " << total;
461 sorted.reserve(total);
462
463 while (!pq.empty()) {
464 auto best = pq.top();
465 pq.pop();
466 sorted.push_back(*(best.iter));
467 if (best.advance()) pq.push(best);
468 }
469 processed_ = std::move(sorted);
470 if( processed_.size() > 0 ) {
471 min_ = std::min(min_, processed_[0].mean());
472 max_ = std::max(max_, (processed_.cend() - 1)->mean());
473 }
474 }
475
476 inline void processIfNecessary() {
477 if (isDirty()) {
478 process();
479 }
480 }
481
482 void updateCumulative() {
483 const auto n = processed_.size();
484 cumulative_.clear();
485 cumulative_.reserve(n + 1);
486 auto previous = 0.0;
487 for (Index i = 0; i < n; i++) {
488 auto current = weight(i);
489 auto halfCurrent = current / 2.0;
490 cumulative_.push_back(previous + halfCurrent);
491 previous = previous + current;
492 }
493 cumulative_.push_back(previous);
494 }
495
496 // merges unprocessed_ centroids and processed_ centroids together and processes them
497 // when complete, unprocessed_ will be empty and processed_ will have at most maxProcessed_ centroids
498 inline void process() {
499 CentroidComparator cc;
500 std::sort(unprocessed_.begin(), unprocessed_.end(), cc);
501 auto count = unprocessed_.size();
502 unprocessed_.insert(unprocessed_.end(), processed_.cbegin(), processed_.cend());
503 std::inplace_merge(unprocessed_.begin(), unprocessed_.begin() + count, unprocessed_.end(), cc);
504
505 processedWeight_ += unprocessedWeight_;
506 unprocessedWeight_ = 0;
507 processed_.clear();
508
509 processed_.push_back(unprocessed_[0]);
510 Weight wSoFar = unprocessed_[0].weight();
511 Weight wLimit = processedWeight_ * integratedQ(1.0);
512
513 auto end = unprocessed_.end();
514 for (auto iter = unprocessed_.cbegin() + 1; iter < end; iter++) {
515 auto& centroid = *iter;
516 Weight projectedW = wSoFar + centroid.weight();
517 if (projectedW <= wLimit) {
518 wSoFar = projectedW;
519 (processed_.end() - 1)->add(centroid);
520 } else {
521 auto k1 = integratedLocation(wSoFar / processedWeight_);
522 wLimit = processedWeight_ * integratedQ(k1 + 1.0);
523 wSoFar += centroid.weight();
524 processed_.emplace_back(centroid);
525 }
526 }
527 unprocessed_.clear();
528 min_ = std::min(min_, processed_[0].mean());
529 DLOG(INFO) << "new min_ " << min_;
530 max_ = std::max(max_, (processed_.cend() - 1)->mean());
531 DLOG(INFO) << "new max_ " << max_;
532 updateCumulative();
533 }
534
535 inline int checkWeights() { return checkWeights(processed_, processedWeight_); }
536
537 size_t checkWeights(const std::vector<Centroid>& sorted, Value total) {
538 size_t badWeight = 0;
539 auto k1 = 0.0;
540 auto q = 0.0;
541 for (auto iter = sorted.cbegin(); iter != sorted.cend(); iter++) {
542 auto w = iter->weight();
543 auto dq = w / total;
544 auto k2 = integratedLocation(q + dq);
545 if (k2 - k1 > 1 && w != 1) {
546 LOG(WARNING) << "Oversize centroid at " << std::distance(sorted.cbegin(), iter) << " k1 " << k1 << " k2 " << k2
547 << " dk " << (k2 - k1) << " w " << w << " q " << q;
548 badWeight++;
549 }
550 if (k2 - k1 > 1.5 && w != 1) {
551 LOG(ERROR) << "Egregiously Oversize centroid at " << std::distance(sorted.cbegin(), iter) << " k1 " << k1
552 << " k2 " << k2 << " dk " << (k2 - k1) << " w " << w << " q " << q;
553 badWeight++;
554 }
555 q += dq;
556 k1 = k2;
557 }
558
559 return badWeight;
560 }
561
562 /**
563 * Converts a quantile into a centroid scale value. The centroid scale is nominally
564 * the number k of the centroid that a quantile point q should belong to. Due to
565 * round-offs, however, we can't align things perfectly without splitting points
566 * and centroids. We don't want to do that, so we have to allow for offsets.
567 * In the end, the criterion is that any quantile range that spans a centroid
568 * scale range more than one should be split across more than one centroid if
569 * possible. This won't be possible if the quantile range refers to a single point
570 * or an already existing centroid.
571 * <p/>
572 * This mapping is steep near q=0 or q=1 so each centroid there will correspond to
573 * less q range. Near q=0.5, the mapping is flatter so that centroids there will
574 * represent a larger chunk of quantiles.
575 *
576 * @param q The quantile scale value to be mapped.
577 * @return The centroid scale value corresponding to q.
578 */
579 inline Value integratedLocation(Value q) const {
580 return compression_ * (std::asin(2.0 * q - 1.0) + M_PI / 2) / M_PI;
581 }
582
583 inline Value integratedQ(Value k) const {
584 return (std::sin(std::min(k, compression_) * M_PI / compression_ - M_PI / 2) + 1) / 2;
585 }
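  // A worked instance of the two mappings above: with compression_ = 1000,
  // integratedLocation(0.5) = 1000 * (asin(0) + pi/2) / pi = 500, and
  // integratedQ(500) = (sin(500 * pi / 1000 - pi/2) + 1) / 2 = 0.5; the two functions
  // are inverses, mapping quantiles in [0, 1] to centroid-scale values in [0, compression_].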
586
587 /**
588 * Same as {@link #weightedAverageSorted(Value, Value, Value, Value)} but flips
589 * the order of the variables if <code>x2</code> is greater than
590 * <code>x1</code>.
591 */
592 static Value weightedAverage(Value x1, Value w1, Value x2, Value w2) {
593 return (x1 <= x2) ? weightedAverageSorted(x1, w1, x2, w2) : weightedAverageSorted(x2, w2, x1, w1);
594 }
595
596 /**
597 * Compute the weighted average between <code>x1</code> with a weight of
598 * <code>w1</code> and <code>x2</code> with a weight of <code>w2</code>.
599 * This expects <code>x1</code> to be less than or equal to <code>x2</code>
600 * and is guaranteed to return a number between <code>x1</code> and
601 * <code>x2</code>.
602 */
603 static Value weightedAverageSorted(Value x1, Value w1, Value x2, Value w2) {
604 CHECK_LE(x1, x2);
605 const Value x = (x1 * w1 + x2 * w2) / (w1 + w2);
606 return std::max(x1, std::min(x, x2));
607 }
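  // For instance, weightedAverage(2.0, 1.0, 1.0, 3.0) reorders the arguments and returns
  // weightedAverageSorted(1.0, 3.0, 2.0, 1.0) = (1*3 + 2*1) / (3 + 1) = 1.25, which already
  // lies within [1.0, 2.0], so the final clamp leaves it unchanged.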
608
609 static Value interpolate(Value x, Value x0, Value x1) { return (x - x0) / (x1 - x0); }
610
611 /**
612 * Computes an interpolated value of a quantile that is between two centroids.
613 *
614 * Index is the quantile desired multiplied by the total number of samples - 1.
615 *
616 * @param index Denormalized quantile desired
617 * @param previousIndex The denormalized quantile corresponding to the center of the previous centroid.
618 * @param nextIndex The denormalized quantile corresponding to the center of the following centroid.
619 * @param previousMean The mean of the previous centroid.
620 * @param nextMean The mean of the following centroid.
621 * @return The interpolated mean.
622 */
623 static Value quantile(Value index, Value previousIndex, Value nextIndex, Value previousMean, Value nextMean) {
624 const auto delta = nextIndex - previousIndex;
625 const auto previousWeight = (nextIndex - index) / delta;
626 const auto nextWeight = (index - previousIndex) / delta;
627 return previousMean * previousWeight + nextMean * nextWeight;
628 }
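  // For instance, quantile(5, 4, 6, 10, 20) lands exactly halfway between the two centroids:
  // delta = 2, previousWeight = nextWeight = 0.5, so the result is 10 * 0.5 + 20 * 0.5 = 15.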
629 };
630
631 } // namespace tdigest
632
633 #endif // TDIGEST2_TDIGEST_H_
634
0 #!/bin/bash
1
2 set -e
3
4 DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
5 cd "$DIR"
6
7 go run gen.go
8 go run main.go
9 g++ -o cpp.test main.cpp
10 ./cpp.test 2>/dev/null
11 rm cpp.test
12
13 go run validate.go
0 package main
1
2 import (
3 "bufio"
4 "log"
5 "math"
6 "os"
7 "strconv"
8 "strings"
9 )
10
11 var dataFiles = []string{
12 "small.dat",
13 "uniform.dat",
14 "normal.dat",
15 }
16
17 const (
18 cppQExt = ".cpp.quantiles"
19 goQExt = ".go.quantiles"
20
21 cppCDFExt = ".cpp.cdfs"
22 goCDFExt = ".go.cdfs"
23
24 epsilon = 1e-6
25 )
26
27 func main() {
28 for _, f := range dataFiles {
29 // Validate Quantiles
30 cppQuantiles := loadResults(f + cppQExt)
31 goQuantiles := loadResults(f + goQExt)
32 if len(cppQuantiles) != len(goQuantiles) {
33 log.Fatal("differing number of quantiles results")
34 }
35
36 for i := range cppQuantiles {
37 if math.Abs(cppQuantiles[i]-goQuantiles[i]) > epsilon {
38 log.Fatalf("differing quantile result go: %f cpp: %f", goQuantiles[i], cppQuantiles[i])
39 }
40 }
41
42 // Validate CDFs
43 cppCDFs := loadResults(f + cppCDFExt)
44 goCDFs := loadResults(f + goCDFExt)
45 if len(cppCDFs) != len(goCDFs) {
46 log.Fatal("differing number of CDFs results")
47 }
48
49 for i := range cppCDFs {
50 if math.Abs(cppCDFs[i]-goCDFs[i]) > epsilon {
51 log.Fatalf("differing CDF result go: %f cpp: %f", goCDFs[i], cppCDFs[i])
52 }
53 }
54 }
55 }
56
57 func loadResults(name string) []float64 {
58 f, err := os.Open(name)
59 if err != nil {
60 panic(err)
61 }
62 defer f.Close()
63 s := bufio.NewScanner(f)
64 var data []float64
65 for s.Scan() {
66 parts := strings.SplitN(s.Text(), " ", 2)
67 x, err := strconv.ParseFloat(parts[0], 64)
68 if err != nil {
69 panic(err)
70 }
71 data = append(data, x)
72 }
73 return data
74 }