diff --git a/README.md b/README.md index de9a0fb..d6edbe4 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,6 @@ total := 100 name := "Single Bar:" - sbEta := make(chan time.Time) // adding a single bar bar := p.AddBar(int64(total), mpb.PrependDecorators( @@ -48,7 +47,7 @@ // replace ETA decorator with "done" message, OnComplete event decor.OnComplete( // ETA decorator with ewma age of 60, and width reservation of 4 - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 4}), "done", + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 4}), "done", ), ), mpb.AppendDecorators(decor.Percentage()), @@ -56,11 +55,10 @@ // simulating some work max := 100 * time.Millisecond for i := 0; i < total; i++ { - // update start block time, required for ETA calculation - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - // increment by 1 (there is bar.IncrBy(int) method, if needed) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } // wait for our bar to complete and flush p.Wait() @@ -75,7 +73,6 @@ for i := 0; i < numBars; i++ { name := fmt.Sprintf("Bar#%d:", i) - sbEta := make(chan time.Time) bar := p.AddBar(int64(total), mpb.PrependDecorators( // simple name decorator @@ -87,7 +84,7 @@ // replace ETA decorator with "done" message, OnComplete event decor.OnComplete( // ETA decorator with ewma age of 60 - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta), "done", + decor.EwmaETA(decor.ET_STYLE_GO, 60), "done", ), ), ) @@ -96,9 +93,10 @@ defer wg.Done() max := 100 * time.Millisecond for i := 0; i < total; i++ { - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } }() } diff --git a/bar.go b/bar.go index 01682bc..fed4482 100644 --- a/bar.go +++ b/bar.go @@ -122,7 +122,7 @@ return b } -// RemoveAllPrependers removes all prepend functions +// RemoveAllPrependers removes all prepend functions. func (b *Bar) RemoveAllPrependers() { select { case b.operateState <- func(s *bState) { s.pDecorators = nil }: @@ -130,7 +130,7 @@ } } -// RemoveAllAppenders removes all append functions +// RemoveAllAppenders removes all append functions. func (b *Bar) RemoveAllAppenders() { select { case b.operateState <- func(s *bState) { s.aDecorators = nil }: @@ -138,21 +138,16 @@ } } -// ProxyReader wrapper for io operations, like io.Copy -// -// `r` io.Reader to be wrapped -// -// `sbChannels` optional start block channels -func (b *Bar) ProxyReader(r io.Reader, sbChannels ...chan<- time.Time) *Reader { +// ProxyReader allows progress tracking against provided io.Reader. +func (b *Bar) ProxyReader(r io.Reader) *Reader { proxyReader := &Reader{ - Reader: r, - bar: b, - sbChannels: sbChannels, + Reader: r, + bar: b, } return proxyReader } -// NumOfAppenders returns current number of append decorators +// NumOfAppenders returns current number of append decorators. func (b *Bar) NumOfAppenders() int { result := make(chan int) select { @@ -163,7 +158,7 @@ } } -// NumOfPrependers returns current number of prepend decorators +// NumOfPrependers returns current number of prepend decorators. func (b *Bar) NumOfPrependers() int { result := make(chan int) select { @@ -174,7 +169,7 @@ } } -// ID returs id of the bar +// ID returs id of the bar. func (b *Bar) ID() int { result := make(chan int) select { @@ -209,6 +204,7 @@ // SetTotal sets total dynamically. The final param indicates the very last set, // in other words you should set it to true when total is determined. +// After final has been set, IncrBy should be called at least once. func (b *Bar) SetTotal(total int64, final bool) { b.operateState <- func(s *bState) { if total != 0 { @@ -232,7 +228,9 @@ } // IncrBy increments progress bar by amount of n. -func (b *Bar) IncrBy(n int) { +// wdd is optional work duration i.e. time.Since(start), +// which expected to be provided, if any ewma based decorator is used. +func (b *Bar) IncrBy(n int, wdd ...time.Duration) { select { case b.operateState <- func(s *bState) { s.current += int64(n) @@ -246,7 +244,7 @@ s.toComplete = true } for _, ar := range s.amountReceivers { - ar.NextAmount(n) + ar.NextAmount(n, wdd...) } }: case <-b.done: diff --git a/decor/counters.go b/decor/counters.go index c5b9146..33dc666 100644 --- a/decor/counters.go +++ b/decor/counters.go @@ -159,7 +159,7 @@ // // `wcc` optional WC config // -// pairFormat example: +// pairFormat example if UnitKB is chosen: // // "%.1f / %.1f" = "1.0MB / 12.0MB" or "% .1f / % .1f" = "1.0 MB / 12.0 MB" func Counters(unit int, pairFormat string, wcc ...WC) Decorator { diff --git a/decor/decorator.go b/decor/decorator.go index 32552c6..b454404 100644 --- a/decor/decorator.go +++ b/decor/decorator.go @@ -2,6 +2,7 @@ import ( "fmt" + "time" "unicode/utf8" ) @@ -64,7 +65,7 @@ } type AmountReceiver interface { - NextAmount(int) + NextAmount(int, ...time.Duration) } type ShutdownListener interface { diff --git a/decor/eta.go b/decor/eta.go index bc6b09a..6a8ba1d 100644 --- a/decor/eta.go +++ b/decor/eta.go @@ -15,14 +15,9 @@ // // `age` is the previous N samples to average over. // -// `sb` is a start block receive channel. It's required by MovingAverage algorithm, -// therefore result of time.Now() must be sent to this channel on each iteration -// of a start block, right before the actual job. There is no need to close the channel, -// as it will be closed automatically on bar completion event. -// // `wcc` optional WC config -func EwmaETA(style int, age float64, sb chan time.Time, wcc ...WC) Decorator { - return MovingAverageETA(style, ewma.NewMovingAverage(age), sb, wcc...) +func EwmaETA(style int, age float64, wcc ...WC) Decorator { + return MovingAverageETA(style, ewma.NewMovingAverage(age), wcc...) } // MovingAverageETA decorator relies on MovingAverage implementation to calculate its average. @@ -31,29 +26,18 @@ // // `average` MovingAverage implementation // -// `sb` is a start block receive channel. It's required by MovingAverage algorithm, -// therefore result of time.Now() must be sent to this channel on each iteration -// of a start block, right before the actual job. There is no need to close the channel, -// as it will be closed automatically on bar completion event. -// // `wcc` optional WC config -func MovingAverageETA(style int, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator { - if sb == nil { - panic("start block channel must not be nil") - } +func MovingAverageETA(style int, average MovingAverage, wcc ...WC) Decorator { var wc WC for _, widthConf := range wcc { wc = widthConf } wc.BuildFormat() d := &movingAverageETA{ - style: style, - wc: wc, - average: average, - sbReceiver: sb, - sbStreamer: make(chan time.Time), + style: style, + wc: wc, + average: average, } - go d.serve() return d } @@ -61,8 +45,6 @@ style int wc WC average ewma.MovingAverage - sbReceiver chan time.Time - sbStreamer chan time.Time onComplete *struct { msg string wc WC @@ -98,10 +80,12 @@ return s.wc.FormatMsg(str, widthAccumulator, widthDistributor) } -func (s *movingAverageETA) NextAmount(n int) { - sb := <-s.sbStreamer - lastBlockTime := time.Since(sb) - lastItemEstimate := float64(lastBlockTime) / float64(n) +func (s *movingAverageETA) NextAmount(n int, wdd ...time.Duration) { + var workDuration time.Duration + for _, wd := range wdd { + workDuration = wd + } + lastItemEstimate := float64(workDuration) / float64(n) s.average.Add(lastItemEstimate) } @@ -115,16 +99,6 @@ msg string wc WC }{msg, wc} -} - -func (s *movingAverageETA) Shutdown() { - close(s.sbReceiver) -} - -func (s *movingAverageETA) serve() { - for now := range s.sbReceiver { - s.sbStreamer <- now - } } // AverageETA decorator. diff --git a/decor/speed.go b/decor/speed.go index 4653d37..69d0802 100644 --- a/decor/speed.go +++ b/decor/speed.go @@ -127,18 +127,13 @@ // // `average` MovingAverage implementation // -// `sb` is a start block receive channel. It's required by MovingAverage algorithm, -// therefore result of time.Now() must be sent to this channel on each iteration -// of a start block, right before the actual job. There is no need to close the channel, -// as it will be closed automatically on bar completion event. -// // `wcc` optional WC config // -// unitFormat example if UnitKiB chosen: +// unitFormat example if UnitKiB is chosen: // // "%.1f" = "1.0MiB/s" or "% .1f" = "1.0 MiB/s" -func EwmaSpeed(unit int, unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator { - return MovingAverageSpeed(unit, unitFormat, ewma.NewMovingAverage(age), sb, wcc...) +func EwmaSpeed(unit int, unitFormat string, age float64, wcc ...WC) Decorator { + return MovingAverageSpeed(unit, unitFormat, ewma.NewMovingAverage(age), wcc...) } // MovingAverageSpeed decorator relies on MovingAverage implementation to calculate its average. @@ -149,16 +144,8 @@ // // `average` MovingAverage implementation // -// `sb` is a start block receive channel. It's required by MovingAverage algorithm, -// therefore result of time.Now() must be sent to this channel on each iteration -// of a start block, right before the actual job. There is no need to close the channel, -// as it will be closed automatically on bar completion event. -// // `wcc` optional WC config -func MovingAverageSpeed(unit int, unitFormat string, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator { - if sb == nil { - panic("start block channel must not be nil") - } +func MovingAverageSpeed(unit int, unitFormat string, average MovingAverage, wcc ...WC) Decorator { var wc WC for _, widthConf := range wcc { wc = widthConf @@ -169,10 +156,7 @@ unitFormat: unitFormat, wc: wc, average: average, - sbReceiver: sb, - sbStreamer: make(chan time.Time), - } - go d.serve() + } return d } @@ -181,8 +165,6 @@ unitFormat string wc WC average ewma.MovingAverage - sbReceiver chan time.Time - sbStreamer chan time.Time onComplete *struct { msg string wc WC @@ -206,9 +188,12 @@ return s.wc.FormatMsg(str, widthAccumulator, widthDistributor) } -func (s *movingAverageSpeed) NextAmount(n int) { - sb := <-s.sbStreamer - speed := float64(n) / time.Since(sb).Seconds() +func (s *movingAverageSpeed) NextAmount(n int, wdd ...time.Duration) { + var workDuration time.Duration + for _, wd := range wdd { + workDuration = wd + } + speed := float64(n) / workDuration.Seconds() s.average.Add(speed) } @@ -224,16 +209,6 @@ }{msg, wc} } -func (s *movingAverageSpeed) Shutdown() { - close(s.sbReceiver) -} - -func (s *movingAverageSpeed) serve() { - for now := range s.sbReceiver { - s.sbStreamer <- now - } -} - // AverageSpeed decorator with dynamic unit measure adjustment. // // `unit` one of [0|UnitKiB|UnitKB] zero for no unit @@ -242,7 +217,7 @@ // // `wcc` optional WC config // -// unitFormat example if UnitKiB chosen: +// unitFormat example if UnitKiB is chosen: // // "%.1f" = "1.0MiB/s" or "% .1f" = "1.0 MiB/s" func AverageSpeed(unit int, unitFormat string, wcc ...WC) Decorator { diff --git a/example_test.go b/example_test.go index cdb6540..97f1ded 100644 --- a/example_test.go +++ b/example_test.go @@ -23,7 +23,6 @@ total := 100 name := "Single Bar:" - sbEta := make(chan time.Time) // adding a single bar bar := p.AddBar(int64(total), mpb.PrependDecorators( @@ -32,7 +31,7 @@ // replace ETA decorator with "done" message, OnComplete event decor.OnComplete( // ETA decorator with ewma age of 60, and width reservation of 4 - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 4}), "done", + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 4}), "done", ), ), mpb.AppendDecorators(decor.Percentage()), @@ -40,11 +39,10 @@ // simulating some work max := 100 * time.Millisecond for i := 0; i < total; i++ { - // update start block time, required for ETA calculation - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - // increment by 1 (there is bar.IncrBy(int) method, if needed) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } // wait for our bar to complete and flush p.Wait() diff --git a/examples/cancel/main.go b/examples/cancel/main.go index f400fbd..001e201 100644 --- a/examples/cancel/main.go +++ b/examples/cancel/main.go @@ -32,11 +32,10 @@ for i := 0; i < numBars; i++ { name := fmt.Sprintf("Bar#%d:", i) - sbEta := make(chan time.Time) bar := p.AddBar(int64(total), mpb.PrependDecorators( decor.Name(name), - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WCSyncSpace), + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace), ), mpb.AppendDecorators( decor.Percentage(decor.WC{W: 5}), @@ -47,9 +46,10 @@ defer wg.Done() max := 100 * time.Millisecond for !bar.Completed() { - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } }() } diff --git a/examples/complex/main.go b/examples/complex/main.go index 125071d..a2789d5 100644 --- a/examples/complex/main.go +++ b/examples/complex/main.go @@ -36,7 +36,7 @@ ), mpb.AppendDecorators(decor.Percentage(decor.WC{W: 5})), ) - go newTask(wg, b, i+1, nil) + go newTask(wg, b, i+1) bars = append(bars, b) } @@ -44,7 +44,6 @@ doneWg.Add(1) i := i go func() { - sbEta := make(chan time.Time) task := fmt.Sprintf("Task#%02d:", i) job := "installing" // preparing delayed bars @@ -55,7 +54,7 @@ decor.Name(task, decor.WC{W: len(task) + 1, C: decor.DidentRight}), decor.OnComplete(decor.Name(job, decor.WCSyncSpaceR), "done!", decor.WCSyncSpaceR), decor.OnComplete( - decor.EwmaETA(decor.ET_STYLE_MMSS, 60, sbEta, decor.WCSyncWidth), "", decor.WCSyncSpace, + decor.EwmaETA(decor.ET_STYLE_MMSS, 60, decor.WCSyncWidth), "", decor.WCSyncSpace, ), ), mpb.AppendDecorators( @@ -64,21 +63,20 @@ ) // waiting for download to complete, before starting install job downloadWgg[i].Wait() - go newTask(doneWg, b, numBars-i, sbEta) + go newTask(doneWg, b, numBars-i) }() } p.Wait() } -func newTask(wg *sync.WaitGroup, b *mpb.Bar, incrBy int, sbCh chan<- time.Time) { +func newTask(wg *sync.WaitGroup, b *mpb.Bar, incrBy int) { defer wg.Done() max := 100 * time.Millisecond for !b.Completed() { - if sbCh != nil { - sbCh <- time.Now() - } + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - b.IncrBy(incrBy) + // ewma based decorators require work duration measurement + b.IncrBy(incrBy, time.Since(start)) } } diff --git a/examples/io/multiple/main.go b/examples/io/multiple/main.go index e5d5bed..8a02e16 100644 --- a/examples/io/multiple/main.go +++ b/examples/io/multiple/main.go @@ -8,7 +8,6 @@ "os" "path/filepath" "sync" - "time" "github.com/vbauerster/mpb" "github.com/vbauerster/mpb/decor" @@ -58,7 +57,6 @@ return } - sbEta := make(chan time.Time) // create bar with appropriate decorators bar := p.AddBar(size, mpb.BarPriority(n), mpb.PrependDecorators( @@ -66,13 +64,13 @@ decor.CountersKibiByte("%6.1f / %6.1f", decor.WCSyncWidth), ), mpb.AppendDecorators( - decor.EwmaETA(decor.ET_STYLE_HHMMSS, 1024*4, sbEta, decor.WCSyncWidth), + decor.EwmaETA(decor.ET_STYLE_HHMMSS, 1024*4, decor.WCSyncWidth), decor.AverageSpeed(decor.UnitKiB, "% .2f"), ), ) // create proxy reader - reader := bar.ProxyReader(resp.Body, sbEta) + reader := bar.ProxyReader(resp.Body) // and copy from reader _, err = io.Copy(dest, reader) diff --git a/examples/io/single/main.go b/examples/io/single/main.go index 0e159de..2986d7e 100644 --- a/examples/io/single/main.go +++ b/examples/io/single/main.go @@ -43,20 +43,19 @@ mpb.WithRefreshRate(180*time.Millisecond), ) - sbEta := make(chan time.Time) bar := p.AddBar(size, mpb.PrependDecorators( decor.CountersKibiByte("% 6.1f / % 6.1f"), ), mpb.AppendDecorators( - decor.EwmaETA(decor.ET_STYLE_MMSS, 1024*8, sbEta), + decor.EwmaETA(decor.ET_STYLE_MMSS, 1024*8), decor.Name(" ] "), decor.AverageSpeed(decor.UnitKiB, "% .2f"), ), ) // create proxy reader - reader := bar.ProxyReader(resp.Body, sbEta) + reader := bar.ProxyReader(resp.Body) // and copy from reader, ignoring errors io.Copy(dest, reader) diff --git a/examples/remove/main.go b/examples/remove/main.go index a0f530b..bc9c738 100644 --- a/examples/remove/main.go +++ b/examples/remove/main.go @@ -29,12 +29,11 @@ bOption = mpb.BarRemoveOnComplete() } - sbEta := make(chan time.Time) b := p.AddBar(int64(total), mpb.BarID(i), bOption, mpb.PrependDecorators( decor.Name(name), - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WCSyncSpace), + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace), ), mpb.AppendDecorators(decor.Percentage()), ) @@ -42,13 +41,14 @@ defer wg.Done() max := 100 * time.Millisecond for i := 0; i < total; i++ { - sbEta <- time.Now() + start := time.Now() if b.ID() == 2 && i == 42 { p.Abort(b) return } time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - b.Increment() + // ewma based decorators require work duration measurement + b.IncrBy(1, time.Since(start)) } }() } diff --git a/examples/simple/main.go b/examples/simple/main.go index ba6ba05..afc2050 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -22,7 +22,6 @@ for i := 0; i < numBars; i++ { name := fmt.Sprintf("Bar#%d:", i) - sbEta := make(chan time.Time) bar := p.AddBar(int64(total), mpb.PrependDecorators( // simple name decorator @@ -34,7 +33,7 @@ // replace ETA decorator with "done" message, OnComplete event decor.OnComplete( // ETA decorator with ewma age of 60 - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta), "done", + decor.EwmaETA(decor.ET_STYLE_GO, 60), "done", ), ), ) @@ -43,9 +42,10 @@ defer wg.Done() max := 100 * time.Millisecond for i := 0; i < total; i++ { - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } }() } diff --git a/examples/singleBar/main.go b/examples/singleBar/main.go index 41299a4..7fe249f 100644 --- a/examples/singleBar/main.go +++ b/examples/singleBar/main.go @@ -20,7 +20,6 @@ total := 100 name := "Single Bar:" - sbEta := make(chan time.Time) // adding a single bar bar := p.AddBar(int64(total), mpb.PrependDecorators( @@ -29,7 +28,7 @@ // replace ETA decorator with "done" message, OnComplete event decor.OnComplete( // ETA decorator with ewma age of 60, and width reservation of 4 - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 4}), "done", + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 4}), "done", ), ), mpb.AppendDecorators(decor.Percentage()), @@ -37,11 +36,10 @@ // simulating some work max := 100 * time.Millisecond for i := 0; i < total; i++ { - // update start block time, required for ETA calculation - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - // increment by 1 (there is bar.IncrBy(int) method, if needed) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } // wait for our bar to complete and flush p.Wait() diff --git a/examples/sort/main.go b/examples/sort/main.go index a9744ed..3dd979c 100644 --- a/examples/sort/main.go +++ b/examples/sort/main.go @@ -26,27 +26,27 @@ if i != 1 { name = fmt.Sprintf("Bar#%d:", i) } - sbEta := make(chan time.Time) b := p.AddBar(int64(total), mpb.PrependDecorators( decor.Name(name, decor.WCSyncWidth), decor.CountersNoUnit("%d / %d", decor.WCSyncSpace), ), mpb.AppendDecorators( - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 3}), + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 3}), ), ) go func() { defer wg.Done() max := 100 * time.Millisecond for i := 0; i < total; i++ { - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) if i&1 == 1 { priority := total - int(b.Current()) p.UpdateBarPriority(b, priority) } - b.Increment() + // ewma based decorators require work duration measurement + b.IncrBy(1, time.Since(start)) } }() } diff --git a/examples/stress/main.go b/examples/stress/main.go index 19cb667..b1e99fb 100644 --- a/examples/stress/main.go +++ b/examples/stress/main.go @@ -19,7 +19,6 @@ } func main() { - var wg sync.WaitGroup p := mpb.New(mpb.WithWaitGroup(&wg)) wg.Add(totalBars) @@ -27,11 +26,10 @@ for i := 0; i < totalBars; i++ { name := fmt.Sprintf("Bar#%02d: ", i) total := rand.Intn(320) + 10 - sbEta := make(chan time.Time) bar := p.AddBar(int64(total), mpb.PrependDecorators( decor.Name(name), - decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WCSyncSpace), + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace), ), mpb.AppendDecorators( decor.Percentage(decor.WC{W: 5}), @@ -42,9 +40,10 @@ defer wg.Done() max := 100 * time.Millisecond for !bar.Completed() { - sbEta <- time.Now() + start := time.Now() time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) - bar.Increment() + // ewma based decorators require work duration measurement + bar.IncrBy(1, time.Since(start)) } }() } diff --git a/proxyreader.go b/proxyreader.go index 942ccc7..1e5e9b9 100644 --- a/proxyreader.go +++ b/proxyreader.go @@ -8,20 +8,13 @@ // Reader is io.Reader wrapper, for proxy read bytes type Reader struct { io.Reader - bar *Bar - sbChannels []chan<- time.Time + bar *Bar } func (r *Reader) Read(p []byte) (int, error) { - select { - case <-r.bar.done: - default: - for _, ch := range r.sbChannels { - ch <- time.Now() - } - } + start := time.Now() n, err := r.Reader.Read(p) - r.bar.IncrBy(n) + r.bar.IncrBy(n, time.Since(start)) return n, err }