diff --git a/decor/counters.go b/decor/counters.go index 66fd04a..db4bf53 100644 --- a/decor/counters.go +++ b/decor/counters.go @@ -24,8 +24,8 @@ const ( _ = iota - unitKiB - unitKB + UnitKiB + UnitKB ) type CounterKiB int64 @@ -155,7 +155,7 @@ // // "%.1f / %.1f" = "1.0MiB / 12.0MiB" or "% .1f / % .1f" = "1.0 MiB / 12.0 MiB" func CountersKibiByte(pairFormat string, wcc ...WC) Decorator { - return counters(unitKiB, pairFormat, wcc...) + return counters(UnitKiB, pairFormat, wcc...) } // CountersKiloByte returns human friendly byte counters decorator, where counters unit is multiple by 1000. @@ -168,7 +168,7 @@ // // "%.1f / %.1f" = "1.0MB / 12.0MB" or "% .1f / % .1f" = "1.0 MB / 12.0 MB" func CountersKiloByte(pairFormat string, wcc ...WC) Decorator { - return counters(unitKB, pairFormat, wcc...) + return counters(UnitKB, pairFormat, wcc...) } func counters(unit int, pairFormat string, wcc ...WC) Decorator { @@ -180,9 +180,9 @@ return DecoratorFunc(func(s *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { var str string switch unit { - case unitKiB: + case UnitKiB: str = fmt.Sprintf(pairFormat, CounterKiB(s.Current), CounterKiB(s.Total)) - case unitKB: + case UnitKB: str = fmt.Sprintf(pairFormat, CounterKB(s.Current), CounterKB(s.Total)) default: str = fmt.Sprintf(pairFormat, s.Current, s.Total) diff --git a/decor/eta.go b/decor/eta.go index 1f9e5db..d669cbf 100644 --- a/decor/eta.go +++ b/decor/eta.go @@ -16,14 +16,23 @@ // `age` is the previous N samples to average over. // If zero value provided, it defaults to 30. // -// `sbCh` is a start block receive channel. User suppose to send time.Now() +// `sb` is a start block receive channel. User suppose to send time.Now() // to this channel on each iteration of a start block, right before actual job. // The channel will be auto closed on bar shutdown event, so there is no need // to close from user side. // // `wcc` optional WC config -func ETA(style int, age float64, sbCh chan time.Time, wcc ...WC) Decorator { - if sbCh == nil { +func ETA(style int, age float64, sb chan time.Time, wcc ...WC) Decorator { + return MovingAverageETA(style, ewma.NewMovingAverage(age), sb, wcc...) +} + +// MovingAverageETA returns ETA decorator, which relies on MovingAverage implementation to calculate average. +// Default ETA decorator relies on ewma implementation. However you're free to provide your own implementation +// or use alternative one, which is provided by decor package: +// +// decor.MovingAverageETA(decor.ET_STYLE_GO, decor.NewMedianMovingAverage(), sb) +func MovingAverageETA(style int, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator { + if sb == nil { panic("start block channel must not be nil") } var wc WC @@ -31,24 +40,21 @@ wc = widthConf } wc.BuildFormat() - if age == .0 { - age = ewma.AVG_METRIC_AGE - } - d := &ewmaETA{ + d := &movingAverageETA{ style: style, wc: wc, - mAverage: ewma.NewMovingAverage(age), - sbReceiver: sbCh, + average: average, + sbReceiver: sb, sbStreamer: make(chan time.Time), } go d.serve() return d } -type ewmaETA struct { +type movingAverageETA struct { style int wc WC - mAverage ewma.MovingAverage + average ewma.MovingAverage sbReceiver chan time.Time sbStreamer chan time.Time onComplete *struct { @@ -57,12 +63,12 @@ } } -func (s *ewmaETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { +func (s *movingAverageETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { if st.Completed && s.onComplete != nil { return s.onComplete.wc.FormatMsg(s.onComplete.msg, widthAccumulator, widthDistributor) } - v := internal.Round(s.mAverage.Value()) + v := internal.Round(s.average.Value()) if math.IsInf(v, 0) || math.IsNaN(v) { v = .0 } @@ -86,14 +92,14 @@ return s.wc.FormatMsg(str, widthAccumulator, widthDistributor) } -func (s *ewmaETA) NextAmount(n int) { +func (s *movingAverageETA) NextAmount(n int) { sb := <-s.sbStreamer lastBlockTime := time.Since(sb) lastItemEstimate := float64(lastBlockTime) / float64(n) - s.mAverage.Add(lastItemEstimate) + s.average.Add(lastItemEstimate) } -func (s *ewmaETA) OnCompleteMessage(msg string, wcc ...WC) { +func (s *movingAverageETA) OnCompleteMessage(msg string, wcc ...WC) { var wc WC for _, widthConf := range wcc { wc = widthConf @@ -105,11 +111,11 @@ }{msg, wc} } -func (s *ewmaETA) Shutdown() { +func (s *movingAverageETA) Shutdown() { close(s.sbReceiver) } -func (s *ewmaETA) serve() { +func (s *movingAverageETA) serve() { for now := range s.sbReceiver { s.sbStreamer <- now } diff --git a/decor/moving-average.go b/decor/moving-average.go new file mode 100644 index 0000000..5bca2fc --- /dev/null +++ b/decor/moving-average.go @@ -0,0 +1,69 @@ +package decor + +import ( + "sort" + + "github.com/VividCortex/ewma" +) + +// MovingAverage is the interface that computes a moving average over a time- +// series stream of numbers. The average may be over a window or exponentially +// decaying. +type MovingAverage interface { + Add(float64) + Value() float64 + Set(float64) +} + +type median struct { + window [3]float64 + dst []float64 +} + +type sortable []float64 + +func (s sortable) Len() int { return len(s) } +func (s sortable) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s sortable) Less(i, j int) bool { return s[i] < s[j] } + +func (s *median) Add(v float64) { + s.window[0], s.window[1] = s.window[1], s.window[2] + s.window[2] = v +} + +func (s *median) Value() float64 { + copy(s.dst, s.window[:]) + sort.Sort(sortable(s.dst)) + return s.dst[1] +} + +func (s *median) Set(value float64) { + for i, _ := range s.window { + s.window[i] = value + } +} + +// NewMedian is fixed last 3 samples median MovingAverage. +func NewMedian() MovingAverage { + return &median{ + dst: make([]float64, 3), + } +} + +type medianEwma struct { + MovingAverage + median MovingAverage +} + +func (s medianEwma) Add(v float64) { + s.median.Add(v) + s.MovingAverage.Add(s.median.Value()) +} + +// NewMedianEwma is ewma based MovingAverage, which gets its values from median MovingAverage. +func NewMedianEwma(age ...float64) MovingAverage { + return medianEwma{ + MovingAverage: ewma.NewMovingAverage(age...), + median: NewMedian(), + } +} diff --git a/decor/speed.go b/decor/speed.go index 5a00a2a..d8c20b3 100644 --- a/decor/speed.go +++ b/decor/speed.go @@ -125,7 +125,7 @@ // `age` is the previous N samples to average over. // If zero value provided, it defaults to 30. // -// `sbCh` is a start block receive channel. User suppose to send time.Now() +// `sb` is a start block receive channel. User suppose to send time.Now() // to this channel on each iteration of a start block, right before actual job. // The channel will be auto closed on bar shutdown event, so there is no need // to close from user side. @@ -135,8 +135,8 @@ // unitFormat example: // // "%.1f" = "1.0" or "% .1f" = "1.0" -func SpeedNoUnit(unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator { - return speed(0, unitFormat, age, sbCh, wcc...) +func SpeedNoUnit(unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator { + return MovingAverageSpeed(0, unitFormat, ewma.NewMovingAverage(age), sb, wcc...) } // SpeedKibiByte returns human friendly I/O operation speed decorator, @@ -146,7 +146,7 @@ // `age` is the previous N samples to average over. // If zero value provided, it defaults to 30. // -// `sbCh` is a start block receive channel. User suppose to send time.Now() +// `sb` is a start block receive channel. User suppose to send time.Now() // to this channel on each iteration of a start block, right before actual job. // The channel will be auto closed on bar shutdown event, so there is no need // to close from user side. @@ -156,8 +156,8 @@ // unitFormat example: // // "%.1f" = "1.0MiB/s" or "% .1f" = "1.0 MiB/s" -func SpeedKibiByte(unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator { - return speed(unitKiB, unitFormat, age, sbCh, wcc...) +func SpeedKibiByte(unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator { + return MovingAverageSpeed(UnitKiB, unitFormat, ewma.NewMovingAverage(age), sb, wcc...) } // SpeedKiloByte returns human friendly I/O operation speed decorator, @@ -167,7 +167,7 @@ // `age` is the previous N samples to average over. // If zero value provided, it defaults to 30. // -// `sbCh` is a start block receive channel. User suppose to send time.Now() +// `sb` is a start block receive channel. User suppose to send time.Now() // to this channel on each iteration of a start block, right before actual job. // The channel will be auto closed on bar shutdown event, so there is no need // to close from user side. @@ -177,12 +177,17 @@ // unitFormat example: // // "%.1f" = "1.0MB/s" or "% .1f" = "1.0 MB/s" -func SpeedKiloByte(unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator { - return speed(unitKB, unitFormat, age, sbCh, wcc...) -} - -func speed(unit int, unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator { - if sbCh == nil { +func SpeedKiloByte(unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator { + return MovingAverageSpeed(UnitKB, unitFormat, ewma.NewMovingAverage(age), sb, wcc...) +} + +// MovingAverageSpeed returns Speed decorator, which relies on MovingAverage implementation to calculate average. +// Default Speed decorator relies on ewma implementation. However you're free to provide your own implementation +// or use alternative one, which is provided by decor package: +// +// decor.MovingAverageSpeed(decor.UnitKiB, "% .2f", decor.NewMedianMovingAverage(), sb) +func MovingAverageSpeed(unit int, unitFormat string, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator { + if sb == nil { panic("start block channel must not be nil") } var wc WC @@ -190,26 +195,23 @@ wc = widthConf } wc.BuildFormat() - if age == .0 { - age = ewma.AVG_METRIC_AGE - } - d := &ewmaSpeed{ + d := &movingAverageSpeed{ unit: unit, unitFormat: unitFormat, wc: wc, - mAverage: ewma.NewMovingAverage(age), - sbReceiver: sbCh, + average: average, + sbReceiver: sb, sbStreamer: make(chan time.Time), } go d.serve() return d } -type ewmaSpeed struct { +type movingAverageSpeed struct { unit int unitFormat string wc WC - mAverage ewma.MovingAverage + average ewma.MovingAverage sbReceiver chan time.Time sbStreamer chan time.Time onComplete *struct { @@ -218,16 +220,16 @@ } } -func (s *ewmaSpeed) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { +func (s *movingAverageSpeed) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { if st.Completed && s.onComplete != nil { return s.onComplete.wc.FormatMsg(s.onComplete.msg, widthAccumulator, widthDistributor) } var str string - speed := s.mAverage.Value() + speed := s.average.Value() switch s.unit { - case unitKiB: + case UnitKiB: str = fmt.Sprintf(s.unitFormat, SpeedKiB(speed)) - case unitKB: + case UnitKB: str = fmt.Sprintf(s.unitFormat, SpeedKB(speed)) default: str = fmt.Sprintf(s.unitFormat, speed) @@ -235,13 +237,13 @@ return s.wc.FormatMsg(str, widthAccumulator, widthDistributor) } -func (s *ewmaSpeed) NextAmount(n int) { +func (s *movingAverageSpeed) NextAmount(n int) { sb := <-s.sbStreamer speed := float64(n) / time.Since(sb).Seconds() - s.mAverage.Add(speed) -} - -func (s *ewmaSpeed) OnCompleteMessage(msg string, wcc ...WC) { + s.average.Add(speed) +} + +func (s *movingAverageSpeed) OnCompleteMessage(msg string, wcc ...WC) { var wc WC for _, widthConf := range wcc { wc = widthConf @@ -253,11 +255,11 @@ }{msg, wc} } -func (s *ewmaSpeed) Shutdown() { +func (s *movingAverageSpeed) Shutdown() { close(s.sbReceiver) } -func (s *ewmaSpeed) serve() { +func (s *movingAverageSpeed) serve() { for now := range s.sbReceiver { s.sbStreamer <- now } diff --git a/examples/io/single/main.go b/examples/io/single/main.go index aad7746..34e2dcc 100644 --- a/examples/io/single/main.go +++ b/examples/io/single/main.go @@ -50,9 +50,9 @@ decor.CountersKibiByte("% 6.1f / % 6.1f"), ), mpb.AppendDecorators( - decor.ETA(decor.ET_STYLE_MMSS, 600, sbEta), + decor.MovingAverageETA(decor.ET_STYLE_MMSS, decor.NewMedianEwma(300), sbEta), decor.Name(" ] "), - decor.SpeedKibiByte("% .2f", 600, sbSpeed), + decor.MovingAverageSpeed(decor.UnitKiB, "% .2f", decor.NewMedianEwma(300), sbSpeed), ), )