diff --git a/bar.go b/bar.go index 70f8715..9ae55f5 100644 --- a/bar.go +++ b/bar.go @@ -32,10 +32,12 @@ priority int index int + // the flag is set from Progress monitor goroutine only + completed bool + operateState chan func(*bState) done chan struct{} shutdown chan struct{} - once sync.Once // cacheState is used after done is closed cacheState *bState @@ -54,7 +56,7 @@ trimLeftSpace bool trimRightSpace bool completed bool - aborted bool + removed bool dynamic bool startTime time.Time timeElapsed time.Duration @@ -64,15 +66,16 @@ prependFuncs []decor.DecoratorFunc refill *refill bufP, bufB, bufA *bytes.Buffer - panic string + panicMsg string } refill struct { char rune till int64 } - bufReader struct { + toRenderReader struct { io.Reader - completed bool + toComplete bool + toRemove bool } ) @@ -148,6 +151,9 @@ } select { case b.operateState <- func(s *bState) { + if s.completed { + return + } next := time.Now() if s.current == 0 { s.startTime = next @@ -253,75 +259,85 @@ } } -// InProgress returns true, while progress is running. -// Can be used as condition in for loop -func (b *Bar) InProgress() bool { - select { +// Completed reports whether the bar is in completed state +func (b *Bar) Completed() bool { + result := make(chan bool, 1) + select { + case b.operateState <- func(s *bState) { result <- s.completed }: + return <-result + case <-b.done: + return b.cacheState.completed + } +} + +// Complete stops bar's progress tracking, but doesn't remove the bar from rendering queue. +// If you need to remove, invoke Progress.RemoveBar(*Bar) instead. +func (b *Bar) Complete() { + b.askToComplete(false) +} + +func (b *Bar) askToComplete(toRemove bool) bool { + result := make(chan bool, 1) + select { + case b.operateState <- func(s *bState) { + s.removed = toRemove + s.completed = true + result <- true + }: + return <-result case <-b.done: return false - default: - return true - } -} - -// Complete stops bar's progress tracking, but doesn't remove the bar. -// If you need to remove, call Progress.RemoveBar(*Bar) instead. -func (b *Bar) Complete() { - b.once.Do(func() { - close(b.shutdown) - }) + } } func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) { - defer func() { - b.cacheState = s - close(b.done) - wg.Done() - }() for { select { case op := <-b.operateState: op(s) case <-cancel: - s.aborted = true + s.completed = true + cancel = nil + case <-b.shutdown: + b.cacheState = s + close(b.done) + wg.Done() return - case <-b.shutdown: - return - } - } -} - -func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan *bufReader { - ch := make(chan *bufReader, 1) + } + } +} + +func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan *toRenderReader { + ch := make(chan *toRenderReader, 1) go func() { select { case b.operateState <- func(s *bState) { + var r io.Reader defer func() { // recovering if external decorators panic if p := recover(); p != nil { - s.panic = fmt.Sprintf("b#%02d panic: %v\n", s.id, p) + s.panicMsg = fmt.Sprintf("b#%02d panic: %v\n", s.id, p) s.prependFuncs = nil s.appendFuncs = nil - - ch <- &bufReader{strings.NewReader(s.panic), true} + s.completed = true + r = strings.NewReader(s.panicMsg) } - close(ch) + ch <- &toRenderReader{r, s.completed, s.removed} }() s.draw(tw, prependWs, appendWs) - ch <- &bufReader{io.MultiReader(s.bufP, s.bufB, s.bufA), s.completed} + r = io.MultiReader(s.bufP, s.bufB, s.bufA) }: case <-b.done: s := b.cacheState var r io.Reader - if s.panic != "" { - r = strings.NewReader(s.panic) + if s.panicMsg != "" { + r = strings.NewReader(s.panicMsg) } else { s.draw(tw, prependWs, appendWs) r = io.MultiReader(s.bufP, s.bufB, s.bufA) } - ch <- &bufReader{r, false} - close(ch) + ch <- &toRenderReader{r, s.completed, s.removed} } }() @@ -425,7 +441,7 @@ return &decor.Statistics{ ID: s.id, Completed: s.completed, - Aborted: s.aborted, + Removed: s.removed, Total: s.total, Current: s.current, StartTime: s.startTime, diff --git a/decor/decorators.go b/decor/decorators.go index e66108b..b843411 100644 --- a/decor/decorators.go +++ b/decor/decorators.go @@ -31,7 +31,7 @@ type Statistics struct { ID int Completed bool - Aborted bool + Removed bool Total int64 Current int64 StartTime time.Time diff --git a/progress.go b/progress.go index bf9d931..c178348 100644 --- a/progress.go +++ b/progress.go @@ -55,9 +55,9 @@ Listen []chan int Result []chan int } - renderedBar struct { + toRenderSnapshot struct { bar *Bar - pipe <-chan *bufReader + pipe <-chan *toRenderReader } ) @@ -73,7 +73,6 @@ cw: cwriter.New(os.Stdout), rr: prr, ticker: time.NewTicker(prr), - cancel: make(chan struct{}), } for _, opt := range options { @@ -111,23 +110,9 @@ } } -// RemoveBar removes bar at any time. +// RemoveBar removes the bar at next render cycle func (p *Progress) RemoveBar(b *Bar) bool { - result := make(chan bool, 1) - select { - case p.operateState <- func(s *pState) { - if heap.Remove(s.bHeap, b.index) != nil { - s.heapUpdated = true - b.Complete() - result <- true - } else { - result <- false - } - }: - return <-result - case <-p.done: - return false - } + return b.askToComplete(true) } // UpdateBarPriority provides a way to change bar's order position. @@ -216,11 +201,15 @@ prependWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numP) appendWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numA) - for _, b := range s.renderByPriority(tw, prependWs, appendWs) { - r := <-b.pipe + for _, trs := range s.renderByPriority(tw, prependWs, appendWs) { + r := <-trs.pipe _, err = s.cw.ReadFrom(r) - if r.completed { - b.bar.Complete() + if !trs.bar.completed && r.toComplete { + trs.bar.completed = true + close(trs.bar.shutdown) + } + if r.toRemove { + heap.Remove(s.bHeap, trs.bar.index) } } @@ -234,12 +223,12 @@ return } -func (s *pState) renderByPriority(tw int, prependWs, appendWs *widthSync) []*renderedBar { - slice := make([]*renderedBar, 0, s.bHeap.Len()) +func (s *pState) renderByPriority(tw int, prependWs, appendWs *widthSync) []*toRenderSnapshot { + slice := make([]*toRenderSnapshot, 0, s.bHeap.Len()) for s.bHeap.Len() > 0 { b := heap.Pop(s.bHeap).(*Bar) defer heap.Push(s.bHeap, b) - slice = append(slice, &renderedBar{ + slice = append(slice, &toRenderSnapshot{ bar: b, pipe: b.render(tw, prependWs, appendWs), }) diff --git a/progress_posix.go b/progress_posix.go index be5bafd..6a3fda7 100644 --- a/progress_posix.go +++ b/progress_posix.go @@ -14,18 +14,8 @@ ) func (p *Progress) serve(s *pState) { - winch := make(chan os.Signal, 1) + winch := make(chan os.Signal, 2) signal.Notify(winch, syscall.SIGWINCH) - - defer func() { - s.ticker.Stop() - signal.Stop(winch) - p.cacheHeap = s.bHeap - close(p.done) - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - }() var numP, numA int var timer *time.Timer @@ -66,9 +56,14 @@ case <-resumeTicker: s.ticker = time.NewTicker(s.rr) resumeTicker = nil - case <-s.cancel: - return case <-p.shutdown: + s.ticker.Stop() + signal.Stop(winch) + p.cacheHeap = s.bHeap + close(p.done) + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) + } return } } diff --git a/progress_test.go b/progress_test.go index a078879..d1b23ef 100644 --- a/progress_test.go +++ b/progress_test.go @@ -19,39 +19,28 @@ } func TestAddBar(t *testing.T) { - p := mpb.New() + p := mpb.New(mpb.Output(ioutil.Discard)) + var wg sync.WaitGroup + wg.Add(1) + b := p.AddBar(80) + go func() { + for i := 0; i < 80; i++ { + if i == 33 { + wg.Done() + } + b.Increment() + time.Sleep(randomDuration(80 * time.Millisecond)) + } + }() + + wg.Wait() count := p.BarCount() - if count != 0 { - t.Errorf("BarCount want: %q, got: %q\n", 0, count) - } - - bar := p.AddBar(100) - - count = p.BarCount() if count != 1 { t.Errorf("BarCount want: %q, got: %q\n", 1, count) } - bar.Complete() - p.Stop() -} - -func TestRemoveBar(t *testing.T) { - p := mpb.New() - - bar := p.AddBar(10) - - if !p.RemoveBar(bar) { - t.Error("RemoveBar failure") - } - - count := p.BarCount() - if count != 0 { - t.Errorf("BarCount want: %q, got: %q\n", 0, count) - } - - bar.Complete() + b.Complete() p.Stop() } @@ -88,49 +77,55 @@ } func TestWithCancel(t *testing.T) { - var wg sync.WaitGroup cancel := make(chan struct{}) - shutdown := make(chan struct{}) p := mpb.New( mpb.Output(ioutil.Discard), mpb.WithCancel(cancel), - mpb.WithShutdownNotifier(shutdown), - mpb.WithWaitGroup(&wg), ) - total := 100 numBars := 3 - wg.Add(numBars) + type sample struct { + id int + total int64 + current int64 + } + + resStream := make(chan *sample, numBars) for i := 0; i < numBars; i++ { - name := fmt.Sprintf("Bar#%d:", i) - bar := p.AddBar(int64(total), mpb.BarID(i), - mpb.PrependDecorators(decor.StaticName(name, len(name), 0))) + bar := p.AddBar(int64(200), + mpb.BarID(i), + mpb.PrependDecorators( + func(s *decor.Statistics, _ chan<- int, _ <-chan int) string { + if s.Completed { + resStream <- &sample{ + id: s.ID, + total: s.Total, + current: s.Current, + } + } + return "" + }, + )) go func() { - defer wg.Done() - for i := 0; i < total; i++ { - select { - case <-cancel: - return - default: - } + for bar.InProgress() { + bar.Increment() time.Sleep(randomDuration(80 * time.Millisecond)) - bar.Increment() } }() } - time.AfterFunc(300*time.Millisecond, func() { + time.AfterFunc(100*time.Millisecond, func() { close(cancel) }) p.Stop() - - select { - case <-shutdown: - case <-time.After(300 * time.Millisecond): - t.Error("ProgressBar didn't stop") + close(resStream) + for res := range resStream { + if res.current >= res.total { + t.Errorf("bar %d: total = %d, current = %d\n", res.id, res.total, res.current) + } } } diff --git a/progress_windows.go b/progress_windows.go index dd45a81..0765724 100644 --- a/progress_windows.go +++ b/progress_windows.go @@ -11,18 +11,7 @@ ) func (p *Progress) serve(s *pState) { - - defer func() { - s.ticker.Stop() - p.cacheHeap = s.bHeap - close(p.done) - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - }() - var numP, numA int - for { select { case op := <-p.operateState: @@ -42,9 +31,13 @@ if err != nil { fmt.Fprintln(os.Stderr, err) } - case <-s.cancel: - return case <-p.shutdown: + s.ticker.Stop() + p.cacheHeap = s.bHeap + close(p.done) + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) + } return } }