Codebase list golang-github-vbauerster-mpb / d4d9499
address issue #117 Vladimir Bauer 3 years ago
3 changed file(s) with 46 addition(s) and 45 deletion(s). Raw diff Collapse all Expand all
3434 total int64
3535 current int64
3636 refill int64
37 lastIncrement int64
3837 trimSpace bool
3938 completed bool
4039 aborted bool
224223 func (b *Bar) SetCurrent(current int64) {
225224 select {
226225 case b.operateState <- func(s *bState) {
227 s.lastIncrement = current - s.current
228226 s.current = current
229227 if s.triggerComplete && s.current >= s.total {
230228 s.current = s.total
253251 }
254252 select {
255253 case b.operateState <- func(s *bState) {
256 s.lastIncrement = n
257254 s.current += n
258255 if s.triggerComplete && s.current >= s.total {
259256 s.current = s.total
265262 }
266263 }
267264
268 // DecoratorEwmaUpdate updates all EWMA based decorators. Should be
269 // called on each iteration, because EWMA's unit of measure is an
270 // iteration's duration. Panics if called before *Bar.Incr... family
271 // methods.
272 func (b *Bar) DecoratorEwmaUpdate(dur time.Duration) {
273 select {
274 case b.operateState <- func(s *bState) {
275 if s.lastIncrement > 0 {
276 s.decoratorEwmaUpdate(dur)
277 s.lastIncrement = 0
278 } else {
279 panic("increment required before ewma iteration update")
280 }
281 }:
282 case <-b.done:
283 if b.bs.lastIncrement > 0 {
284 b.bs.decoratorEwmaUpdate(dur)
285 b.bs.lastIncrement = 0
286 }
265 // EwmaIncrement is a shorthand for b.EwmaIncrInt64(1, dur).
266 func (b *Bar) EwmaIncrement(dur time.Duration) {
267 b.EwmaIncrInt64(1, dur)
268 }
269
270 // EwmaIncrBy is a shorthand for b.EwmaIncrInt64(int64(n), dur).
271 func (b *Bar) EwmaIncrBy(n int, dur time.Duration) {
272 b.EwmaIncrInt64(int64(n), dur)
273 }
274
275 // EwmaIncrInt64 increments progress by amount of n and updates EWMA based
276 // decorators by dur of a single iteration.
277 func (b *Bar) EwmaIncrInt64(n int64, dur time.Duration) {
278 if n <= 0 {
279 return
280 }
281 select {
282 case b.operateState <- func(s *bState) {
283 s.current += n
284 s.ewmaUpdate(n, dur)
285 if s.triggerComplete && s.current >= s.total {
286 s.current = s.total
287 s.completed = true
288 go b.forceRefresh(s.manualRefresh)
289 }
290 }:
291 case <-b.done:
287292 }
288293 }
289294
564569 }
565570 }
566571
567 func (s bState) decoratorEwmaUpdate(dur time.Duration) {
572 func (s bState) ewmaUpdate(n int64, dur time.Duration) {
568573 var wg sync.WaitGroup
569574 for i := 0; i < len(s.ewmaDecorators); i++ {
570575 switch d := s.ewmaDecorators[i]; i {
571576 case len(s.ewmaDecorators) - 1:
572 d.EwmaUpdate(s.lastIncrement, dur)
577 d.EwmaUpdate(n, dur)
573578 default:
574579 wg.Add(1)
575580 go func() {
576 d.EwmaUpdate(s.lastIncrement, dur)
581 d.EwmaUpdate(n, dur)
577582 wg.Done()
578583 }()
579584 }
2626 }
2727
2828 type ewmaProxyReader struct {
29 proxyReader
29 io.ReadCloser
30 bar *Bar
3031 }
3132
3233 func (x ewmaProxyReader) Read(p []byte) (int, error) {
3334 start := time.Now()
34 n, err := x.proxyReader.Read(p)
35 if n > 0 {
36 x.bar.DecoratorEwmaUpdate(time.Since(start))
37 }
35 n, err := x.ReadCloser.Read(p)
36 x.bar.EwmaIncrBy(n, time.Since(start))
3837 return n, err
3938 }
4039
4544 func (x ewmaProxyWriterTo) WriteTo(w io.Writer) (int64, error) {
4645 start := time.Now()
4746 n, err := x.ReadCloser.(io.WriterTo).WriteTo(w)
48 if n > 0 {
49 x.bar.DecoratorEwmaUpdate(time.Since(start))
50 }
47 x.bar.EwmaIncrInt64(n, time.Since(start))
5148 return n, err
5249 }
5350
5451 func newProxyReader(r io.Reader, b *Bar, hasEwma bool) io.ReadCloser {
55 pr := proxyReader{toReadCloser(r), b}
52 rc := toReadCloser(r)
5653 if hasEwma {
57 epr := ewmaProxyReader{pr}
54 epr := ewmaProxyReader{rc, b}
5855 if _, ok := r.(io.WriterTo); ok {
5956 return ewmaProxyWriterTo{epr}
6057 }
6158 return epr
6259 }
60 pr := proxyReader{rc, b}
6361 if _, ok := r.(io.WriterTo); ok {
6462 return proxyWriterTo{pr}
6563 }
2626 }
2727
2828 type ewmaProxyWriter struct {
29 proxyWriter
29 io.WriteCloser
30 bar *Bar
3031 }
3132
3233 func (x ewmaProxyWriter) Write(p []byte) (int, error) {
3334 start := time.Now()
34 n, err := x.proxyWriter.Write(p)
35 if n > 0 {
36 x.bar.DecoratorEwmaUpdate(time.Since(start))
37 }
35 n, err := x.WriteCloser.Write(p)
36 x.bar.EwmaIncrBy(n, time.Since(start))
3837 return n, err
3938 }
4039
4544 func (x ewmaProxyReaderFrom) ReadFrom(r io.Reader) (int64, error) {
4645 start := time.Now()
4746 n, err := x.WriteCloser.(io.ReaderFrom).ReadFrom(r)
48 if n > 0 {
49 x.bar.DecoratorEwmaUpdate(time.Since(start))
50 }
47 x.bar.EwmaIncrInt64(n, time.Since(start))
5148 return n, err
5249 }
5350
5451 func newProxyWriter(w io.Writer, b *Bar, hasEwma bool) io.WriteCloser {
55 pw := proxyWriter{toWriteCloser(w), b}
52 wc := toWriteCloser(w)
5653 if hasEwma {
57 epw := ewmaProxyWriter{pw}
54 epw := ewmaProxyWriter{wc, b}
5855 if _, ok := w.(io.ReaderFrom); ok {
5956 return ewmaProxyReaderFrom{epw}
6057 }
6158 return epw
6259 }
60 pw := proxyWriter{wc, b}
6361 if _, ok := w.(io.ReaderFrom); ok {
6462 return proxyReaderFrom{pw}
6563 }