diff --git a/decor/decorator.go b/decor/decorator.go index daa86fc..5bca63d 100644 --- a/decor/decorator.go +++ b/decor/decorator.go @@ -85,16 +85,16 @@ } // EwmaDecorator interface. -// EWMA based decorators must to implement this one. +// EWMA based decorators should implement this one. type EwmaDecorator interface { - IterationUpdate(int64, time.Duration) + EwmaUpdate(int64, time.Duration) } // AverageDecorator interface. // Average decorators should implement this interface to provide start // time adjustment facility, for resume-able tasks. type AverageDecorator interface { - Adjust(time.Time) + AverageAdjust(time.Time) } // ShutdownListener interface. diff --git a/decor/eta.go b/decor/eta.go index 5955f1e..f0c97dd 100644 --- a/decor/eta.go +++ b/decor/eta.go @@ -27,12 +27,13 @@ // work duration as second argument, in order for this decorator to // work correctly. This decorator is a wrapper of MovingAverageETA. func EwmaETA(style TimeStyle, age float64, wcc ...WC) Decorator { - var average MovingAverage + var average ewma.MovingAverage if age == 0 { average = ewma.NewMovingAverage() } else { average = ewma.NewMovingAverage(age) } + average = &ThreadSafeMovingAverage{MovingAverage: average} return MovingAverageETA(style, average, nil, wcc...) } @@ -46,7 +47,7 @@ // // `wcc` optional WC config // -func MovingAverageETA(style TimeStyle, average MovingAverage, normalizer TimeNormalizer, wcc ...WC) Decorator { +func MovingAverageETA(style TimeStyle, average ewma.MovingAverage, normalizer TimeNormalizer, wcc ...WC) Decorator { d := &movingAverageETA{ WC: initWC(wcc...), average: average, @@ -72,12 +73,8 @@ return d.FormatMsg(d.producer(remaining)) } -func (d *movingAverageETA) NextAmount(n int64, wdd ...time.Duration) { - var workDuration time.Duration - for _, wd := range wdd { - workDuration = wd - } - durPerItem := float64(workDuration) / float64(n) +func (d *movingAverageETA) EwmaUpdate(n int64, dur time.Duration) { + durPerItem := float64(dur) / float64(n) if math.IsInf(durPerItem, 0) || math.IsNaN(durPerItem) { return } diff --git a/decor/moving_average.go b/decor/moving_average.go index 933b1f2..5d2d3a0 100644 --- a/decor/moving_average.go +++ b/decor/moving_average.go @@ -2,14 +2,34 @@ import ( "sort" + "sync" "github.com/VividCortex/ewma" ) -// MovingAverage is the interface that computes a moving average over -// a time-series stream of numbers. The average may be over a window -// or exponentially decaying. -type MovingAverage = ewma.MovingAverage +// ThreadSafeMovingAverage is a thread safe wrapper for ewma.MovingAverage. +type ThreadSafeMovingAverage struct { + ewma.MovingAverage + mu sync.Mutex +} + +func (s *ThreadSafeMovingAverage) Add(value float64) { + s.mu.Lock() + s.MovingAverage.Add(value) + s.mu.Unlock() +} + +func (s *ThreadSafeMovingAverage) Value() float64 { + s.mu.Lock() + defer s.mu.Unlock() + return s.MovingAverage.Value() +} + +func (s *ThreadSafeMovingAverage) Set(value float64) { + s.mu.Lock() + s.MovingAverage.Set(value) + s.mu.Unlock() +} type medianWindow [3]float64 @@ -35,6 +55,6 @@ } // NewMedian is fixed last 3 samples median MovingAverage. -func NewMedian() MovingAverage { - return new(medianWindow) +func NewMedian() ewma.MovingAverage { + return &ThreadSafeMovingAverage{MovingAverage: new(medianWindow)} } diff --git a/decor/speed.go b/decor/speed.go index 92d13e6..142e5be 100644 --- a/decor/speed.go +++ b/decor/speed.go @@ -32,12 +32,13 @@ // work duration as second argument, in order for this decorator to // work correctly. This decorator is a wrapper of MovingAverageSpeed. func EwmaSpeed(unit int, format string, age float64, wcc ...WC) Decorator { - var average MovingAverage + var average ewma.MovingAverage if age == 0 { average = ewma.NewMovingAverage() } else { average = ewma.NewMovingAverage(age) } + average = &ThreadSafeMovingAverage{MovingAverage: average} return MovingAverageSpeed(unit, format, average, wcc...) } @@ -59,7 +60,7 @@ // unit=UnitKB, format="%.1f" output: "1.0MB/s" // unit=UnitKB, format="% .1f" output: "1.0 MB/s" // -func MovingAverageSpeed(unit int, format string, average MovingAverage, wcc ...WC) Decorator { +func MovingAverageSpeed(unit int, format string, average ewma.MovingAverage, wcc ...WC) Decorator { if format == "" { format = "%.0f" } @@ -89,12 +90,8 @@ return d.FormatMsg(d.msg) } -func (d *movingAverageSpeed) NextAmount(n int64, wdd ...time.Duration) { - var workDuration time.Duration - for _, wd := range wdd { - workDuration = wd - } - durPerByte := float64(workDuration) / float64(n) +func (d *movingAverageSpeed) EwmaUpdate(n int64, dur time.Duration) { + durPerByte := float64(dur) / float64(n) if math.IsInf(durPerByte, 0) || math.IsNaN(durPerByte) { return }