Codebase list golang-github-vbauerster-mpb / 42963cd
get rid of start block channel Vladimir Bauer 7 years ago
17 changed file(s) with 91 addition(s) and 162 deletion(s). Raw diff Collapse all Expand all
3838
3939 total := 100
4040 name := "Single Bar:"
41 sbEta := make(chan time.Time)
4241 // adding a single bar
4342 bar := p.AddBar(int64(total),
4443 mpb.PrependDecorators(
4746 // replace ETA decorator with "done" message, OnComplete event
4847 decor.OnComplete(
4948 // ETA decorator with ewma age of 60, and width reservation of 4
50 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 4}), "done",
49 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 4}), "done",
5150 ),
5251 ),
5352 mpb.AppendDecorators(decor.Percentage()),
5554 // simulating some work
5655 max := 100 * time.Millisecond
5756 for i := 0; i < total; i++ {
58 // update start block time, required for ETA calculation
59 sbEta <- time.Now()
57 start := time.Now()
6058 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
61 // increment by 1 (there is bar.IncrBy(int) method, if needed)
62 bar.Increment()
59 // ewma based decorators require work duration measurement
60 bar.IncrBy(1, time.Since(start))
6361 }
6462 // wait for our bar to complete and flush
6563 p.Wait()
7472
7573 for i := 0; i < numBars; i++ {
7674 name := fmt.Sprintf("Bar#%d:", i)
77 sbEta := make(chan time.Time)
7875 bar := p.AddBar(int64(total),
7976 mpb.PrependDecorators(
8077 // simple name decorator
8683 // replace ETA decorator with "done" message, OnComplete event
8784 decor.OnComplete(
8885 // ETA decorator with ewma age of 60
89 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta), "done",
86 decor.EwmaETA(decor.ET_STYLE_GO, 60), "done",
9087 ),
9188 ),
9289 )
9592 defer wg.Done()
9693 max := 100 * time.Millisecond
9794 for i := 0; i < total; i++ {
98 sbEta <- time.Now()
95 start := time.Now()
9996 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
100 bar.Increment()
97 // ewma based decorators require work duration measurement
98 bar.IncrBy(1, time.Since(start))
10199 }
102100 }()
103101 }
121121 return b
122122 }
123123
124 // RemoveAllPrependers removes all prepend functions
124 // RemoveAllPrependers removes all prepend functions.
125125 func (b *Bar) RemoveAllPrependers() {
126126 select {
127127 case b.operateState <- func(s *bState) { s.pDecorators = nil }:
129129 }
130130 }
131131
132 // RemoveAllAppenders removes all append functions
132 // RemoveAllAppenders removes all append functions.
133133 func (b *Bar) RemoveAllAppenders() {
134134 select {
135135 case b.operateState <- func(s *bState) { s.aDecorators = nil }:
137137 }
138138 }
139139
140 // ProxyReader wrapper for io operations, like io.Copy
141 //
142 // `r` io.Reader to be wrapped
143 //
144 // `sbChannels` optional start block channels
145 func (b *Bar) ProxyReader(r io.Reader, sbChannels ...chan<- time.Time) *Reader {
140 // ProxyReader allows progress tracking against provided io.Reader.
141 func (b *Bar) ProxyReader(r io.Reader) *Reader {
146142 proxyReader := &Reader{
147 Reader: r,
148 bar: b,
149 sbChannels: sbChannels,
143 Reader: r,
144 bar: b,
150145 }
151146 return proxyReader
152147 }
153148
154 // NumOfAppenders returns current number of append decorators
149 // NumOfAppenders returns current number of append decorators.
155150 func (b *Bar) NumOfAppenders() int {
156151 result := make(chan int)
157152 select {
162157 }
163158 }
164159
165 // NumOfPrependers returns current number of prepend decorators
160 // NumOfPrependers returns current number of prepend decorators.
166161 func (b *Bar) NumOfPrependers() int {
167162 result := make(chan int)
168163 select {
173168 }
174169 }
175170
176 // ID returs id of the bar
171 // ID returs id of the bar.
177172 func (b *Bar) ID() int {
178173 result := make(chan int)
179174 select {
208203
209204 // SetTotal sets total dynamically. The final param indicates the very last set,
210205 // in other words you should set it to true when total is determined.
206 // After final has been set, IncrBy should be called at least once.
211207 func (b *Bar) SetTotal(total int64, final bool) {
212208 b.operateState <- func(s *bState) {
213209 if total != 0 {
231227 }
232228
233229 // IncrBy increments progress bar by amount of n.
234 func (b *Bar) IncrBy(n int) {
230 // wdd is optional work duration i.e. time.Since(start),
231 // which expected to be provided, if any ewma based decorator is used.
232 func (b *Bar) IncrBy(n int, wdd ...time.Duration) {
235233 select {
236234 case b.operateState <- func(s *bState) {
237235 s.current += int64(n)
245243 s.toComplete = true
246244 }
247245 for _, ar := range s.amountReceivers {
248 ar.NextAmount(n)
246 ar.NextAmount(n, wdd...)
249247 }
250248 }:
251249 case <-b.done:
158158 //
159159 // `wcc` optional WC config
160160 //
161 // pairFormat example:
161 // pairFormat example if UnitKB is chosen:
162162 //
163163 // "%.1f / %.1f" = "1.0MB / 12.0MB" or "% .1f / % .1f" = "1.0 MB / 12.0 MB"
164164 func Counters(unit int, pairFormat string, wcc ...WC) Decorator {
11
22 import (
33 "fmt"
4 "time"
45 "unicode/utf8"
56 )
67
6364 }
6465
6566 type AmountReceiver interface {
66 NextAmount(int)
67 NextAmount(int, ...time.Duration)
6768 }
6869
6970 type ShutdownListener interface {
1414 //
1515 // `age` is the previous N samples to average over.
1616 //
17 // `sb` is a start block receive channel. It's required by MovingAverage algorithm,
18 // therefore result of time.Now() must be sent to this channel on each iteration
19 // of a start block, right before the actual job. There is no need to close the channel,
20 // as it will be closed automatically on bar completion event.
21 //
2217 // `wcc` optional WC config
23 func EwmaETA(style int, age float64, sb chan time.Time, wcc ...WC) Decorator {
24 return MovingAverageETA(style, ewma.NewMovingAverage(age), sb, wcc...)
18 func EwmaETA(style int, age float64, wcc ...WC) Decorator {
19 return MovingAverageETA(style, ewma.NewMovingAverage(age), wcc...)
2520 }
2621
2722 // MovingAverageETA decorator relies on MovingAverage implementation to calculate its average.
3025 //
3126 // `average` MovingAverage implementation
3227 //
33 // `sb` is a start block receive channel. It's required by MovingAverage algorithm,
34 // therefore result of time.Now() must be sent to this channel on each iteration
35 // of a start block, right before the actual job. There is no need to close the channel,
36 // as it will be closed automatically on bar completion event.
37 //
3828 // `wcc` optional WC config
39 func MovingAverageETA(style int, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator {
40 if sb == nil {
41 panic("start block channel must not be nil")
42 }
29 func MovingAverageETA(style int, average MovingAverage, wcc ...WC) Decorator {
4330 var wc WC
4431 for _, widthConf := range wcc {
4532 wc = widthConf
4633 }
4734 wc.BuildFormat()
4835 d := &movingAverageETA{
49 style: style,
50 wc: wc,
51 average: average,
52 sbReceiver: sb,
53 sbStreamer: make(chan time.Time),
36 style: style,
37 wc: wc,
38 average: average,
5439 }
55 go d.serve()
5640 return d
5741 }
5842
6044 style int
6145 wc WC
6246 average ewma.MovingAverage
63 sbReceiver chan time.Time
64 sbStreamer chan time.Time
6547 onComplete *struct {
6648 msg string
6749 wc WC
9779 return s.wc.FormatMsg(str, widthAccumulator, widthDistributor)
9880 }
9981
100 func (s *movingAverageETA) NextAmount(n int) {
101 sb := <-s.sbStreamer
102 lastBlockTime := time.Since(sb)
103 lastItemEstimate := float64(lastBlockTime) / float64(n)
82 func (s *movingAverageETA) NextAmount(n int, wdd ...time.Duration) {
83 var workDuration time.Duration
84 for _, wd := range wdd {
85 workDuration = wd
86 }
87 lastItemEstimate := float64(workDuration) / float64(n)
10488 s.average.Add(lastItemEstimate)
10589 }
10690
11498 msg string
11599 wc WC
116100 }{msg, wc}
117 }
118
119 func (s *movingAverageETA) Shutdown() {
120 close(s.sbReceiver)
121 }
122
123 func (s *movingAverageETA) serve() {
124 for now := range s.sbReceiver {
125 s.sbStreamer <- now
126 }
127101 }
128102
129103 // AverageETA decorator.
126126 //
127127 // `average` MovingAverage implementation
128128 //
129 // `sb` is a start block receive channel. It's required by MovingAverage algorithm,
130 // therefore result of time.Now() must be sent to this channel on each iteration
131 // of a start block, right before the actual job. There is no need to close the channel,
132 // as it will be closed automatically on bar completion event.
133 //
134129 // `wcc` optional WC config
135130 //
136 // unitFormat example if UnitKiB chosen:
131 // unitFormat example if UnitKiB is chosen:
137132 //
138133 // "%.1f" = "1.0MiB/s" or "% .1f" = "1.0 MiB/s"
139 func EwmaSpeed(unit int, unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator {
140 return MovingAverageSpeed(unit, unitFormat, ewma.NewMovingAverage(age), sb, wcc...)
134 func EwmaSpeed(unit int, unitFormat string, age float64, wcc ...WC) Decorator {
135 return MovingAverageSpeed(unit, unitFormat, ewma.NewMovingAverage(age), wcc...)
141136 }
142137
143138 // MovingAverageSpeed decorator relies on MovingAverage implementation to calculate its average.
148143 //
149144 // `average` MovingAverage implementation
150145 //
151 // `sb` is a start block receive channel. It's required by MovingAverage algorithm,
152 // therefore result of time.Now() must be sent to this channel on each iteration
153 // of a start block, right before the actual job. There is no need to close the channel,
154 // as it will be closed automatically on bar completion event.
155 //
156146 // `wcc` optional WC config
157 func MovingAverageSpeed(unit int, unitFormat string, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator {
158 if sb == nil {
159 panic("start block channel must not be nil")
160 }
147 func MovingAverageSpeed(unit int, unitFormat string, average MovingAverage, wcc ...WC) Decorator {
161148 var wc WC
162149 for _, widthConf := range wcc {
163150 wc = widthConf
168155 unitFormat: unitFormat,
169156 wc: wc,
170157 average: average,
171 sbReceiver: sb,
172 sbStreamer: make(chan time.Time),
173 }
174 go d.serve()
158 }
175159 return d
176160 }
177161
180164 unitFormat string
181165 wc WC
182166 average ewma.MovingAverage
183 sbReceiver chan time.Time
184 sbStreamer chan time.Time
185167 onComplete *struct {
186168 msg string
187169 wc WC
205187 return s.wc.FormatMsg(str, widthAccumulator, widthDistributor)
206188 }
207189
208 func (s *movingAverageSpeed) NextAmount(n int) {
209 sb := <-s.sbStreamer
210 speed := float64(n) / time.Since(sb).Seconds()
190 func (s *movingAverageSpeed) NextAmount(n int, wdd ...time.Duration) {
191 var workDuration time.Duration
192 for _, wd := range wdd {
193 workDuration = wd
194 }
195 speed := float64(n) / workDuration.Seconds()
211196 s.average.Add(speed)
212197 }
213198
223208 }{msg, wc}
224209 }
225210
226 func (s *movingAverageSpeed) Shutdown() {
227 close(s.sbReceiver)
228 }
229
230 func (s *movingAverageSpeed) serve() {
231 for now := range s.sbReceiver {
232 s.sbStreamer <- now
233 }
234 }
235
236211 // AverageSpeed decorator with dynamic unit measure adjustment.
237212 //
238213 // `unit` one of [0|UnitKiB|UnitKB] zero for no unit
241216 //
242217 // `wcc` optional WC config
243218 //
244 // unitFormat example if UnitKiB chosen:
219 // unitFormat example if UnitKiB is chosen:
245220 //
246221 // "%.1f" = "1.0MiB/s" or "% .1f" = "1.0 MiB/s"
247222 func AverageSpeed(unit int, unitFormat string, wcc ...WC) Decorator {
2222
2323 total := 100
2424 name := "Single Bar:"
25 sbEta := make(chan time.Time)
2625 // adding a single bar
2726 bar := p.AddBar(int64(total),
2827 mpb.PrependDecorators(
3130 // replace ETA decorator with "done" message, OnComplete event
3231 decor.OnComplete(
3332 // ETA decorator with ewma age of 60, and width reservation of 4
34 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 4}), "done",
33 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 4}), "done",
3534 ),
3635 ),
3736 mpb.AppendDecorators(decor.Percentage()),
3938 // simulating some work
4039 max := 100 * time.Millisecond
4140 for i := 0; i < total; i++ {
42 // update start block time, required for ETA calculation
43 sbEta <- time.Now()
41 start := time.Now()
4442 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
45 // increment by 1 (there is bar.IncrBy(int) method, if needed)
46 bar.Increment()
43 // ewma based decorators require work duration measurement
44 bar.IncrBy(1, time.Since(start))
4745 }
4846 // wait for our bar to complete and flush
4947 p.Wait()
3131
3232 for i := 0; i < numBars; i++ {
3333 name := fmt.Sprintf("Bar#%d:", i)
34 sbEta := make(chan time.Time)
3534 bar := p.AddBar(int64(total),
3635 mpb.PrependDecorators(
3736 decor.Name(name),
38 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WCSyncSpace),
37 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace),
3938 ),
4039 mpb.AppendDecorators(
4140 decor.Percentage(decor.WC{W: 5}),
4645 defer wg.Done()
4746 max := 100 * time.Millisecond
4847 for !bar.Completed() {
49 sbEta <- time.Now()
48 start := time.Now()
5049 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
51 bar.Increment()
50 // ewma based decorators require work duration measurement
51 bar.IncrBy(1, time.Since(start))
5252 }
5353 }()
5454 }
3535 ),
3636 mpb.AppendDecorators(decor.Percentage(decor.WC{W: 5})),
3737 )
38 go newTask(wg, b, i+1, nil)
38 go newTask(wg, b, i+1)
3939 bars = append(bars, b)
4040 }
4141
4343 doneWg.Add(1)
4444 i := i
4545 go func() {
46 sbEta := make(chan time.Time)
4746 task := fmt.Sprintf("Task#%02d:", i)
4847 job := "installing"
4948 // preparing delayed bars
5453 decor.Name(task, decor.WC{W: len(task) + 1, C: decor.DidentRight}),
5554 decor.OnComplete(decor.Name(job, decor.WCSyncSpaceR), "done!", decor.WCSyncSpaceR),
5655 decor.OnComplete(
57 decor.EwmaETA(decor.ET_STYLE_MMSS, 60, sbEta, decor.WCSyncWidth), "", decor.WCSyncSpace,
56 decor.EwmaETA(decor.ET_STYLE_MMSS, 60, decor.WCSyncWidth), "", decor.WCSyncSpace,
5857 ),
5958 ),
6059 mpb.AppendDecorators(
6362 )
6463 // waiting for download to complete, before starting install job
6564 downloadWgg[i].Wait()
66 go newTask(doneWg, b, numBars-i, sbEta)
65 go newTask(doneWg, b, numBars-i)
6766 }()
6867 }
6968
7069 p.Wait()
7170 }
7271
73 func newTask(wg *sync.WaitGroup, b *mpb.Bar, incrBy int, sbCh chan<- time.Time) {
72 func newTask(wg *sync.WaitGroup, b *mpb.Bar, incrBy int) {
7473 defer wg.Done()
7574 max := 100 * time.Millisecond
7675 for !b.Completed() {
77 if sbCh != nil {
78 sbCh <- time.Now()
79 }
76 start := time.Now()
8077 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
81 b.IncrBy(incrBy)
78 // ewma based decorators require work duration measurement
79 b.IncrBy(incrBy, time.Since(start))
8280 }
8381 }
77 "os"
88 "path/filepath"
99 "sync"
10 "time"
1110
1211 "github.com/vbauerster/mpb"
1312 "github.com/vbauerster/mpb/decor"
5756 return
5857 }
5958
60 sbEta := make(chan time.Time)
6159 // create bar with appropriate decorators
6260 bar := p.AddBar(size, mpb.BarPriority(n),
6361 mpb.PrependDecorators(
6563 decor.CountersKibiByte("%6.1f / %6.1f", decor.WCSyncWidth),
6664 ),
6765 mpb.AppendDecorators(
68 decor.EwmaETA(decor.ET_STYLE_HHMMSS, 1024*4, sbEta, decor.WCSyncWidth),
66 decor.EwmaETA(decor.ET_STYLE_HHMMSS, 1024*4, decor.WCSyncWidth),
6967 decor.AverageSpeed(decor.UnitKiB, "% .2f"),
7068 ),
7169 )
7270
7371 // create proxy reader
74 reader := bar.ProxyReader(resp.Body, sbEta)
72 reader := bar.ProxyReader(resp.Body)
7573 // and copy from reader
7674 _, err = io.Copy(dest, reader)
7775
4242 mpb.WithRefreshRate(180*time.Millisecond),
4343 )
4444
45 sbEta := make(chan time.Time)
4645 bar := p.AddBar(size,
4746 mpb.PrependDecorators(
4847 decor.CountersKibiByte("% 6.1f / % 6.1f"),
4948 ),
5049 mpb.AppendDecorators(
51 decor.EwmaETA(decor.ET_STYLE_MMSS, 1024*8, sbEta),
50 decor.EwmaETA(decor.ET_STYLE_MMSS, 1024*8),
5251 decor.Name(" ] "),
5352 decor.AverageSpeed(decor.UnitKiB, "% .2f"),
5453 ),
5554 )
5655
5756 // create proxy reader
58 reader := bar.ProxyReader(resp.Body, sbEta)
57 reader := bar.ProxyReader(resp.Body)
5958
6059 // and copy from reader, ignoring errors
6160 io.Copy(dest, reader)
2828 bOption = mpb.BarRemoveOnComplete()
2929 }
3030
31 sbEta := make(chan time.Time)
3231 b := p.AddBar(int64(total), mpb.BarID(i),
3332 bOption,
3433 mpb.PrependDecorators(
3534 decor.Name(name),
36 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WCSyncSpace),
35 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace),
3736 ),
3837 mpb.AppendDecorators(decor.Percentage()),
3938 )
4140 defer wg.Done()
4241 max := 100 * time.Millisecond
4342 for i := 0; i < total; i++ {
44 sbEta <- time.Now()
43 start := time.Now()
4544 if b.ID() == 2 && i == 42 {
4645 p.Abort(b)
4746 return
4847 }
4948 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
50 b.Increment()
49 // ewma based decorators require work duration measurement
50 b.IncrBy(1, time.Since(start))
5151 }
5252 }()
5353 }
2121
2222 for i := 0; i < numBars; i++ {
2323 name := fmt.Sprintf("Bar#%d:", i)
24 sbEta := make(chan time.Time)
2524 bar := p.AddBar(int64(total),
2625 mpb.PrependDecorators(
2726 // simple name decorator
3332 // replace ETA decorator with "done" message, OnComplete event
3433 decor.OnComplete(
3534 // ETA decorator with ewma age of 60
36 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta), "done",
35 decor.EwmaETA(decor.ET_STYLE_GO, 60), "done",
3736 ),
3837 ),
3938 )
4241 defer wg.Done()
4342 max := 100 * time.Millisecond
4443 for i := 0; i < total; i++ {
45 sbEta <- time.Now()
44 start := time.Now()
4645 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
47 bar.Increment()
46 // ewma based decorators require work duration measurement
47 bar.IncrBy(1, time.Since(start))
4848 }
4949 }()
5050 }
1919
2020 total := 100
2121 name := "Single Bar:"
22 sbEta := make(chan time.Time)
2322 // adding a single bar
2423 bar := p.AddBar(int64(total),
2524 mpb.PrependDecorators(
2827 // replace ETA decorator with "done" message, OnComplete event
2928 decor.OnComplete(
3029 // ETA decorator with ewma age of 60, and width reservation of 4
31 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 4}), "done",
30 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 4}), "done",
3231 ),
3332 ),
3433 mpb.AppendDecorators(decor.Percentage()),
3635 // simulating some work
3736 max := 100 * time.Millisecond
3837 for i := 0; i < total; i++ {
39 // update start block time, required for ETA calculation
40 sbEta <- time.Now()
38 start := time.Now()
4139 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
42 // increment by 1 (there is bar.IncrBy(int) method, if needed)
43 bar.Increment()
40 // ewma based decorators require work duration measurement
41 bar.IncrBy(1, time.Since(start))
4442 }
4543 // wait for our bar to complete and flush
4644 p.Wait()
2525 if i != 1 {
2626 name = fmt.Sprintf("Bar#%d:", i)
2727 }
28 sbEta := make(chan time.Time)
2928 b := p.AddBar(int64(total),
3029 mpb.PrependDecorators(
3130 decor.Name(name, decor.WCSyncWidth),
3231 decor.CountersNoUnit("%d / %d", decor.WCSyncSpace),
3332 ),
3433 mpb.AppendDecorators(
35 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WC{W: 3}),
34 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WC{W: 3}),
3635 ),
3736 )
3837 go func() {
3938 defer wg.Done()
4039 max := 100 * time.Millisecond
4140 for i := 0; i < total; i++ {
42 sbEta <- time.Now()
41 start := time.Now()
4342 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
4443 if i&1 == 1 {
4544 priority := total - int(b.Current())
4645 p.UpdateBarPriority(b, priority)
4746 }
48 b.Increment()
47 // ewma based decorators require work duration measurement
48 b.IncrBy(1, time.Since(start))
4949 }
5050 }()
5151 }
1818 }
1919
2020 func main() {
21
2221 var wg sync.WaitGroup
2322 p := mpb.New(mpb.WithWaitGroup(&wg))
2423 wg.Add(totalBars)
2625 for i := 0; i < totalBars; i++ {
2726 name := fmt.Sprintf("Bar#%02d: ", i)
2827 total := rand.Intn(320) + 10
29 sbEta := make(chan time.Time)
3028 bar := p.AddBar(int64(total),
3129 mpb.PrependDecorators(
3230 decor.Name(name),
33 decor.EwmaETA(decor.ET_STYLE_GO, 60, sbEta, decor.WCSyncSpace),
31 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace),
3432 ),
3533 mpb.AppendDecorators(
3634 decor.Percentage(decor.WC{W: 5}),
4139 defer wg.Done()
4240 max := 100 * time.Millisecond
4341 for !bar.Completed() {
44 sbEta <- time.Now()
42 start := time.Now()
4543 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
46 bar.Increment()
44 // ewma based decorators require work duration measurement
45 bar.IncrBy(1, time.Since(start))
4746 }
4847 }()
4948 }
77 // Reader is io.Reader wrapper, for proxy read bytes
88 type Reader struct {
99 io.Reader
10 bar *Bar
11 sbChannels []chan<- time.Time
10 bar *Bar
1211 }
1312
1413 func (r *Reader) Read(p []byte) (int, error) {
15 select {
16 case <-r.bar.done:
17 default:
18 for _, ch := range r.sbChannels {
19 ch <- time.Now()
20 }
21 }
14 start := time.Now()
2215 n, err := r.Reader.Read(p)
23 r.bar.IncrBy(n)
16 r.bar.IncrBy(n, time.Since(start))
2417 return n, err
2518 }
2619