diff --git a/bar.go b/bar.go index c3a1b90..e3c128f 100644 --- a/bar.go +++ b/bar.go @@ -9,7 +9,6 @@ "time" "unicode/utf8" - "github.com/VividCortex/ewma" "github.com/vbauerster/mpb/decor" ) @@ -36,7 +35,6 @@ cacheState *bState operateState chan func(*bState) frameReaderCh chan io.Reader - startBlockCh <-chan time.Time // done is closed by Bar's goroutine, after cacheState is written done chan struct{} @@ -65,16 +63,15 @@ timeElapsed time.Duration aDecorators []decor.Decorator pDecorators []decor.Decorator + amountReceivers []decor.AmountReceiver + shutdownListeners []decor.ShutdownListener refill *refill bufP, bufB, bufA *bytes.Buffer panicMsg string - ewmAverage ewma.MovingAverage - // following options are assigned to the *Bar - priority int - runningBar *Bar - startBlockCh chan time.Time + priority int + runningBar *Bar } refill struct { char rune @@ -113,14 +110,11 @@ b := &Bar{ priority: s.priority, runningBar: s.runningBar, - startBlockCh: s.startBlockCh, operateState: make(chan func(*bState)), frameReaderCh: make(chan io.Reader, 1), done: make(chan struct{}), shutdown: make(chan struct{}), } - - s.startBlockCh = nil if b.runningBar != nil { b.priority = b.runningBar.priority @@ -156,11 +150,6 @@ proxyReader.startBlockCh = startBlock[0] } return proxyReader -} - -// Increment is a shorthand for b.IncrBy(1) -func (b *Bar) Increment() { - b.IncrBy(1) } // ResumeFill fills bar with different r rune, @@ -241,18 +230,16 @@ } } +// Increment is a shorthand for b.IncrBy(1) +func (b *Bar) Increment() { + b.IncrBy(1) +} + // IncrBy increments progress bar by amount of n func (b *Bar) IncrBy(n int) { - now := time.Now() select { case b.operateState <- func(s *bState) { s.current += int64(n) - s.timeElapsed = now.Sub(s.startTime) - if s.ewmAverage != nil { - lastBlockTime := now.Sub(s.blockStartTime) - lastItemEstimate := float64(lastBlockTime) / float64(n) - s.ewmAverage.Add(lastItemEstimate) - } if s.dynamic { curp := decor.CalcPercentage(s.total, s.current, 100) if 100-curp <= s.totalAutoIncrTrigger { @@ -262,6 +249,10 @@ s.current = s.total s.toComplete = true } + for _, ar := range s.amountReceivers { + ar.NextAmount(n) + } + s.timeElapsed = time.Since(s.startTime) }: case <-b.done: } @@ -281,14 +272,15 @@ select { case op := <-b.operateState: op(s) - case now := <-b.startBlockCh: - s.blockStartTime = now case <-cancel: s.toComplete = true cancel = nil case <-b.shutdown: b.cacheState = s close(b.done) + for _, sl := range s.shutdownListeners { + sl.Shutdown() + } return } } diff --git a/bar_option.go b/bar_option.go index 13162c6..adec4a9 100644 --- a/bar_option.go +++ b/bar_option.go @@ -12,9 +12,11 @@ func AppendDecorators(appenders ...decor.Decorator) BarOption { return func(s *bState) { for _, decorator := range appenders { - if t, ok := decorator.(*decor.EwmaETA); ok { - s.ewmAverage = t - s.startBlockCh = t.StartBlockCh + if ar, ok := decorator.(decor.AmountReceiver); ok { + s.amountReceivers = append(s.amountReceivers, ar) + } + if sl, ok := decorator.(decor.ShutdownListener); ok { + s.shutdownListeners = append(s.shutdownListeners, sl) } s.aDecorators = append(s.aDecorators, decorator) } @@ -25,9 +27,11 @@ func PrependDecorators(prependers ...decor.Decorator) BarOption { return func(s *bState) { for _, decorator := range prependers { - if t, ok := decorator.(*decor.EwmaETA); ok { - s.ewmAverage = t - s.startBlockCh = t.StartBlockCh + if ar, ok := decorator.(decor.AmountReceiver); ok { + s.amountReceivers = append(s.amountReceivers, ar) + } + if sl, ok := decorator.(decor.ShutdownListener); ok { + s.shutdownListeners = append(s.shutdownListeners, sl) } s.pDecorators = append(s.pDecorators, decorator) } diff --git a/decor/decorators.go b/decor/decorators.go index f5bdaf4..91e99bf 100644 --- a/decor/decorators.go +++ b/decor/decorators.go @@ -5,8 +5,6 @@ "math" "time" "unicode/utf8" - - "github.com/VividCortex/ewma" ) const ( @@ -60,13 +58,21 @@ Decor(*Statistics, chan<- int, <-chan int) string } -// CompleteMessenger is an interface with one method: -// -// OnComplete(message string, wc ...WC) +// OnCompleteMessenger is an interface with one method: +// +// OnCompleteMessage(message string, wc ...WC) // // Decorators implementing this interface suppose to return provided string on complete event. -type CompleteMessenger interface { - OnComplete(string, ...WC) +type OnCompleteMessenger interface { + OnCompleteMessage(string, ...WC) +} + +type AmountReceiver interface { + NextAmount(int) +} + +type ShutdownListener interface { + Shutdown() } // DecoratorFunc is an adapter for Decorator interface @@ -129,8 +135,8 @@ // // `wc` optional WC config func OnComplete(decorator Decorator, message string, wc ...WC) Decorator { - if cm, ok := decorator.(CompleteMessenger); ok { - cm.OnComplete(message, wc...) + if cm, ok := decorator.(OnCompleteMessenger); ok { + cm.OnCompleteMessage(message, wc...) return decorator } msgDecorator := Name(message, wc...) @@ -218,84 +224,6 @@ } return wc0.formatMsg(str, widthAccumulator, widthDistributor) }) -} - -// ETA returns exponential-weighted-moving-average ETA decorator. -// -// `style` one of [ET_STYLE_GO|ET_STYLE_HHMMSS|ET_STYLE_HHMM|ET_STYLE_MMSS] -// -// `age` is related to the decay factor alpha by the formula given for the DECAY constant. -// It signifies the average age of the samples as time goes to infinity. Basically age is -// the previous N samples to average over. If zero value provided, it defaults to 30. -// -// `startBlock` is channel, user suppose to send time.Now() on each iteration of block start. -// -// `wc` optional WC config -func ETA(style int, age float64, startBlock chan time.Time, wc ...WC) Decorator { - var wc0 WC - if len(wc) > 0 { - wc0 = wc[0] - } - if age == .0 { - age = ewma.AVG_METRIC_AGE - } - return &EwmaETA{ - MovingAverage: ewma.NewMovingAverage(age), - StartBlockCh: startBlock, - style: style, - wc: wc0, - } -} - -// EwmaETA is a struct, which implements ewma based ETA decorator. -// Normally should not be used directly, use helper func instead: -// -// decor.ETA(int, float64, chan time.Time, ...decor.WC) -type EwmaETA struct { - ewma.MovingAverage - StartBlockCh chan time.Time - style int - wc WC - onComplete *struct { - msg string - wc WC - } -} - -func (s *EwmaETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { - if st.Completed && s.onComplete != nil { - return s.onComplete.wc.formatMsg(s.onComplete.msg, widthAccumulator, widthDistributor) - } - - var str string - timeRemaining := time.Duration(st.Total-st.Current) * time.Duration(round(s.Value())) - hours := int64((timeRemaining / time.Hour) % 60) - minutes := int64((timeRemaining / time.Minute) % 60) - seconds := int64((timeRemaining / time.Second) % 60) - - switch s.style { - case ET_STYLE_GO: - str = fmt.Sprint(time.Duration(timeRemaining.Seconds()) * time.Second) - case ET_STYLE_HHMMSS: - str = fmt.Sprintf("%02d:%02d:%02d", hours, minutes, seconds) - case ET_STYLE_HHMM: - str = fmt.Sprintf("%02d:%02d", hours, minutes) - case ET_STYLE_MMSS: - str = fmt.Sprintf("%02d:%02d", minutes, seconds) - } - - return s.wc.formatMsg(str, widthAccumulator, widthDistributor) -} - -func (s *EwmaETA) OnComplete(msg string, wc ...WC) { - var wc0 WC - if len(wc) > 0 { - wc0 = wc[0] - } - s.onComplete = &struct { - msg string - wc WC - }{msg, wc0} } // Elapsed returns elapsed time decorator. diff --git a/decor/eta.go b/decor/eta.go new file mode 100644 index 0000000..5d073f5 --- /dev/null +++ b/decor/eta.go @@ -0,0 +1,106 @@ +package decor + +import ( + "fmt" + "time" + + "github.com/VividCortex/ewma" +) + +// ETA returns exponential-weighted-moving-average ETA decorator. +// +// `style` one of [ET_STYLE_GO|ET_STYLE_HHMMSS|ET_STYLE_HHMM|ET_STYLE_MMSS] +// +// `age` is related to the decay factor alpha by the formula given for the DECAY constant. +// It signifies the average age of the samples as time goes to infinity. Basically age is +// the previous N samples to average over. If zero value provided, it defaults to 30. +// +// `startBlock` is a time.Time receive channel. User suppose to send time.Now() +// to this channel on each iteration of block start, right before actual job. +// The channel will be closed automatically on bar shutdown event, so there is +// no need to close from user side. +// +// `wc` optional WC config +func ETA(style int, age float64, startBlock chan time.Time, wc ...WC) Decorator { + var wc0 WC + if len(wc) > 0 { + wc0 = wc[0] + } + if age == .0 { + age = ewma.AVG_METRIC_AGE + } + eta := &ewmaETA{ + mAverage: ewma.NewMovingAverage(age), + startBlockReceiver: startBlock, + startBlockStreamer: make(chan time.Time), + style: style, + wc: wc0, + } + go eta.serve() + return eta +} + +type ewmaETA struct { + mAverage ewma.MovingAverage + startBlockReceiver chan time.Time + startBlockStreamer chan time.Time + style int + wc WC + onComplete *struct { + msg string + wc WC + } +} + +func (s *ewmaETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string { + if st.Completed && s.onComplete != nil { + return s.onComplete.wc.formatMsg(s.onComplete.msg, widthAccumulator, widthDistributor) + } + + var str string + timeRemaining := time.Duration(st.Total-st.Current) * time.Duration(round(s.mAverage.Value())) + hours := int64((timeRemaining / time.Hour) % 60) + minutes := int64((timeRemaining / time.Minute) % 60) + seconds := int64((timeRemaining / time.Second) % 60) + + switch s.style { + case ET_STYLE_GO: + str = fmt.Sprint(time.Duration(timeRemaining.Seconds()) * time.Second) + case ET_STYLE_HHMMSS: + str = fmt.Sprintf("%02d:%02d:%02d", hours, minutes, seconds) + case ET_STYLE_HHMM: + str = fmt.Sprintf("%02d:%02d", hours, minutes) + case ET_STYLE_MMSS: + str = fmt.Sprintf("%02d:%02d", minutes, seconds) + } + + return s.wc.formatMsg(str, widthAccumulator, widthDistributor) +} + +func (s *ewmaETA) NextAmount(n int) { + sb := <-s.startBlockStreamer + lastBlockTime := time.Since(sb) + lastItemEstimate := float64(lastBlockTime) / float64(n) + s.mAverage.Add(lastItemEstimate) +} + +func (s *ewmaETA) OnCompleteMessage(msg string, wc ...WC) { + var wc0 WC + if len(wc) > 0 { + wc0 = wc[0] + } + s.onComplete = &struct { + msg string + wc WC + }{msg, wc0} +} + +func (s *ewmaETA) Shutdown() { + close(s.startBlockReceiver) +} + +func (s *ewmaETA) serve() { + for now := range s.startBlockReceiver { + s.startBlockStreamer <- now + } +}