package kafka
import (
"sync/atomic"
"time"
)
// SummaryStats is a data structure that carries a summary of observed values.
// The average, minimum, and maximum are reported.
type SummaryStats struct {
Avg int64 `metric:"avg" type:"gauge"`
Min int64 `metric:"min" type:"gauge"`
Max int64 `metric:"max" type:"gauge"`
}
// DurationStats is a data structure that carries a summary of observed duration
// values. The average, minimum, and maximum are reported.
type DurationStats struct {
Avg time.Duration `metric:"avg" type:"gauge"`
Min time.Duration `metric:"min" type:"gauge"`
Max time.Duration `metric:"max" type:"gauge"`
}
// counter is an atomic incrementing counter which gets reset on snapshot.
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type counter int64
func (c *counter) ptr() *int64 {
return (*int64)(c)
}
func (c *counter) observe(v int64) {
atomic.AddInt64(c.ptr(), v)
}
func (c *counter) snapshot() int64 {
p := c.ptr()
v := atomic.LoadInt64(p)
atomic.AddInt64(p, -v)
return v
}
// gauge is an atomic integer that may be set to any arbitrary value, the value
// does not change after a snapshot.
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type gauge int64
func (g *gauge) ptr() *int64 {
return (*int64)(g)
}
func (g *gauge) observe(v int64) {
atomic.StoreInt64(g.ptr(), v)
}
func (g *gauge) snapshot() int64 {
return atomic.LoadInt64(g.ptr())
}
// minimum is an atomic integral type that keeps track of the minimum of all
// values that it observed between snapshots.
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type minimum int64
func (m *minimum) ptr() *int64 {
return (*int64)(m)
}
func (m *minimum) observe(v int64) {
for {
ptr := m.ptr()
min := atomic.LoadInt64(ptr)
if min >= 0 && min <= v {
break
}
if atomic.CompareAndSwapInt64(ptr, min, v) {
break
}
}
}
func (m *minimum) snapshot() int64 {
p := m.ptr()
v := atomic.LoadInt64(p)
atomic.CompareAndSwapInt64(p, v, -1)
if v < 0 {
v = 0
}
return v
}
// maximum is an atomic integral type that keeps track of the maximum of all
// values that it observed between snapshots.
//
// Since atomic is used to mutate the statistic the value must be 64-bit aligned.
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type maximum int64
func (m *maximum) ptr() *int64 {
return (*int64)(m)
}
func (m *maximum) observe(v int64) {
for {
ptr := m.ptr()
max := atomic.LoadInt64(ptr)
if max >= 0 && max >= v {
break
}
if atomic.CompareAndSwapInt64(ptr, max, v) {
break
}
}
}
func (m *maximum) snapshot() int64 {
p := m.ptr()
v := atomic.LoadInt64(p)
atomic.CompareAndSwapInt64(p, v, -1)
if v < 0 {
v = 0
}
return v
}
type summary struct {
min minimum
max maximum
sum counter
count counter
}
func makeSummary() summary {
return summary{
min: -1,
max: -1,
}
}
func (s *summary) observe(v int64) {
s.min.observe(v)
s.max.observe(v)
s.sum.observe(v)
s.count.observe(1)
}
func (s *summary) observeDuration(v time.Duration) {
s.observe(int64(v))
}
func (s *summary) snapshot() SummaryStats {
avg := int64(0)
min := s.min.snapshot()
max := s.max.snapshot()
sum := s.sum.snapshot()
count := s.count.snapshot()
if count != 0 {
avg = int64(float64(sum) / float64(count))
}
return SummaryStats{
Avg: avg,
Min: min,
Max: max,
}
}
func (s *summary) snapshotDuration() DurationStats {
summary := s.snapshot()
return DurationStats{
Avg: time.Duration(summary.Avg),
Min: time.Duration(summary.Min),
Max: time.Duration(summary.Max),
}
}