Codebase list golang-github-vbauerster-mpb / e4a3ff2
pwroxywriter address issue #116 Vladimir Bauer 3 years ago
3 changed file(s) with 234 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
9696 select {
9797 case b.operateState <- func(s *bState) { result <- len(s.ewmaDecorators) != 0 }:
9898 return newProxyReader(r, b, <-result)
99 case <-b.done:
100 return nil
101 }
102 }
103
104 // ProxyWriter wraps io.Writer with metrics required for progress tracking.
105 // If bar is already completed or aborted, returns nil.
106 // Panics if `w` is nil.
107 func (b *Bar) ProxyWriter(w io.Writer) io.WriteCloser {
108 if w == nil {
109 panic("expected non nil io.Writer")
110 }
111 result := make(chan bool)
112 select {
113 case b.operateState <- func(s *bState) { result <- len(s.ewmaDecorators) != 0 }:
114 return newProxyWriter(w, b, <-result)
99115 case <-b.done:
100116 return nil
101117 }
0 package mpb
1
2 import (
3 "io"
4 "time"
5 )
6
7 type proxyWriter struct {
8 io.WriteCloser
9 bar *Bar
10 }
11
12 func (x proxyWriter) Write(p []byte) (int, error) {
13 n, err := x.WriteCloser.Write(p)
14 x.bar.IncrBy(n)
15 return n, err
16 }
17
18 type proxyReaderFrom struct {
19 proxyWriter
20 }
21
22 func (x proxyReaderFrom) ReadFrom(r io.Reader) (int64, error) {
23 n, err := x.WriteCloser.(io.ReaderFrom).ReadFrom(r)
24 x.bar.IncrInt64(n)
25 return n, err
26 }
27
28 type ewmaProxyWriter struct {
29 proxyWriter
30 }
31
32 func (x ewmaProxyWriter) Write(p []byte) (int, error) {
33 start := time.Now()
34 n, err := x.proxyWriter.Write(p)
35 if n > 0 {
36 x.bar.DecoratorEwmaUpdate(time.Since(start))
37 }
38 return n, err
39 }
40
41 type ewmaProxyReaderFrom struct {
42 ewmaProxyWriter
43 }
44
45 func (x ewmaProxyReaderFrom) ReadFrom(r io.Reader) (int64, error) {
46 start := time.Now()
47 n, err := x.WriteCloser.(io.ReaderFrom).ReadFrom(r)
48 if n > 0 {
49 x.bar.DecoratorEwmaUpdate(time.Since(start))
50 }
51 return n, err
52 }
53
54 func newProxyWriter(w io.Writer, b *Bar, hasEwma bool) io.WriteCloser {
55 pw := proxyWriter{toWriteCloser(w), b}
56 if hasEwma {
57 epw := ewmaProxyWriter{pw}
58 if _, ok := w.(io.ReaderFrom); ok {
59 return ewmaProxyReaderFrom{epw}
60 }
61 return epw
62 }
63 if _, ok := w.(io.ReaderFrom); ok {
64 return proxyReaderFrom{pw}
65 }
66 return pw
67 }
68
69 func toWriteCloser(w io.Writer) io.WriteCloser {
70 if wc, ok := w.(io.WriteCloser); ok {
71 return wc
72 }
73 return toNopWriteCloser(w)
74 }
75
76 func toNopWriteCloser(w io.Writer) io.WriteCloser {
77 if _, ok := w.(io.ReaderFrom); ok {
78 return nopWriteCloserReaderFrom{w}
79 }
80 return nopWriteCloser{w}
81 }
82
83 type nopWriteCloser struct {
84 io.Writer
85 }
86
87 func (nopWriteCloser) Close() error { return nil }
88
89 type nopWriteCloserReaderFrom struct {
90 io.Writer
91 }
92
93 func (nopWriteCloserReaderFrom) Close() error { return nil }
94
95 func (c nopWriteCloserReaderFrom) ReadFrom(r io.Reader) (int64, error) {
96 return c.Writer.(io.ReaderFrom).ReadFrom(r)
97 }
0 package mpb_test
1
2 import (
3 "bytes"
4 "io"
5 "strings"
6 "testing"
7
8 "github.com/vbauerster/mpb/v8"
9 )
10
11 type testWriter struct {
12 io.Writer
13 called bool
14 }
15
16 func (w *testWriter) Write(p []byte) (n int, err error) {
17 w.called = true
18 return w.Writer.Write(p)
19 }
20
21 func TestProxyWriter(t *testing.T) {
22 p := mpb.New(mpb.WithOutput(io.Discard))
23
24 var buf bytes.Buffer
25 tw := &testWriter{&buf, false}
26
27 bar := p.AddBar(int64(len(content)))
28
29 _, err := io.Copy(bar.ProxyWriter(tw), strings.NewReader(content))
30 if err != nil {
31 t.Errorf("io.Copy: %s\n", err.Error())
32 }
33
34 p.Wait()
35
36 if !tw.called {
37 t.Error("Read not called")
38 }
39
40 if got := buf.String(); got != content {
41 t.Errorf("Expected content: %s, got: %s\n", content, got)
42 }
43 }
44
45 type testWriteCloser struct {
46 io.Writer
47 called bool
48 }
49
50 func (w *testWriteCloser) Close() error {
51 w.called = true
52 return nil
53 }
54
55 func TestProxyWriteCloser(t *testing.T) {
56 p := mpb.New(mpb.WithOutput(io.Discard))
57
58 var buf bytes.Buffer
59 tw := &testWriteCloser{&buf, false}
60
61 bar := p.AddBar(int64(len(content)))
62
63 wc := bar.ProxyWriter(tw)
64 _, err := io.Copy(wc, strings.NewReader(content))
65 if err != nil {
66 t.Errorf("io.Copy: %s\n", err.Error())
67 }
68 _ = wc.Close()
69
70 p.Wait()
71
72 if !tw.called {
73 t.Error("Close not called")
74 }
75 }
76
77 type testWriterReadFrom struct {
78 io.Writer
79 called bool
80 }
81
82 func (w *testWriterReadFrom) ReadFrom(r io.Reader) (n int64, err error) {
83 w.called = true
84 return w.Writer.(io.ReaderFrom).ReadFrom(r)
85 }
86
87 type dumbReader struct {
88 r *strings.Reader
89 }
90
91 func (r dumbReader) Read(p []byte) (int, error) {
92 return r.r.Read(p)
93 }
94
95 func TestProxyWriterReadFrom(t *testing.T) {
96 p := mpb.New(mpb.WithOutput(io.Discard))
97
98 var buf bytes.Buffer
99 tw := &testWriterReadFrom{&buf, false}
100
101 bar := p.New(int64(len(content)), mpb.NopStyle())
102
103 // To trigger ReadFrom, WriteTo needs to be hidden, hence a dumb wrapper
104 dr := dumbReader{strings.NewReader(content)}
105 _, err := io.Copy(bar.ProxyWriter(tw), dr)
106 if err != nil {
107 t.Errorf("io.Copy: %s\n", err.Error())
108 }
109
110 p.Wait()
111
112 if !tw.called {
113 t.Error("ReadFrom not called")
114 }
115
116 if got := buf.String(); got != content {
117 t.Errorf("Expected content: %s, got: %s\n", content, got)
118 }
119 }