Codebase list golang-github-vbauerster-mpb / 44b3ab5
Refactoring: Progress.Stop() to Progress.Wait() Vladimir Bauer 8 years ago
25 changed file(s) with 122 addition(s) and 114 deletion(s). Raw diff Collapse all Expand all
6363 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
6464 bar.Increment()
6565 }
66 // Gracefully shutdown mpb's monitor goroutine
67 p.Stop()
66 // Wait for all bars to complete
67 p.Wait()
6868 ```
6969
7070 #### [Rendering multiple bars](examples/simple/main.go)
9898 }
9999 }()
100100 }
101 // Gracefully shutdown mpb's monitor goroutine
102 p.Stop()
101 // Wait for all bars to complete
102 p.Wait()
103103 ```
104104
105105 #### [Dynamic Total](examples/dynTotal/main.go)
44 "fmt"
55 "io"
66 "strings"
7 "sync"
87 "time"
98 "unicode/utf8"
109
3837 done chan struct{}
3938 shutdown chan struct{}
4039
41 // cacheState is used after done is closed
40 // it's guaranted that cacheState isn't nil, after done channel is closed
4241 cacheState *bState
4342 }
4443
7877 }
7978 )
8079
81 func newBar(id int, total int64, wg *sync.WaitGroup, cancel <-chan struct{}, options ...BarOption) *Bar {
80 func newBar(id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar {
8281 if total <= 0 {
8382 total = time.Now().Unix()
8483 }
104103 shutdown: make(chan struct{}),
105104 }
106105
107 go b.serve(s, wg, cancel)
106 go b.serve(s, cancel)
108107 return b
109108 }
110109
289288 }
290289 }
291290
292 func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) {
291 func (b *Bar) serve(s *bState, cancel <-chan struct{}) {
293292 for {
294293 select {
295294 case op := <-b.operateState:
300299 case <-b.shutdown:
301300 b.cacheState = s
302301 close(b.done)
303 wg.Done()
304302 return
305303 }
306304 }
3636 bar.Increment()
3737 }
3838
39 p.Stop()
39 p.Wait()
4040
4141 gotWidth := utf8.RuneCount(buf.Bytes())
4242 if gotWidth != tc.expected {
5959 count++
6060 }
6161
62 p.Stop()
62 p.Wait()
6363 if count != total {
6464 t.Errorf("got count: %d, expected %d\n", count, total)
6565 }
8080 }(bars[i])
8181 }
8282
83 p.Stop()
83 p.Wait()
8484 for wantID, bar := range bars {
8585 gotID := bar.ID()
8686 if gotID != wantID {
111111 time.Sleep(10 * time.Millisecond)
112112 }
113113
114 p.Stop()
114 p.Wait()
115115
116116 wantBar := fmt.Sprintf("[%s%s]",
117117 strings.Repeat(string(refillChar), till-1),
151151 }()
152152 }
153153
154 p.Stop()
154 p.Wait()
155155
156156 wantPanic = fmt.Sprintf("b#%02d panic: %v", 2, wantPanic)
157157
1010 for i := 0; i < total; i++ {
1111 bar.Increment()
1212 }
13 p.Stop()
13 p.Wait()
1414 }
1515
1616 func BenchmarkSingleBar100(b *testing.B) {
3737 for i := 0; i < b.N; i++ {
3838 bar.Increment()
3939 }
40 p.Stop()
40 p.Wait()
4141 }
4242 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
4343 bar.Increment()
4444 }
45
46 p.Stop()
45 // Wait for all bars to complete
46 p.Wait()
4747 }
4848
4949 func ExampleBar_Completed() {
5656 bar.Increment()
5757 }
5858
59 p.Stop()
59 p.Wait()
6060 }
5151 }()
5252 }
5353
54 p.Stop()
55 fmt.Println("stop")
54 p.Wait()
55 fmt.Println("done")
5656 }
4848 }()
4949 }
5050
51 p.Stop()
52 fmt.Println("stop")
51 p.Wait()
52 fmt.Println("done")
5353 }
5151 bar.Increment()
5252 }
5353
54 p.Stop()
54 p.Wait()
5555 }
2727 go download(&wg, p, name, url, i)
2828 }
2929
30 p.Stop()
31 fmt.Println("Finished")
30 p.Wait()
31 fmt.Println("done")
3232 }
3333
3434 func download(wg *sync.WaitGroup, p *mpb.Progress, name, url string, n int) {
4848 // and copy from reader, ignoring errors
4949 io.Copy(dest, reader)
5050
51 p.Stop() // if you omit this line, rendering bars goroutine will quit early
52 fmt.Println("Finished")
51 p.Wait() // if you omit this line, rendering bars goroutine will quit early
52 fmt.Println("done")
5353 }
3838 }()
3939 }
4040
41 p.Stop()
41 p.Wait()
4242 }
4444 }()
4545 }
4646
47 p.Stop()
48 fmt.Println("stop")
47 p.Wait()
48 fmt.Println("done")
4949 }
4545 }()
4646 }
4747
48 p.Stop()
49 fmt.Println("stop")
48 p.Wait()
49 fmt.Println("done")
5050 }
4848 }()
4949 }
5050
51 p.Stop()
52 fmt.Println("stop")
51 p.Wait()
52 fmt.Println("done")
5353 }
4343 }
4444 }()
4545 }
46 // Gracefully shutdown mpb's monitor goroutine
47 p.Stop()
46 // Wait for all bars to complete
47 p.Wait()
4848 }
4242 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
4343 bar.Increment()
4444 }
45 // Gracefully shutdown mpb's monitor goroutine
46 p.Stop()
45 // Wait for all bars to complete
46 p.Wait()
4747 }
4848 }()
4949 }
5050
51 p.Stop()
52 fmt.Println("stop")
51 p.Wait()
52 fmt.Println("done")
5353 }
4646 }()
4747 }
4848
49 p.Stop()
50 fmt.Println("stop")
49 p.Wait()
50 fmt.Println("done")
5151 }
1414 type ProgressOption func(*pState)
1515
1616 // WithWaitGroup provides means to have a single joint point.
17 // If *sync.WaitGroup is provided, you can safely call just p.Stop()
17 // If *sync.WaitGroup is provided, you can safely call just p.Wait()
1818 // without calling Wait() on provided *sync.WaitGroup.
1919 // Makes sense when there are more than one bar to render.
2020 func WithWaitGroup(wg *sync.WaitGroup) ProgressOption {
2121 return func(s *pState) {
22 s.ewg = wg
22 s.uwg = wg
2323 }
2424 }
2525
6161 }
6262 }
6363
64 // WithShutdownNotifier provided chanel will be closed, inside p.Stop() call
64 // WithShutdownNotifier provided chanel will be closed, after all bars have been rendered.
6565 func WithShutdownNotifier(ch chan struct{}) ProgressOption {
6666 return func(s *pState) {
6767 s.shutdownNotifier = ch
2020
2121 // Progress represents the container that renders Progress bars
2222 type Progress struct {
23 // wg for internal rendering sync
24 wg *sync.WaitGroup
25 // External wg
26 ewg *sync.WaitGroup
27
2823 operateState chan func(*pState)
2924 done chan struct{}
30 shutdown chan struct{}
31 once sync.Once
32
33 cacheHeap *priorityQueue
3425 }
3526
3627 type (
3728 // progress state, which may contain several bars
3829 pState struct {
39 bHeap *priorityQueue
40 heapUpdated bool
41 idCounter int
42 width int
43 format string
44 rr time.Duration
45 ewg *sync.WaitGroup
46 cw *cwriter.Writer
47 ticker *time.Ticker
48 interceptors []func(io.Writer)
49
30 bHeap *priorityQueue
31 heapUpdated bool
32 idCounter int
33 width int
34 format string
35 rr time.Duration
36 cw *cwriter.Writer
37 ticker *time.Ticker
38
39 // following are provided by user
40 uwg *sync.WaitGroup
41 cancel <-chan struct{}
5042 shutdownNotifier chan struct{}
51 cancel <-chan struct{}
43 interceptors []func(io.Writer)
5244 }
5345 widthSyncer struct {
5446 // Public for easy testing
8072 }
8173
8274 p := &Progress{
83 ewg: s.ewg,
84 wg: new(sync.WaitGroup),
8575 operateState: make(chan func(*pState)),
8676 done: make(chan struct{}),
87 shutdown: make(chan struct{}),
8877 }
8978 go p.serve(s)
9079 return p
9281
9382 // AddBar creates a new progress bar and adds to the container.
9483 func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
95 p.wg.Add(1)
9684 result := make(chan *Bar, 1)
9785 select {
9886 case p.operateState <- func(s *pState) {
9987 options = append(options, barWidth(s.width), barFormat(s.format))
100 b := newBar(s.idCounter, total, p.wg, s.cancel, options...)
88 b := newBar(s.idCounter, total, s.cancel, options...)
10189 heap.Push(s.bHeap, b)
10290 s.heapUpdated = true
10391 s.idCounter++
135123 }:
136124 return <-result
137125 case <-p.done:
138 return p.cacheHeap.Len()
139 }
140 }
141
142 // Stop is a way to gracefully shutdown mpb's rendering goroutine.
143 // It is NOT for cancellation (use mpb.WithContext for cancellation purposes).
144 // If *sync.WaitGroup has been provided via mpb.WithWaitGroup(), its Wait()
145 // method will be called first.
126 return 0
127 }
128 }
129
130 // Wait first waits for all bars to complete, then waits for user provided WaitGroup, if any.
131 // It's optional to call, in other words if you don't call Progress.Wait(),
132 // it's not guaranted that all bars will be flushed completely to the underlying io.Writer.
133 func (p *Progress) Wait() {
134 <-p.done
135 }
136
137 // Stop deprecated, use Progress.Wait instead.
146138 func (p *Progress) Stop() {
147 if p.ewg != nil {
148 p.ewg.Wait()
149 }
150 // first wait for all bars to quit
151 p.wg.Wait()
152 p.once.Do(func() {
153 close(p.shutdown)
154 })
155 <-p.done
139 p.Wait()
156140 }
157141
158142 func newWidthSyncer(timeout <-chan struct{}, numBars, numColumn int) *widthSyncer {
204188 r := <-br.ready
205189 _, err = s.cw.ReadFrom(r)
206190 if !br.bar.completed && r.toComplete {
191 close(br.bar.shutdown)
207192 br.bar.completed = true
208 close(br.bar.shutdown)
209193 }
210194 if r.toRemove {
211195 s.heapUpdated = heap.Remove(s.bHeap, br.bar.index) != nil
235219 return slice
236220 }
237221
222 func (s *pState) waitAll() {
223 for s.bHeap.Len() > 0 {
224 b := heap.Pop(s.bHeap).(*Bar)
225 <-b.done
226 }
227 if s.uwg != nil {
228 s.uwg.Wait()
229 }
230 }
231
238232 func calcMax(slice []int) int {
239233 if len(slice) == 0 {
240234 return 0
3434
3535 time.AfterFunc(100*time.Millisecond, cancel)
3636
37 p.Stop()
37 p.Wait()
3838 for _, bar := range bars {
3939 if bar.Current() >= bar.Total() {
4040 t.Errorf("bar %d: total = %d, current = %d\n", bar.ID(), bar.Total(), bar.Current())
4040 if err != nil {
4141 fmt.Fprintln(os.Stderr, err)
4242 }
43 var completed int
44 for i := 0; i < s.bHeap.Len(); i++ {
45 b := (*s.bHeap)[i]
46 if b.completed {
47 completed++
48 }
49 }
50 if completed == s.bHeap.Len() {
51 s.ticker.Stop()
52 signal.Stop(winch)
53 s.waitAll()
54 if s.shutdownNotifier != nil {
55 close(s.shutdownNotifier)
56 }
57 close(p.done)
58 return
59 }
4360 case <-winch:
4461 tw, _, _ := cwriter.TermSize()
4562 err := s.writeAndFlush(tw-tw/8, numP, numA)
5572 case <-resumeTicker:
5673 s.ticker = time.NewTicker(s.rr)
5774 resumeTicker = nil
58 case <-p.shutdown:
59 s.ticker.Stop()
60 signal.Stop(winch)
61 p.cacheHeap = s.bHeap
62 close(p.done)
63 if s.shutdownNotifier != nil {
64 close(s.shutdownNotifier)
65 }
66 return
6775 }
6876 }
6977 }
3939 }
4040
4141 b.Complete()
42 p.Stop()
42 p.Wait()
4343 }
4444
4545 func TestRemoveBars(t *testing.T) {
7171 }
7272 }()
7373 }
74 p.Stop()
74 p.Wait()
7575 }
7676
7777 func TestWithCancel(t *testing.T) {
100100 close(cancel)
101101 })
102102
103 p.Stop()
103 p.Wait()
104104 for _, bar := range bars {
105105 if bar.Current() >= bar.Total() {
106106 t.Errorf("bar %d: total = %d, current = %d\n", bar.ID(), bar.Total(), bar.Current())
138138
139139 wg.Wait()
140140 close(cancel)
141 p.Stop()
141 p.Wait()
142142
143143 for _, r := range customFormat {
144144 if !bytes.ContainsRune(buf.Bytes(), r) {
163163 bar.Increment()
164164 }
165165
166 p.Stop()
166 p.Wait()
167167
168168 got := buf.String()
169169 want := fmt.Sprintf("[%s]", strings.Repeat("=", customWidth-2))
3030 if err != nil {
3131 fmt.Fprintln(os.Stderr, err)
3232 }
33 case <-p.shutdown:
34 s.ticker.Stop()
35 p.cacheHeap = s.bHeap
36 close(p.done)
37 if s.shutdownNotifier != nil {
38 close(s.shutdownNotifier)
33 var completed int
34 for i := 0; i < s.bHeap.Len(); i++ {
35 b := (*s.bHeap)[i]
36 if b.completed {
37 completed++
38 }
3939 }
40 return
40 if completed == s.bHeap.Len() {
41 s.ticker.Stop()
42 s.waitAll()
43 if s.shutdownNotifier != nil {
44 close(s.shutdownNotifier)
45 }
46 close(p.done)
47 return
48 }
4149 }
4250 }
4351 }
3434 t.Errorf("Error copying from reader: %+v\n", err)
3535 }
3636
37 p.Stop()
37 p.Wait()
3838
3939 if written != int64(total) {
4040 t.Errorf("Expected written: %d, got: %d\n", total, written)