diff --git a/bar.go b/bar.go index ad55067..4d14b47 100644 --- a/bar.go +++ b/bar.go @@ -5,6 +5,7 @@ "fmt" "io" "strings" + "sync" "time" "unicode/utf8" @@ -33,6 +34,8 @@ // completed is set from master Progress goroutine only completed bool + + removeOnComplete bool operateState chan func(*bState) // done is closed by Bar's goroutine, after cacheState is written @@ -55,8 +58,8 @@ totalAutoIncrBy int64 trimLeftSpace bool trimRightSpace bool - completed bool - removed bool + toComplete bool + removeOnComplete bool dynamic bool startTime time.Time timeElapsed time.Duration @@ -72,14 +75,14 @@ char rune till int64 } - renderedReader struct { - io.Reader + renderedState struct { + bar *Bar + reader io.Reader toComplete bool - toRemove bool } ) -func newBar(id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar { +func newBar(wg *sync.WaitGroup, id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar { if total <= 0 { total = time.Now().Unix() } @@ -99,13 +102,14 @@ s.bufA = bytes.NewBuffer(make([]byte, 0, s.width/2)) b := &Bar{ - priority: id, - operateState: make(chan func(*bState)), - done: make(chan struct{}), - shutdown: make(chan struct{}), - } - - go b.serve(s, cancel) + priority: id, + removeOnComplete: s.removeOnComplete, + operateState: make(chan func(*bState)), + done: make(chan struct{}), + shutdown: make(chan struct{}), + } + + go b.serve(wg, s, cancel) return b } @@ -133,6 +137,85 @@ // Increment is a shorthand for b.IncrBy(1) func (b *Bar) Increment() { b.IncrBy(1) +} + +// ResumeFill fills bar with different r rune, +// from 0 to till amount of progress. +func (b *Bar) ResumeFill(r rune, till int64) { + if till < 1 { + return + } + select { + case b.operateState <- func(s *bState) { s.refill = &refill{r, till} }: + case <-b.done: + } +} + +// NumOfAppenders returns current number of append decorators +func (b *Bar) NumOfAppenders() int { + result := make(chan int, 1) + select { + case b.operateState <- func(s *bState) { result <- len(s.aDecorators) }: + return <-result + case <-b.done: + return len(b.cacheState.aDecorators) + } +} + +// NumOfPrependers returns current number of prepend decorators +func (b *Bar) NumOfPrependers() int { + result := make(chan int, 1) + select { + case b.operateState <- func(s *bState) { result <- len(s.pDecorators) }: + return <-result + case <-b.done: + return len(b.cacheState.pDecorators) + } +} + +// ID returs id of the bar +func (b *Bar) ID() int { + result := make(chan int, 1) + select { + case b.operateState <- func(s *bState) { result <- s.id }: + return <-result + case <-b.done: + return b.cacheState.id + } +} + +// Current returns bar's current number, in other words sum of all increments. +func (b *Bar) Current() int64 { + result := make(chan int64, 1) + select { + case b.operateState <- func(s *bState) { result <- s.current }: + return <-result + case <-b.done: + return b.cacheState.current + } +} + +// Total returns bar's total number. +func (b *Bar) Total() int64 { + result := make(chan int64, 1) + select { + case b.operateState <- func(s *bState) { result <- s.total }: + return <-result + case <-b.done: + return b.cacheState.total + } +} + +// SetTotal sets total dynamically. The final param indicates the very last set, +// in other words you should set it to true when total is determined. +func (b *Bar) SetTotal(total int64, final bool) { + select { + case b.operateState <- func(s *bState) { + s.total = total + s.dynamic = !final + }: + case <-b.done: + } } // IncrBy increments progress bar by amount of n @@ -142,7 +225,7 @@ } select { case b.operateState <- func(s *bState) { - if s.completed { + if s.toComplete { return } next := time.Now() @@ -162,87 +245,8 @@ } } else if s.current >= s.total { s.current = s.total - s.completed = true - } - }: - case <-b.done: - } -} - -// ResumeFill fills bar with different r rune, -// from 0 to till amount of progress. -func (b *Bar) ResumeFill(r rune, till int64) { - if till < 1 { - return - } - select { - case b.operateState <- func(s *bState) { s.refill = &refill{r, till} }: - case <-b.done: - } -} - -// NumOfAppenders returns current number of append decorators -func (b *Bar) NumOfAppenders() int { - result := make(chan int, 1) - select { - case b.operateState <- func(s *bState) { result <- len(s.aDecorators) }: - return <-result - case <-b.done: - return len(b.cacheState.aDecorators) - } -} - -// NumOfPrependers returns current number of prepend decorators -func (b *Bar) NumOfPrependers() int { - result := make(chan int, 1) - select { - case b.operateState <- func(s *bState) { result <- len(s.pDecorators) }: - return <-result - case <-b.done: - return len(b.cacheState.pDecorators) - } -} - -// ID returs id of the bar -func (b *Bar) ID() int { - result := make(chan int, 1) - select { - case b.operateState <- func(s *bState) { result <- s.id }: - return <-result - case <-b.done: - return b.cacheState.id - } -} - -// Current returns bar's current number, in other words sum of all increments. -func (b *Bar) Current() int64 { - result := make(chan int64, 1) - select { - case b.operateState <- func(s *bState) { result <- s.current }: - return <-result - case <-b.done: - return b.cacheState.current - } -} - -// Total returns bar's total number. -func (b *Bar) Total() int64 { - result := make(chan int64, 1) - select { - case b.operateState <- func(s *bState) { result <- s.total }: - return <-result - case <-b.done: - return b.cacheState.total - } -} - -// SetTotal sets total dynamically. The final param indicates the very last set, -// in other words you should set it to true when total is determined. -func (b *Bar) SetTotal(total int64, final bool) { - select { - case b.operateState <- func(s *bState) { - s.total = total - s.dynamic = !final + s.toComplete = true + } }: case <-b.done: } @@ -252,40 +256,21 @@ func (b *Bar) Completed() bool { result := make(chan bool, 1) select { - case b.operateState <- func(s *bState) { result <- s.completed }: - return <-result - case <-b.done: - return b.cacheState.completed - } -} - -// Complete stops bar's progress tracking, but doesn't remove the bar from rendering queue. -// If you need to remove, invoke Progress.RemoveBar(*Bar) instead. -func (b *Bar) Complete() { - b.askToComplete(false) -} - -func (b *Bar) askToComplete(toRemove bool) bool { - result := make(chan bool, 1) - select { - case b.operateState <- func(s *bState) { - s.removed = toRemove - s.completed = true - result <- true - }: - return <-result - case <-b.done: - return false - } -} - -func (b *Bar) serve(s *bState, cancel <-chan struct{}) { + case b.operateState <- func(s *bState) { result <- s.toComplete }: + return <-result + case <-b.done: + return b.cacheState.toComplete + } +} + +func (b *Bar) serve(wg *sync.WaitGroup, s *bState, cancel <-chan struct{}) { + defer wg.Done() for { select { case op := <-b.operateState: op(s) case <-cancel: - s.completed = true + s.toComplete = true cancel = nil case <-b.shutdown: b.cacheState = s @@ -295,8 +280,8 @@ } } -func (b *Bar) render(tw int, pSyncer, aSyncer *widthSyncer) <-chan *renderedReader { - ch := make(chan *renderedReader, 1) +func (b *Bar) render(tw int, pSyncer, aSyncer *widthSyncer) <-chan *renderedState { + ch := make(chan *renderedState, 1) go func() { select { @@ -308,13 +293,12 @@ s.panicMsg = fmt.Sprintf("b#%02d panic: %v\n", s.id, p) s.pDecorators = nil s.aDecorators = nil - s.completed = true + s.toComplete = true r = strings.NewReader(s.panicMsg) } - ch <- &renderedReader{r, s.completed, s.removed} + ch <- &renderedState{b, r, s.toComplete} }() - s.draw(tw, pSyncer, aSyncer) - r = io.MultiReader(s.bufP, s.bufB, s.bufA) + r = s.draw(tw, pSyncer, aSyncer) }: case <-b.done: s := b.cacheState @@ -322,14 +306,58 @@ if s.panicMsg != "" { r = strings.NewReader(s.panicMsg) } else { - s.draw(tw, pSyncer, aSyncer) - r = io.MultiReader(s.bufP, s.bufB, s.bufA) + r = s.draw(tw, pSyncer, aSyncer) } - ch <- &renderedReader{r, s.completed, s.removed} + ch <- &renderedState{b, r, s.toComplete} } }() return ch +} + +func (s *bState) draw(termWidth int, pSyncer, aSyncer *widthSyncer) io.Reader { + if termWidth <= 0 { + termWidth = s.width + } + + stat := newStatistics(s) + + // render prepend functions to the left of the bar + s.bufP.Reset() + for i, f := range s.pDecorators { + s.bufP.WriteString(f(stat, pSyncer.Accumulator[i], pSyncer.Distributor[i])) + } + + if !s.trimLeftSpace { + s.bufP.WriteByte(' ') + } + + // render append functions to the right of the bar + s.bufA.Reset() + if !s.trimRightSpace { + s.bufA.WriteByte(' ') + } + + for i, f := range s.aDecorators { + s.bufA.WriteString(f(stat, aSyncer.Accumulator[i], aSyncer.Distributor[i])) + } + + prependCount := utf8.RuneCount(s.bufP.Bytes()) + appendCount := utf8.RuneCount(s.bufA.Bytes()) + + if termWidth > s.width { + s.fillBar(s.width) + } else { + s.fillBar(termWidth - prependCount - appendCount) + } + barCount := utf8.RuneCount(s.bufB.Bytes()) + totalCount := prependCount + barCount + appendCount + if totalCount > termWidth { + s.fillBar(termWidth - prependCount - appendCount) + } + s.bufA.WriteByte('\n') + + return io.MultiReader(s.bufP, s.bufB, s.bufA) } func (s *bState) updateTimePerItemEstimate(amount int, now, next time.Time) { @@ -337,49 +365,6 @@ lastItemEstimate := float64(lastBlockTime) / float64(amount) s.timePerItem = time.Duration((s.etaAlpha * lastItemEstimate) + (1-s.etaAlpha)*float64(s.timePerItem)) s.blockStartTime = next -} - -func (s *bState) draw(termWidth int, pSyncer, aSyncer *widthSyncer) { - if termWidth <= 0 { - termWidth = s.width - } - - stat := newStatistics(s) - - // render prepend functions to the left of the bar - s.bufP.Reset() - for i, f := range s.pDecorators { - s.bufP.WriteString(f(stat, pSyncer.Accumulator[i], pSyncer.Distributor[i])) - } - - if !s.trimLeftSpace { - s.bufP.WriteByte(' ') - } - - // render append functions to the right of the bar - s.bufA.Reset() - if !s.trimRightSpace { - s.bufA.WriteByte(' ') - } - - for i, f := range s.aDecorators { - s.bufA.WriteString(f(stat, aSyncer.Accumulator[i], aSyncer.Distributor[i])) - } - - prependCount := utf8.RuneCount(s.bufP.Bytes()) - appendCount := utf8.RuneCount(s.bufA.Bytes()) - - if termWidth > s.width { - s.fillBar(s.width) - } else { - s.fillBar(termWidth - prependCount - appendCount) - } - barCount := utf8.RuneCount(s.bufB.Bytes()) - totalCount := prependCount + barCount + appendCount - if totalCount > termWidth { - s.fillBar(termWidth - prependCount - appendCount) - } - s.bufA.WriteByte('\n') } func (s *bState) fillBar(width int) { @@ -428,8 +413,7 @@ func newStatistics(s *bState) *decor.Statistics { return &decor.Statistics{ ID: s.id, - Completed: s.completed, - Removed: s.removed, + Completed: s.toComplete, Total: s.total, Current: s.current, StartTime: s.startTime, diff --git a/bar_option.go b/bar_option.go index c51aa04..dd47529 100644 --- a/bar_option.go +++ b/bar_option.go @@ -75,6 +75,13 @@ } } +// BarRemoveOnComplete is a flag, which tells whether the intention is to remove the bar after completion. +func BarRemoveOnComplete() BarOption { + return func(s *bState) { + s.removeOnComplete = true + } +} + func barWidth(w int) BarOption { return func(s *bState) { s.width = w diff --git a/decor/decorators.go b/decor/decorators.go index 2d5975f..2f178a8 100644 --- a/decor/decorators.go +++ b/decor/decorators.go @@ -31,7 +31,6 @@ type Statistics struct { ID int Completed bool - Removed bool Total int64 Current int64 StartTime time.Time diff --git a/progress.go b/progress.go index 25d20af..aad0a4b 100644 --- a/progress.go +++ b/progress.go @@ -21,6 +21,8 @@ // Progress represents the container that renders Progress bars type Progress struct { + wg *sync.WaitGroup + uwg *sync.WaitGroup operateState chan func(*pState) done chan struct{} } @@ -28,15 +30,16 @@ type ( // progress state, which may contain several bars pState struct { - bHeap *priorityQueue - heapUpdated bool - zeroWait bool - idCounter int - width int - format string - rr time.Duration - cw *cwriter.Writer - ticker *time.Ticker + bHeap *priorityQueue + shutdownPending []*Bar + heapUpdated bool + zeroWait bool + idCounter int + width int + format string + rr time.Duration + cw *cwriter.Writer + ticker *time.Ticker // following are provided by user uwg *sync.WaitGroup @@ -48,10 +51,6 @@ // Public for easy testing Accumulator []chan int Distributor []chan int - } - barRendering struct { - bar *Bar - ready <-chan *renderedReader } ) @@ -74,6 +73,8 @@ } p := &Progress{ + uwg: s.uwg, + wg: new(sync.WaitGroup), operateState: make(chan func(*pState)), done: make(chan struct{}), } @@ -83,11 +84,12 @@ // AddBar creates a new progress bar and adds to the container. func (p *Progress) AddBar(total int64, options ...BarOption) *Bar { + p.wg.Add(1) result := make(chan *Bar, 1) select { case p.operateState <- func(s *pState) { options = append(options, barWidth(s.width), barFormat(s.format)) - b := newBar(s.idCounter, total, s.cancel, options...) + b := newBar(p.wg, s.idCounter, total, s.cancel, options...) heap.Push(s.bHeap, b) s.heapUpdated = true s.idCounter++ @@ -100,11 +102,18 @@ } } -// RemoveBar removes the bar at next render cycle -func (p *Progress) RemoveBar(b *Bar) bool { - result := b.askToComplete(true) - <-b.done - return result +// Abort is only effective while bar progress is running, +// it means remove bar now without waiting for its completion. +// If bar is already completed, there is nothing to abort. +// If you need to remove bar after completion, use BarRemoveOnComplete BarOption. +func (p *Progress) Abort(b *Bar) { + select { + case p.operateState <- func(s *pState) { + s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil + s.shutdownPending = append(s.shutdownPending, b) + }: + case <-p.done: + } } // UpdateBarPriority provides a way to change bar's order position. @@ -131,14 +140,17 @@ // It's optional to call, in other words if you don't call Progress.Wait(), // it's not guaranteed that all bars will be flushed completely to the underlying io.Writer. func (p *Progress) Wait() { - if p.BarCount() == 0 { - select { - case p.operateState <- func(s *pState) { s.zeroWait = true }: - case <-p.done: - } - return - } - <-p.done + if p.uwg != nil { + p.uwg.Wait() + } + + p.wg.Wait() + + select { + case p.operateState <- func(s *pState) { s.zeroWait = true }: + <-p.done + case <-p.done: + } } func newWidthSyncer(timeout <-chan struct{}, numBars, numColumn int) *widthSyncer { @@ -186,16 +198,17 @@ close(timeout) }) - for _, br := range s.renderByPriority(tw, pSyncer, aSyncer) { - r := <-br.ready - if !r.toRemove { - _, err = s.cw.ReadFrom(r) - } else { - s.heapUpdated = heap.Remove(s.bHeap, br.bar.index) != nil - } - if !br.bar.completed && r.toComplete { - br.bar.completed = true - defer close(br.bar.shutdown) + for _, ch := range s.renderByPriority(tw, pSyncer, aSyncer) { + rs := <-ch + _, err = s.cw.ReadFrom(rs.reader) + if !rs.bar.completed && rs.toComplete { + rs.bar.completed = true + if rs.bar.removeOnComplete { + s.heapUpdated = heap.Remove(s.bHeap, rs.bar.index) != nil + } + defer func() { + s.shutdownPending = append(s.shutdownPending, rs.bar) + }() } } @@ -206,30 +219,22 @@ if e := s.cw.Flush(); err == nil { err = e } + + for i := len(s.shutdownPending) - 1; i >= 0; i-- { + close(s.shutdownPending[i].shutdown) + s.shutdownPending = s.shutdownPending[:i] + } return } -func (s *pState) renderByPriority(tw int, pSyncer, aSyncer *widthSyncer) []*barRendering { - slice := make([]*barRendering, 0, s.bHeap.Len()) +func (s *pState) renderByPriority(tw int, pSyncer, aSyncer *widthSyncer) []<-chan *renderedState { + pp := make([]<-chan *renderedState, 0, s.bHeap.Len()) for s.bHeap.Len() > 0 { b := heap.Pop(s.bHeap).(*Bar) defer heap.Push(s.bHeap, b) - slice = append(slice, &barRendering{ - bar: b, - ready: b.render(tw, pSyncer, aSyncer), - }) - } - return slice -} - -func (s *pState) waitAll() { - for s.bHeap.Len() > 0 { - b := heap.Pop(s.bHeap).(*Bar) - <-b.done - } - if s.uwg != nil { - s.uwg.Wait() - } + pp = append(pp, b.render(tw, pSyncer, aSyncer)) + } + return pp } func calcMax(slice []int) int { diff --git a/progress_posix.go b/progress_posix.go index 7f44b59..56c9cb7 100644 --- a/progress_posix.go +++ b/progress_posix.go @@ -6,7 +6,6 @@ "fmt" "os" "os/signal" - "runtime" "syscall" "time" @@ -19,21 +18,22 @@ var numP, numA int var timer *time.Timer - var resumeTicker <-chan time.Time - resumeDelay := 300 * time.Millisecond + var tickerResumer <-chan time.Time + resumeDelay := s.rr * 2 for { select { case op := <-p.operateState: op(s) case <-s.ticker.C: - if s.bHeap.Len() == 0 { - if s.zeroWait { - close(p.done) - return + if s.zeroWait { + s.ticker.Stop() + signal.Stop(winch) + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) } - runtime.Gosched() - break + close(p.done) + return } if s.heapUpdated { numP = s.bHeap.maxNumP() @@ -45,24 +45,12 @@ if err != nil { fmt.Fprintln(os.Stderr, err) } - var completed int - for i := 0; i < s.bHeap.Len(); i++ { - b := (*s.bHeap)[i] - if b.completed { - completed++ - } + case <-winch: + if s.heapUpdated { + numP = s.bHeap.maxNumP() + numA = s.bHeap.maxNumA() + s.heapUpdated = false } - if completed == s.bHeap.Len() { - s.ticker.Stop() - signal.Stop(winch) - s.waitAll() - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - close(p.done) - return - } - case <-winch: tw, _, _ := cwriter.TermSize() err := s.writeAndFlush(tw-tw/8, numP, numA) if err != nil { @@ -73,10 +61,11 @@ } s.ticker.Stop() timer = time.NewTimer(resumeDelay) - resumeTicker = timer.C - case <-resumeTicker: + tickerResumer = timer.C + case <-tickerResumer: s.ticker = time.NewTicker(s.rr) - resumeTicker = nil + tickerResumer = nil + timer = nil } } } diff --git a/progress_windows.go b/progress_windows.go index 2299208..4c4eb6f 100644 --- a/progress_windows.go +++ b/progress_windows.go @@ -5,7 +5,6 @@ import ( "fmt" "os" - "runtime" "github.com/vbauerster/mpb/cwriter" ) @@ -17,13 +16,13 @@ case op := <-p.operateState: op(s) case <-s.ticker.C: - if s.bHeap.Len() == 0 { - if s.zeroWait { - close(p.done) - return + if s.zeroWait { + s.ticker.Stop() + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) } - runtime.Gosched() - break + close(p.done) + return } if s.heapUpdated { numP = s.bHeap.maxNumP() @@ -35,22 +34,6 @@ if err != nil { fmt.Fprintln(os.Stderr, err) } - var completed int - for i := 0; i < s.bHeap.Len(); i++ { - b := (*s.bHeap)[i] - if b.completed { - completed++ - } - } - if completed == s.bHeap.Len() { - s.ticker.Stop() - s.waitAll() - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - close(p.done) - return - } } } }