package tdigest

import (
	"math"
	"sort"
)

// TDigest is a data structure for accurate on-line accumulation of
// rank-based statistics such as quantiles and trimmed means.
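//
// Incoming points are buffered in an unprocessed list and periodically
// compressed into the sorted processed list, amortizing the cost of
// compression over many calls to Add.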
type TDigest struct {
	Compression float64

	maxProcessed      int
	maxUnprocessed    int
	processed         CentroidList
	unprocessed       CentroidList
	cumulative        []float64
	processedWeight   float64
	unprocessedWeight float64
	min               float64
	max               float64
}

// New initializes a new distribution with a default compression of 1000.
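//
// A minimal usage sketch, adding values with unit weight:
//
//	td := New()
//	td.Add(1.0, 1)
//	td.Add(2.0, 1)
//	median := td.Quantile(0.5)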
func New() *TDigest {
	return NewWithCompression(1000)
}

// NewWithCompression initializes a new distribution with custom compression.
func NewWithCompression(c float64) *TDigest {
	t := &TDigest{
		Compression: c,
	}
	t.maxProcessed = processedSize(0, t.Compression)
	t.maxUnprocessed = unprocessedSize(0, t.Compression)
	t.processed = make(CentroidList, 0, t.maxProcessed)
	t.unprocessed = make(CentroidList, 0, t.maxUnprocessed+1)
	t.Reset()
	return t
}

// ByteSizeForCompression estimates the number of bytes needed for a t-digest
// with compression value comp.
//
// A centroid is two float64s (16 bytes). The processed and unprocessed lists
// can each grow to roughly c centroids, and the cumulative slice holds one
// float64 per centroid, for a total of about 16c + 16c + 8c = 40c bytes.
func ByteSizeForCompression(comp float64) int {
	c := int(comp)
	return c * 40
}

// Reset resets the distribution to its initial state.
func (t *TDigest) Reset() {
	t.processed = t.processed[:0]
	t.unprocessed = t.unprocessed[:0]
	t.cumulative = t.cumulative[:0]
	t.processedWeight = 0
	t.unprocessedWeight = 0
	t.min = math.MaxFloat64
	t.max = -math.MaxFloat64
}

// Add adds a value x with a weight w to the distribution.
func (t *TDigest) Add(x, w float64) {
	t.AddCentroid(Centroid{Mean: x, Weight: w})
}

// AddCentroidList can quickly add multiple centroids.
func (t *TDigest) AddCentroidList(c CentroidList) {
	// It's possible to optimize this by bulk-copying the slice, but this
	// yields just a 1-2% speedup (most time is in process()), so not worth
	// the complexity.
	for i := range c {
		t.AddCentroid(c[i])
	}
}

// AddCentroid adds a single centroid.
// Centroids with a NaN mean, or whose weight is NaN, +Inf, or <= 0, are ignored.
func (t *TDigest) AddCentroid(c Centroid) {
	if math.IsNaN(c.Mean) || c.Weight <= 0 || math.IsNaN(c.Weight) || math.IsInf(c.Weight, 1) {
		return
	}

	t.unprocessed = append(t.unprocessed, c)
	t.unprocessedWeight += c.Weight

	if t.processed.Len() > t.maxProcessed ||
		t.unprocessed.Len() > t.maxUnprocessed {
		t.process()
	}
}

// Merge merges the supplied digest into this digest. It is functionally
// equivalent to calling t.AddCentroidList(t2.Centroids(nil)), but avoids
// making an extra copy of the CentroidList.
func (t *TDigest) Merge(t2 *TDigest) {
	t2.process()
	t.AddCentroidList(t2.processed)
}

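// process folds the unprocessed centroids into the processed list,
// re-compressing it according to the scale function.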
func (t *TDigest) process() {
	if t.unprocessed.Len() > 0 ||
		t.processed.Len() > t.maxProcessed {

		// Append all processed centroids to the unprocessed list and sort
		t.unprocessed = append(t.unprocessed, t.processed...)
		sort.Sort(&t.unprocessed)

		// Reset processed list with first centroid
		t.processed.Clear()
		t.processed = append(t.processed, t.unprocessed[0])

		t.processedWeight += t.unprocessedWeight
		t.unprocessedWeight = 0
		soFar := t.unprocessed[0].Weight
		limit := t.processedWeight * t.integratedQ(1.0)
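		// Walk the sorted centroids, absorbing each into the current
		// processed centroid while the running weight stays within the
		// limit implied by the scale function; otherwise start a new
		// centroid and advance the limit to the end of its k-interval.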
		for _, centroid := range t.unprocessed[1:] {
			projected := soFar + centroid.Weight
			if projected <= limit {
				soFar = projected
				t.processed[t.processed.Len()-1].Add(centroid)
			} else {
				k1 := t.integratedLocation(soFar / t.processedWeight)
				limit = t.processedWeight * t.integratedQ(k1+1.0)
				soFar += centroid.Weight
				t.processed = append(t.processed, centroid)
			}
		}
		t.min = math.Min(t.min, t.processed[0].Mean)
		t.max = math.Max(t.max, t.processed[t.processed.Len()-1].Mean)
		t.unprocessed.Clear()
	}
}

// Centroids returns a copy of processed centroids.
// Useful when aggregating multiple t-digests.
//
// Centroids are appended to the passed CentroidList; if you're re-using a
// buffer, be sure to pass cl[:0].
func (t *TDigest) Centroids(cl CentroidList) CentroidList {
	t.process()
	return append(cl, t.processed...)
}

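// Count returns the total weight of all values added to the digest.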
func (t *TDigest) Count() float64 {
	t.process()

	// t.process always updates t.processedWeight to the total count of all
	// centroids, so we don't need to re-count here.
	return t.processedWeight
}

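// updateCumulative rebuilds t.cumulative so that cumulative[i] holds the
// weight of all centroids before i plus half the weight of centroid i (the
// interpolated rank of its mean), with a final entry equal to the total
// processed weight.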
func (t *TDigest) updateCumulative() {
	// Weight can only increase, so the final cumulative value will always be
	// either equal to, or less than, the total weight. If they are the same,
	// then nothing has changed since the last update.
	if len(t.cumulative) > 0 && t.cumulative[len(t.cumulative)-1] == t.processedWeight {
		return
	}

	if n := t.processed.Len() + 1; n <= cap(t.cumulative) {
		t.cumulative = t.cumulative[:n]
	} else {
		t.cumulative = make([]float64, n)
	}

	prev := 0.0
	for i, centroid := range t.processed {
		cur := centroid.Weight
		t.cumulative[i] = prev + cur/2.0
		prev = prev + cur
	}
	t.cumulative[t.processed.Len()] = prev
}

// Quantile returns the (approximate) q-th quantile of the distribution, for
// q between 0.0 and 1.0. It returns NaN if the digest is empty or q is
// outside [0, 1].
func (t *TDigest) Quantile(q float64) float64 {
	t.process()
	t.updateCumulative()
	if q < 0 || q > 1 || t.processed.Len() == 0 {
		return math.NaN()
	}
	if t.processed.Len() == 1 {
		return t.processed[0].Mean
	}
	index := q * t.processedWeight
	if index <= t.processed[0].Weight/2.0 {
		return t.min + 2.0*index/t.processed[0].Weight*(t.processed[0].Mean-t.min)
	}

	lower := sort.Search(len(t.cumulative), func(i int) bool {
		return t.cumulative[i] >= index
	})

	if lower+1 != len(t.cumulative) {
		z1 := index - t.cumulative[lower-1]
		z2 := t.cumulative[lower] - index
		return weightedAverage(t.processed[lower-1].Mean, z2, t.processed[lower].Mean, z1)
	}

	// Upper tail: interpolate between the last centroid's mean, whose
	// interpolated rank is processedWeight - weight/2, and the maximum.
	z1 := index - (t.processedWeight - t.processed[lower-1].Weight/2.0)
	z2 := (t.processed[lower-1].Weight / 2.0) - z1
	return weightedAverage(t.processed[t.processed.Len()-1].Mean, z2, t.max, z1)
}

// CDF returns the value of the cumulative distribution function at x: the
// approximate fraction of the total weight at or below x.
func (t *TDigest) CDF(x float64) float64 {
	t.process()
	t.updateCumulative()
	switch t.processed.Len() {
	case 0:
		return 0.0
	case 1:
		width := t.max - t.min
		if x <= t.min {
			return 0.0
		}
		if x >= t.max {
			return 1.0
		}
		if width == 0 {
			// min and max coincide, so no interpolation is possible.
			// (Unreachable given the guards above, kept as a safeguard.)
			return 0.5
		}
		return (x - t.min) / width
	}

	if x <= t.min {
		return 0.0
	}
	if x >= t.max {
		return 1.0
	}
	m0 := t.processed[0].Mean
	// Left Tail
	if x <= m0 {
		if m0-t.min > 0 {
			return (x - t.min) / (m0 - t.min) * t.processed[0].Weight / t.processedWeight / 2.0
		}
		return 0.0
	}
	// Right Tail
	mn := t.processed[t.processed.Len()-1].Mean
	if x >= mn {
		if t.max-mn > 0.0 {
			return 1.0 - (t.max-x)/(t.max-mn)*t.processed[t.processed.Len()-1].Weight/t.processedWeight/2.0
		}
		return 1.0
	}

	upper := sort.Search(t.processed.Len(), func(i int) bool {
		return t.processed[i].Mean > x
	})

	z1 := x - t.processed[upper-1].Mean
	z2 := t.processed[upper].Mean - x
	return weightedAverage(t.cumulative[upper-1], z2, t.cumulative[upper], z1) / t.processedWeight
}

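// integratedQ is the inverse of integratedLocation: it maps a position k on
// the k-scale, clamped to [0, Compression], back to a quantile in [0, 1].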
func (t *TDigest) integratedQ(k float64) float64 {
	return (math.Sin(math.Min(k, t.Compression)*math.Pi/t.Compression-math.Pi/2.0) + 1.0) / 2.0
}

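// integratedLocation maps a quantile q in [0, 1] to a position on the
// k-scale in [0, Compression]. The arcsine scale packs centroids more
// tightly near the tails, where quantile accuracy matters most.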
func (t *TDigest) integratedLocation(q float64) float64 {
	return t.Compression * (math.Asin(2.0*q-1.0) + math.Pi/2.0) / math.Pi
}

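// weightedAverage returns the weighted average of x1 and x2, clamped to the
// interval [min(x1, x2), max(x1, x2)].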
func weightedAverage(x1, w1, x2, w2 float64) float64 {
	if x1 <= x2 {
		return weightedAverageSorted(x1, w1, x2, w2)
	}
	return weightedAverageSorted(x2, w2, x1, w1)
}

func weightedAverageSorted(x1, w1, x2, w2 float64) float64 {
	x := (x1*w1 + x2*w2) / (w1 + w2)
	return math.Max(x1, math.Min(x, x2))
}

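// processedSize returns the processed-list capacity for a given compression:
// twice the compression when no explicit size is requested.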
func processedSize(size int, compression float64) int {
	if size == 0 {
		return int(2 * math.Ceil(compression))
	}
	return size
}

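// unprocessedSize returns the unprocessed-buffer capacity for a given
// compression: eight times the compression when no explicit size is
// requested.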
func unprocessedSize(size int, compression float64) int {
	if size == 0 {
		return int(8 * math.Ceil(compression))
	}
	return size
}