diff --git a/README.md b/README.md index 1583413..499fa41 100644 --- a/README.md +++ b/README.md @@ -64,8 +64,8 @@ time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) bar.Increment() } - // Gracefully shutdown mpb's monitor goroutine - p.Stop() + // Wait for all bars to complete + p.Wait() ``` #### [Rendering multiple bars](examples/simple/main.go) @@ -99,8 +99,8 @@ } }() } - // Gracefully shutdown mpb's monitor goroutine - p.Stop() + // Wait for all bars to complete + p.Wait() ``` #### [Dynamic Total](examples/dynTotal/main.go) diff --git a/bar.go b/bar.go index 2089135..80a6f74 100644 --- a/bar.go +++ b/bar.go @@ -5,7 +5,6 @@ "fmt" "io" "strings" - "sync" "time" "unicode/utf8" @@ -39,7 +38,7 @@ done chan struct{} shutdown chan struct{} - // cacheState is used after done is closed + // it's guaranted that cacheState isn't nil, after done channel is closed cacheState *bState } @@ -79,7 +78,7 @@ } ) -func newBar(id int, total int64, wg *sync.WaitGroup, cancel <-chan struct{}, options ...BarOption) *Bar { +func newBar(id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar { if total <= 0 { total = time.Now().Unix() } @@ -105,7 +104,7 @@ shutdown: make(chan struct{}), } - go b.serve(s, wg, cancel) + go b.serve(s, cancel) return b } @@ -290,7 +289,7 @@ } } -func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) { +func (b *Bar) serve(s *bState, cancel <-chan struct{}) { for { select { case op := <-b.operateState: @@ -301,7 +300,6 @@ case <-b.shutdown: b.cacheState = s close(b.done) - wg.Done() return } } diff --git a/bar_test.go b/bar_test.go index 1b8a5d3..9bd3702 100644 --- a/bar_test.go +++ b/bar_test.go @@ -37,7 +37,7 @@ bar.Increment() } - p.Stop() + p.Wait() gotWidth := utf8.RuneCount(buf.Bytes()) if gotWidth != tc.expected { @@ -60,7 +60,7 @@ count++ } - p.Stop() + p.Wait() if count != total { t.Errorf("got count: %d, expected %d\n", count, total) } @@ -81,7 +81,7 @@ }(bars[i]) } - p.Stop() + p.Wait() for wantID, bar := range bars { gotID := bar.ID() if gotID != wantID { @@ -112,7 +112,7 @@ time.Sleep(10 * time.Millisecond) } - p.Stop() + p.Wait() wantBar := fmt.Sprintf("[%s%s]", strings.Repeat(string(refillChar), till-1), @@ -152,7 +152,7 @@ }() } - p.Stop() + p.Wait() wantPanic = fmt.Sprintf("b#%02d panic: %v", 2, wantPanic) diff --git a/barbench_test.go b/barbench_test.go index b93a1c3..c1ea326 100644 --- a/barbench_test.go +++ b/barbench_test.go @@ -11,7 +11,7 @@ for i := 0; i < total; i++ { bar.Increment() } - p.Stop() + p.Wait() } func BenchmarkSingleBar100(b *testing.B) { @@ -38,5 +38,5 @@ for i := 0; i < b.N; i++ { bar.Increment() } - p.Stop() + p.Wait() } diff --git a/example_test.go b/example_test.go index 5887f02..02c8661 100644 --- a/example_test.go +++ b/example_test.go @@ -43,8 +43,8 @@ time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) bar.Increment() } - - p.Stop() + // Wait for all bars to complete + p.Wait() } func ExampleBar_Completed() { @@ -57,5 +57,5 @@ bar.Increment() } - p.Stop() + p.Wait() } diff --git a/examples/cancel/main.go b/examples/cancel/main.go index bdaf555..af07ad1 100644 --- a/examples/cancel/main.go +++ b/examples/cancel/main.go @@ -52,6 +52,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/examples/complete/main.go b/examples/complete/main.go index 4189a25..b400264 100644 --- a/examples/complete/main.go +++ b/examples/complete/main.go @@ -49,6 +49,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/examples/dynTotal/main.go b/examples/dynTotal/main.go index 1df7c4a..fa6754a 100644 --- a/examples/dynTotal/main.go +++ b/examples/dynTotal/main.go @@ -52,5 +52,5 @@ bar.Increment() } - p.Stop() + p.Wait() } diff --git a/examples/io/multiple/main.go b/examples/io/multiple/main.go index d197c09..fb10396 100644 --- a/examples/io/multiple/main.go +++ b/examples/io/multiple/main.go @@ -28,8 +28,8 @@ go download(&wg, p, name, url, i) } - p.Stop() - fmt.Println("Finished") + p.Wait() + fmt.Println("done") } func download(wg *sync.WaitGroup, p *mpb.Progress, name, url string, n int) { diff --git a/examples/io/single/main.go b/examples/io/single/main.go index bf554ef..4984012 100644 --- a/examples/io/single/main.go +++ b/examples/io/single/main.go @@ -49,6 +49,6 @@ // and copy from reader, ignoring errors io.Copy(dest, reader) - p.Stop() // if you omit this line, rendering bars goroutine will quit early - fmt.Println("Finished") + p.Wait() // if you omit this line, rendering bars goroutine will quit early + fmt.Println("done") } diff --git a/examples/panic/main.go b/examples/panic/main.go index f01df89..f4c7b11 100644 --- a/examples/panic/main.go +++ b/examples/panic/main.go @@ -39,5 +39,5 @@ }() } - p.Stop() + p.Wait() } diff --git a/examples/prependETA/main.go b/examples/prependETA/main.go index 974bc6a..a45af32 100644 --- a/examples/prependETA/main.go +++ b/examples/prependETA/main.go @@ -45,6 +45,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/examples/prependElapsed/main.go b/examples/prependElapsed/main.go index 3cc0ae2..8b90279 100644 --- a/examples/prependElapsed/main.go +++ b/examples/prependElapsed/main.go @@ -46,6 +46,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/examples/remove/main.go b/examples/remove/main.go index babf54d..8abfaea 100644 --- a/examples/remove/main.go +++ b/examples/remove/main.go @@ -49,6 +49,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/examples/simple/main.go b/examples/simple/main.go index 67999ac..82a6409 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -44,6 +44,6 @@ } }() } - // Gracefully shutdown mpb's monitor goroutine - p.Stop() + // Wait for all bars to complete + p.Wait() } diff --git a/examples/singleBar/main.go b/examples/singleBar/main.go index 52a63cb..dada40b 100644 --- a/examples/singleBar/main.go +++ b/examples/singleBar/main.go @@ -43,6 +43,6 @@ time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) bar.Increment() } - // Gracefully shutdown mpb's monitor goroutine - p.Stop() + // Wait for all bars to complete + p.Wait() } diff --git a/examples/sort/main.go b/examples/sort/main.go index c6bd673..a97f128 100644 --- a/examples/sort/main.go +++ b/examples/sort/main.go @@ -49,6 +49,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/examples/stress/main.go b/examples/stress/main.go index 6a3d72a..db4efba 100644 --- a/examples/stress/main.go +++ b/examples/stress/main.go @@ -47,6 +47,6 @@ }() } - p.Stop() - fmt.Println("stop") + p.Wait() + fmt.Println("done") } diff --git a/options.go b/options.go index 175a87c..2e009c3 100644 --- a/options.go +++ b/options.go @@ -15,12 +15,12 @@ type ProgressOption func(*pState) // WithWaitGroup provides means to have a single joint point. -// If *sync.WaitGroup is provided, you can safely call just p.Stop() +// If *sync.WaitGroup is provided, you can safely call just p.Wait() // without calling Wait() on provided *sync.WaitGroup. // Makes sense when there are more than one bar to render. func WithWaitGroup(wg *sync.WaitGroup) ProgressOption { return func(s *pState) { - s.ewg = wg + s.uwg = wg } } @@ -62,7 +62,7 @@ } } -// WithShutdownNotifier provided chanel will be closed, inside p.Stop() call +// WithShutdownNotifier provided chanel will be closed, after all bars have been rendered. func WithShutdownNotifier(ch chan struct{}) ProgressOption { return func(s *pState) { s.shutdownNotifier = ch diff --git a/progress.go b/progress.go index d607d0b..2157b24 100644 --- a/progress.go +++ b/progress.go @@ -21,35 +21,27 @@ // Progress represents the container that renders Progress bars type Progress struct { - // wg for internal rendering sync - wg *sync.WaitGroup - // External wg - ewg *sync.WaitGroup - operateState chan func(*pState) done chan struct{} - shutdown chan struct{} - once sync.Once - - cacheHeap *priorityQueue } type ( // progress state, which may contain several bars pState struct { - bHeap *priorityQueue - heapUpdated bool - idCounter int - width int - format string - rr time.Duration - ewg *sync.WaitGroup - cw *cwriter.Writer - ticker *time.Ticker - interceptors []func(io.Writer) - + bHeap *priorityQueue + heapUpdated bool + idCounter int + width int + format string + rr time.Duration + cw *cwriter.Writer + ticker *time.Ticker + + // following are provided by user + uwg *sync.WaitGroup + cancel <-chan struct{} shutdownNotifier chan struct{} - cancel <-chan struct{} + interceptors []func(io.Writer) } widthSyncer struct { // Public for easy testing @@ -81,11 +73,8 @@ } p := &Progress{ - ewg: s.ewg, - wg: new(sync.WaitGroup), operateState: make(chan func(*pState)), done: make(chan struct{}), - shutdown: make(chan struct{}), } go p.serve(s) return p @@ -93,12 +82,11 @@ // AddBar creates a new progress bar and adds to the container. func (p *Progress) AddBar(total int64, options ...BarOption) *Bar { - p.wg.Add(1) result := make(chan *Bar, 1) select { case p.operateState <- func(s *pState) { options = append(options, barWidth(s.width), barFormat(s.format)) - b := newBar(s.idCounter, total, p.wg, s.cancel, options...) + b := newBar(s.idCounter, total, s.cancel, options...) heap.Push(s.bHeap, b) s.heapUpdated = true s.idCounter++ @@ -136,24 +124,20 @@ }: return <-result case <-p.done: - return p.cacheHeap.Len() - } -} - -// Stop is a way to gracefully shutdown mpb's rendering goroutine. -// It is NOT for cancellation (use mpb.WithContext for cancellation purposes). -// If *sync.WaitGroup has been provided via mpb.WithWaitGroup(), its Wait() -// method will be called first. + return 0 + } +} + +// Wait first waits for all bars to complete, then waits for user provided WaitGroup, if any. +// It's optional to call, in other words if you don't call Progress.Wait(), +// it's not guaranted that all bars will be flushed completely to the underlying io.Writer. +func (p *Progress) Wait() { + <-p.done +} + +// Stop deprecated, use Progress.Wait instead. func (p *Progress) Stop() { - if p.ewg != nil { - p.ewg.Wait() - } - // first wait for all bars to quit - p.wg.Wait() - p.once.Do(func() { - close(p.shutdown) - }) - <-p.done + p.Wait() } func newWidthSyncer(timeout <-chan struct{}, numBars, numColumn int) *widthSyncer { @@ -205,8 +189,8 @@ r := <-br.ready _, err = s.cw.ReadFrom(r) if !br.bar.completed && r.toComplete { + close(br.bar.shutdown) br.bar.completed = true - close(br.bar.shutdown) } if r.toRemove { s.heapUpdated = heap.Remove(s.bHeap, br.bar.index) != nil @@ -236,6 +220,16 @@ return slice } +func (s *pState) waitAll() { + for s.bHeap.Len() > 0 { + b := heap.Pop(s.bHeap).(*Bar) + <-b.done + } + if s.uwg != nil { + s.uwg.Wait() + } +} + func calcMax(slice []int) int { if len(slice) == 0 { return 0 diff --git a/progress_go1.7_test.go b/progress_go1.7_test.go index 27d6a86..81a00ec 100644 --- a/progress_go1.7_test.go +++ b/progress_go1.7_test.go @@ -35,7 +35,7 @@ time.AfterFunc(100*time.Millisecond, cancel) - p.Stop() + p.Wait() for _, bar := range bars { if bar.Current() >= bar.Total() { t.Errorf("bar %d: total = %d, current = %d\n", bar.ID(), bar.Total(), bar.Current()) diff --git a/progress_posix.go b/progress_posix.go index 6a3fda7..97c829f 100644 --- a/progress_posix.go +++ b/progress_posix.go @@ -41,6 +41,23 @@ if err != nil { fmt.Fprintln(os.Stderr, err) } + var completed int + for i := 0; i < s.bHeap.Len(); i++ { + b := (*s.bHeap)[i] + if b.completed { + completed++ + } + } + if completed == s.bHeap.Len() { + s.ticker.Stop() + signal.Stop(winch) + s.waitAll() + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) + } + close(p.done) + return + } case <-winch: tw, _, _ := cwriter.TermSize() err := s.writeAndFlush(tw-tw/8, numP, numA) @@ -56,15 +73,6 @@ case <-resumeTicker: s.ticker = time.NewTicker(s.rr) resumeTicker = nil - case <-p.shutdown: - s.ticker.Stop() - signal.Stop(winch) - p.cacheHeap = s.bHeap - close(p.done) - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - return } } } diff --git a/progress_test.go b/progress_test.go index 4b28cb4..1a3505a 100644 --- a/progress_test.go +++ b/progress_test.go @@ -40,7 +40,7 @@ } b.Complete() - p.Stop() + p.Wait() } func TestRemoveBars(t *testing.T) { @@ -72,7 +72,7 @@ } }() } - p.Stop() + p.Wait() } func TestWithCancel(t *testing.T) { @@ -101,7 +101,7 @@ close(cancel) }) - p.Stop() + p.Wait() for _, bar := range bars { if bar.Current() >= bar.Total() { t.Errorf("bar %d: total = %d, current = %d\n", bar.ID(), bar.Total(), bar.Current()) @@ -139,7 +139,7 @@ wg.Wait() close(cancel) - p.Stop() + p.Wait() for _, r := range customFormat { if !bytes.ContainsRune(buf.Bytes(), r) { @@ -164,7 +164,7 @@ bar.Increment() } - p.Stop() + p.Wait() got := buf.String() want := fmt.Sprintf("[%s]", strings.Repeat("=", customWidth-2)) diff --git a/progress_windows.go b/progress_windows.go index 0765724..7cfa269 100644 --- a/progress_windows.go +++ b/progress_windows.go @@ -31,14 +31,22 @@ if err != nil { fmt.Fprintln(os.Stderr, err) } - case <-p.shutdown: - s.ticker.Stop() - p.cacheHeap = s.bHeap - close(p.done) - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) + var completed int + for i := 0; i < s.bHeap.Len(); i++ { + b := (*s.bHeap)[i] + if b.completed { + completed++ + } } - return + if completed == s.bHeap.Len() { + s.ticker.Stop() + s.waitAll() + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) + } + close(p.done) + return + } } } } diff --git a/proxyreader_test.go b/proxyreader_test.go index 23b2416..abcfd42 100644 --- a/proxyreader_test.go +++ b/proxyreader_test.go @@ -35,7 +35,7 @@ t.Errorf("Error copying from reader: %+v\n", err) } - p.Stop() + p.Wait() if written != int64(total) { t.Errorf("Expected written: %d, got: %d\n", total, written)