Codebase list golang-github-segmentio-kafka-go / multiarch-fixes/main stats.go
multiarch-fixes/main

Tree @multiarch-fixes/main (Download .tar.gz)

stats.go @multiarch-fixes/main · raw · history · blame

package kafka

import (
	"sync/atomic"
	"time"
)

// SummaryStats is a data structure that carries a summary of observed values.
// The average, minimum, and maximum are reported.
//
// Every field carries `metric` and `type` struct tags; all three are tagged
// as gauges.
type SummaryStats struct {
	// Avg is the mean of the observed values, or 0 when nothing was observed.
	Avg int64 `metric:"avg" type:"gauge"`
	// Min is the smallest observed value, or 0 when nothing was observed.
	Min int64 `metric:"min" type:"gauge"`
	// Max is the largest observed value, or 0 when nothing was observed.
	Max int64 `metric:"max" type:"gauge"`
}

// DurationStats is a data structure that carries a summary of observed duration
// values. The average, minimum, and maximum are reported.
//
// It mirrors SummaryStats field for field, with time.Duration in place of
// int64, and uses the same `metric` and `type` struct tags.
type DurationStats struct {
	// Avg is the mean of the observed durations.
	Avg time.Duration `metric:"avg" type:"gauge"`
	// Min is the shortest observed duration.
	Min time.Duration `metric:"min" type:"gauge"`
	// Max is the longest observed duration.
	Max time.Duration `metric:"max" type:"gauge"`
}

// counter is an atomic incrementing counter which gets reset on snapshot.
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type counter int64

// ptr reinterprets the counter as a *int64 so it can be passed to the
// sync/atomic functions.
func (c *counter) ptr() *int64 {
	return (*int64)(c)
}

// observe atomically adds v to the counter.
func (c *counter) observe(v int64) {
	atomic.AddInt64(c.ptr(), v)
}

// snapshot returns the value accumulated since the previous snapshot and
// resets the counter to zero.
//
// The read and the reset are a single atomic swap. Doing them as two separate
// steps (load, then subtract) lets two overlapping snapshots read the same
// value, report it twice, and leave the counter negative.
func (c *counter) snapshot() int64 {
	return atomic.SwapInt64(c.ptr(), 0)
}

// gauge holds the most recently observed value. Unlike the other statistics
// it is not reset when a snapshot is taken.
//
// The value is accessed through sync/atomic and therefore must be 64-bit
// aligned. See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type gauge int64

// ptr exposes the gauge as a *int64 for use with the sync/atomic functions.
func (g *gauge) ptr() *int64 {
	addr := (*int64)(g)
	return addr
}

// observe atomically overwrites the gauge with v.
func (g *gauge) observe(v int64) {
	atomic.StoreInt64((*int64)(g), v)
}

// snapshot atomically reads the current value and leaves it in place.
func (g *gauge) snapshot() int64 {
	current := atomic.LoadInt64((*int64)(g))
	return current
}

// minimum is an atomic integral type that keeps track of the minimum of all
// values that it observed between snapshots.
//
// A negative stored value means "nothing observed yet"; snapshot resets the
// value to -1. As a consequence the zero value of minimum already counts as
// having observed 0, so a fresh minimum should be initialised to -1, and
// negative observations cannot be told apart from the unset state (they are
// reported as 0).
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type minimum int64

// ptr reinterprets the minimum as a *int64 so it can be passed to the
// sync/atomic functions.
func (m *minimum) ptr() *int64 {
	return (*int64)(m)
}

// observe records v if it is smaller than the current minimum, or if no value
// has been recorded since the last snapshot.
func (m *minimum) observe(v int64) {
	p := m.ptr()
	for {
		cur := atomic.LoadInt64(p)

		// A recorded (non-negative) minimum that is already <= v stands.
		if cur >= 0 && cur <= v {
			return
		}

		// Retry when another goroutine changed the value after the load.
		if atomic.CompareAndSwapInt64(p, cur, v) {
			return
		}
	}
}

// snapshot returns the minimum observed since the previous snapshot and resets
// the tracker to the unset state. It returns 0 when nothing was observed.
//
// The read and the reset are one atomic swap. With a load followed by a
// compare-and-swap, an observation landing between the two makes the swap
// fail, so the tracker is not reset and that observation leaks into the next
// window.
func (m *minimum) snapshot() int64 {
	v := atomic.SwapInt64(m.ptr(), -1)
	if v < 0 {
		return 0
	}
	return v
}

// maximum is an atomic integral type that keeps track of the maximum of all
// values that it observed between snapshots.
//
// A negative stored value means "nothing observed yet"; snapshot resets the
// value to -1 and reports negative values as 0.
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type maximum int64

// ptr reinterprets the maximum as a *int64 so it can be passed to the
// sync/atomic functions.
func (m *maximum) ptr() *int64 {
	return (*int64)(m)
}

// observe records v if it is larger than the current maximum, or if no value
// has been recorded since the last snapshot.
func (m *maximum) observe(v int64) {
	p := m.ptr()
	for {
		cur := atomic.LoadInt64(p)

		// A recorded (non-negative) maximum that is already >= v stands.
		if cur >= 0 && cur >= v {
			return
		}

		// Retry when another goroutine changed the value after the load.
		if atomic.CompareAndSwapInt64(p, cur, v) {
			return
		}
	}
}

// snapshot returns the maximum observed since the previous snapshot and resets
// the tracker to the unset state. It returns 0 when nothing was observed.
//
// The read and the reset are one atomic swap. With a load followed by a
// compare-and-swap, an observation landing between the two makes the swap
// fail, so the tracker is not reset and that observation leaks into the next
// window.
func (m *maximum) snapshot() int64 {
	v := atomic.SwapInt64(m.ptr(), -1)
	if v < 0 {
		return 0
	}
	return v
}

// summary aggregates observed int64 values so that their minimum, maximum,
// and average can be reported by snapshot.
//
// Each field is updated independently through sync/atomic. All four fields
// are int64-based, which keeps every field 8 bytes wide.
type summary struct {
	min   minimum // smallest value observed since the last snapshot
	max   maximum // largest value observed since the last snapshot
	sum   counter // running total of observed values
	count counter // number of observations
}

// makeSummary returns a summary with no recorded observations.
//
// The min and max trackers start at -1, the value they use for "nothing
// observed yet"; sum and count keep their zero values.
func makeSummary() summary {
	var s summary
	s.min = -1
	s.max = -1
	return s
}

// observe records v in the summary: it updates the minimum and maximum
// trackers, adds v to the running sum, and increments the observation count.
//
// The four updates are separate atomic operations, not one atomic unit.
// NOTE(review): a snapshot running concurrently may therefore see only some
// of them for a given observation — confirm whether callers rely on this.
func (s *summary) observe(v int64) {
	s.min.observe(v)
	s.max.observe(v)
	s.sum.observe(v)
	s.count.observe(1)
}

// observeDuration records the duration v in the summary as its integer
// nanosecond count.
func (s *summary) observeDuration(v time.Duration) {
	nanos := v.Nanoseconds()
	s.observe(nanos)
}

// snapshot returns the minimum, maximum, and average of the values observed
// since the previous snapshot, and resets the underlying trackers.
//
// Avg is 0 when nothing was observed. The average uses integer division,
// which truncates toward zero exactly like the float64-to-int64 conversion it
// replaces, but stays exact for sums beyond 2^53 where a float64 can no
// longer represent every integer.
func (s *summary) snapshot() SummaryStats {
	// Take the snapshots in a fixed order: min, max, sum, count.
	lo := s.min.snapshot()
	hi := s.max.snapshot()
	total := s.sum.snapshot()
	n := s.count.snapshot()

	stats := SummaryStats{
		Min: lo,
		Max: hi,
	}

	// Guard against division by zero when there were no observations.
	if n != 0 {
		stats.Avg = total / n
	}

	return stats
}

// snapshotDuration takes a snapshot of the summary and converts each reported
// value to a time.Duration.
func (s *summary) snapshotDuration() DurationStats {
	stats := s.snapshot()

	var out DurationStats
	out.Avg = time.Duration(stats.Avg)
	out.Min = time.Duration(stats.Min)
	out.Max = time.Duration(stats.Max)
	return out
}