Codebase list golang-github-vbauerster-mpb / 0fcdb29
proxyReader: don't call ewma method if there are no ewma decorators Vladimir Bauer 6 years ago
2 changed file(s) with 80 addition(s) and 48 deletion(s). Raw diff Collapse all Expand all
44 "context"
55 "fmt"
66 "io"
7 "io/ioutil"
87 "log"
98 "strings"
109 "time"
3332 priority int // used by heap
3433 index int // used by heap
3534
36 extendedLines int
37 toShutdown bool
38 toDrop bool
39 noPop bool
40 hasAverageDecorators bool
41 hasEwmaDecorators bool
42 operateState chan func(*bState)
43 frameCh chan io.Reader
44 syncTableCh chan [][]chan int
45 completed chan bool
35 extendedLines int
36 toShutdown bool
37 toDrop bool
38 noPop bool
39 hasEwmaDecorators bool
40 operateState chan func(*bState)
41 frameCh chan io.Reader
42 syncTableCh chan [][]chan int
43 completed chan bool
4644
4745 // cancel is called either by user or on complete event
4846 cancel func()
112110 }
113111
114112 // ProxyReader wraps r with metrics required for progress tracking.
113 // Panics if r is nil.
115114 func (b *Bar) ProxyReader(r io.Reader) io.ReadCloser {
116115 if r == nil {
117 return nil
118 }
119 rc, ok := r.(io.ReadCloser)
120 if !ok {
121 rc = ioutil.NopCloser(r)
122 }
123 prox := &proxyReader{rc, b, time.Now()}
124 if wt, ok := r.(io.WriterTo); ok {
125 return &proxyWriterTo{prox, wt}
126 }
127 return prox
116 panic("expected non nil io.Reader")
117 }
118 return newProxyReader(r, b)
128119 }
129120
130121 // ID returs id of the bar.
247238 // iteration's duration. Panics if called before *Bar.Incr... family
248239 // methods.
249240 func (b *Bar) DecoratorEwmaUpdate(dur time.Duration) {
250 if !b.hasEwmaDecorators {
251 return
252 }
253241 select {
254242 case b.operateState <- func(s *bState) {
255243 ewmaIterationUpdate(false, s, dur)
263251 // if you need to adjust start time of all average based decorators
264252 // or after progress resume.
265253 func (b *Bar) DecoratorAverageAdjust(start time.Time) {
266 if !b.hasAverageDecorators {
267 return
268 }
269254 select {
270255 case b.operateState <- func(s *bState) {
271256 for _, d := range s.averageDecorators {
388373 s.ewmaDecorators = ewmaDecorators
389374 s.shutdownListeners = shutdownListeners
390375 }
391 b.hasAverageDecorators = len(averageDecorators) != 0
392376 b.hasEwmaDecorators = len(ewmaDecorators) != 0
393377 }
394378
11
22 import (
33 "io"
4 "io/ioutil"
45 "time"
56 )
67
78 type proxyReader struct {
89 io.ReadCloser
910 bar *Bar
10 iT time.Time
1111 }
1212
13 func (prox *proxyReader) Read(p []byte) (int, error) {
14 n, err := prox.ReadCloser.Read(p)
15 if n > 0 {
16 prox.bar.IncrBy(n)
17 prox.bar.DecoratorEwmaUpdate(time.Since(prox.iT))
18 prox.iT = time.Now()
19 }
13 func (x *proxyReader) Read(p []byte) (int, error) {
14 n, err := x.ReadCloser.Read(p)
15 x.bar.IncrBy(n)
2016 if err == io.EOF {
21 go prox.bar.SetTotal(0, true)
17 go x.bar.SetTotal(0, true)
2218 }
2319 return n, err
2420 }
2521
2622 type proxyWriterTo struct {
27 *proxyReader
28 wt io.WriterTo
23 io.ReadCloser // *proxyReader
24 wt io.WriterTo
25 bar *Bar
2926 }
3027
31 func (prox *proxyWriterTo) WriteTo(w io.Writer) (int64, error) {
32 n, err := prox.wt.WriteTo(w)
33 if n > 0 {
34 prox.bar.IncrInt64(n)
35 prox.bar.DecoratorEwmaUpdate(time.Since(prox.iT))
36 prox.iT = time.Now()
37 }
28 func (x *proxyWriterTo) WriteTo(w io.Writer) (int64, error) {
29 n, err := x.wt.WriteTo(w)
30 x.bar.IncrInt64(n)
3831 if err == io.EOF {
39 go prox.bar.SetTotal(0, true)
32 go x.bar.SetTotal(0, true)
4033 }
4134 return n, err
4235 }
36
37 type ewmaProxyReader struct {
38 io.ReadCloser // *proxyReader
39 bar *Bar
40 iT time.Time
41 }
42
43 func (x *ewmaProxyReader) Read(p []byte) (int, error) {
44 n, err := x.ReadCloser.Read(p)
45 if n > 0 {
46 x.bar.DecoratorEwmaUpdate(time.Since(x.iT))
47 x.iT = time.Now()
48 }
49 return n, err
50 }
51
52 type ewmaProxyWriterTo struct {
53 io.ReadCloser // *ewmaProxyReader
54 wt io.WriterTo // *proxyWriterTo
55 bar *Bar
56 iT time.Time
57 }
58
59 func (x *ewmaProxyWriterTo) WriteTo(w io.Writer) (int64, error) {
60 n, err := x.wt.WriteTo(w)
61 if n > 0 {
62 x.bar.DecoratorEwmaUpdate(time.Since(x.iT))
63 x.iT = time.Now()
64 }
65 return n, err
66 }
67
68 func newProxyReader(r io.Reader, bar *Bar) io.ReadCloser {
69 wt, isWriterTo := r.(io.WriterTo)
70 rc := toReadCloser(r)
71 rc = &proxyReader{rc, bar}
72
73 if bar.hasEwmaDecorators {
74 now := time.Now()
75 rc = &ewmaProxyReader{rc, bar, now}
76 if isWriterTo {
77 rc = &ewmaProxyWriterTo{rc, wt, bar, now}
78 }
79 } else if isWriterTo {
80 rc = &proxyWriterTo{rc, wt, bar}
81 }
82 return rc
83 }
84
85 func toReadCloser(r io.Reader) io.ReadCloser {
86 if rc, ok := r.(io.ReadCloser); ok {
87 return rc
88 }
89 return ioutil.NopCloser(r)
90 }