diff --git a/_examples/cancel/main.go b/_examples/cancel/main.go index 23d91ed..a911195 100644 --- a/_examples/cancel/main.go +++ b/_examples/cancel/main.go @@ -20,10 +20,7 @@ defer cancel() var wg sync.WaitGroup - p := mpb.New( - mpb.WithWaitGroup(&wg), - mpb.WithContext(ctx), - ) + p := mpb.NewWithContext(ctx, mpb.WithWaitGroup(&wg)) total := 300 numBars := 3 wg.Add(numBars) @@ -36,7 +33,8 @@ decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace), ), mpb.AppendDecorators( - decor.Percentage(decor.WC{W: 5}), + // note that OnComplete will not be fired, because of cancel + decor.OnComplete(decor.Percentage(decor.WC{W: 5}), "done"), ), ) diff --git a/_examples/remove/main.go b/_examples/remove/main.go index 438c1ed..0f1ce4c 100644 --- a/_examples/remove/main.go +++ b/_examples/remove/main.go @@ -34,11 +34,11 @@ go func() { defer wg.Done() max := 100 * time.Millisecond - for i := 0; i < total; i++ { + for i := 0; !b.Completed(); i++ { start := time.Now() - if b.ID() == 2 && i == 42 { - p.Abort(b, true) - return + if b.ID() == 2 && i >= 42 { + // aborting and removing while bar is running + b.Abort(true) } time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10) // ewma based decorators require work duration measurement diff --git a/_examples/singleBar/main.go b/_examples/singleBar/main.go index a7a6a13..fc76943 100644 --- a/_examples/singleBar/main.go +++ b/_examples/singleBar/main.go @@ -39,4 +39,5 @@ } // wait for our bar to complete and flush p.Wait() + p.Wait() } diff --git a/bar.go b/bar.go index 1b617a4..d702bd8 100644 --- a/bar.go +++ b/bar.go @@ -42,11 +42,10 @@ frameCh chan io.Reader syncTableCh chan [][]chan int completed chan bool - forceRefresh chan<- time.Time - - // shutdown is closed when bar completed and flushed - shutdown chan struct{} - // done is closed after cacheState is written + + // concel is called either by user or on complete event + cancel func() + // done is closed after cacheState is assigned done chan struct{} // cacheState is populated, right after close(shutdown) cacheState *bState @@ -56,8 +55,9 @@ current int64 } - dlogger *log.Logger - bpanic interface{} + container *Progress + dlogger *log.Logger + recoveredPanic interface{} } type bState struct { @@ -84,9 +84,11 @@ dropOnComplete bool // runningBar is a key for *pState.parkedBars runningBar *Bar -} - -func newBar(ctx context.Context, wg *sync.WaitGroup, bs *bState) *Bar { + + debugOut io.Writer +} + +func newBar(container *Progress, bs *bState) *Bar { bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width)) bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width)) @@ -95,7 +97,10 @@ bs.bufE = bytes.NewBuffer(make([]byte, 0, bs.width)) } + logPrefix := fmt.Sprintf("%sbar#%02d ", container.dlogger.Prefix(), bs.id) + ctx, cancel := context.WithCancel(container.ctx) bar := &Bar{ + container: container, priority: bs.priority, dropOnComplete: bs.dropOnComplete, operateState: make(chan func(*bState)), @@ -103,10 +108,11 @@ syncTableCh: make(chan [][]chan int), completed: make(chan bool), done: make(chan struct{}), - shutdown: make(chan struct{}), - } - - go bar.serve(ctx, wg, bs) + cancel: cancel, + dlogger: log.New(bs.debugOut, logPrefix, log.Lshortfile), + } + + go bar.serve(ctx, bs) return bar } @@ -222,13 +228,39 @@ } } +// SetOrder changes bar's order among multiple bars. Zero is highest +// priority, i.e. bar will be on top. If you don't need to set order +// dynamically, better use BarPriority option. +func (b *Bar) SetOrder(order int) { + select { + case <-b.done: + default: + b.container.setBarOrder(b, order) + } +} + +// Abort interrupts bar's running goroutine. Call this, if you'd like +// to stop/remove bar before completion event. It has no effect after +// completion event. If drop is true bar will be removed as well. +func (b *Bar) Abort(drop bool) { + select { + case <-b.done: + default: + if drop { + b.container.dropBar(b) + } + b.cancel() + } +} + // Completed reports whether the bar is in completed state. func (b *Bar) Completed() bool { - // omit select here, because primary usage of the method is for loop - // condition, like for !bar.Completed() {...} so when toComplete=true - // it is called once (at which time, the bar is still alive), then - // quits the loop and never suppose to be called afterwards. - return <-b.completed + select { + case completed := <-b.completed: + return completed + case <-b.done: + return true + } } func (b *Bar) wSyncTable() [][]chan int { @@ -240,18 +272,14 @@ } } -func (b *Bar) serve(ctx context.Context, wg *sync.WaitGroup, s *bState) { - defer wg.Done() - cancel := ctx.Done() +func (b *Bar) serve(ctx context.Context, s *bState) { + defer b.container.bwg.Done() for { select { case op := <-b.operateState: op(s) case b.completed <- s.toComplete: - case <-cancel: - s.toComplete = true - cancel = nil - case <-b.shutdown: + case <-ctx.Done(): b.cacheState = s close(b.done) // Notifying decorators about shutdown event @@ -264,7 +292,7 @@ } func (b *Bar) render(tw int) { - if b.bpanic != nil { + if b.recoveredPanic != nil { b.toShutdown = false b.frameCh <- b.panicToFrame(tw) return @@ -275,7 +303,7 @@ // recovering if user defined decorator panics for example if p := recover(); p != nil { b.dlogger.Println(p) - b.bpanic = p + b.recoveredPanic = p b.toShutdown = !s.completeFlushed b.frameCh <- b.panicToFrame(tw) } @@ -307,7 +335,7 @@ } func (b *Bar) panicToFrame(termWidth int) io.Reader { - return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%dv\n", termWidth), b.bpanic)) + return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%dv\n", termWidth), b.recoveredPanic)) } func (s *bState) draw(termWidth int) io.Reader { @@ -375,8 +403,8 @@ func (b *Bar) refreshNowTillShutdown() { for { select { - case b.forceRefresh <- time.Now(): - case <-b.shutdown: + case b.container.forceRefresh <- time.Now(): + case <-b.done: return } } diff --git a/bar_test.go b/bar_test.go index acf924f..33f7e95 100644 --- a/bar_test.go +++ b/bar_test.go @@ -50,7 +50,7 @@ t.Errorf("Expected bar id: %d, got %d\n", wantID, gotID) } - p.Abort(bar, true) + bar.Abort(true) p.Wait() } diff --git a/progress.go b/progress.go index f8d5415..19869df 100644 --- a/progress.go +++ b/progress.go @@ -3,7 +3,6 @@ import ( "container/heap" "context" - "fmt" "io" "io/ioutil" "log" @@ -39,7 +38,7 @@ heapUpdated bool pMatrix map[int][]chan int aMatrix map[int][]chan int - barShutdownQueue []chan struct{} + barShutdownQueue []func() // following are provided/overrided by user idCount int @@ -126,16 +125,14 @@ priority: ps.idCount, id: ps.idCount, width: ps.width, + debugOut: ps.debugOut, } for _, opt := range options { if opt != nil { opt(bs) } } - bar := newBar(p.ctx, p.bwg, bs) - bar.forceRefresh = p.forceRefresh - prefix := fmt.Sprintf("%sbar#%02d ", p.dlogger.Prefix(), bs.id) - bar.dlogger = log.New(ps.debugOut, prefix, log.Lshortfile) + bar := newBar(p, bs) if bs.runningBar != nil { if bar.priority == ps.idCount { bar.priority = bs.runningBar.priority @@ -155,30 +152,26 @@ } } -// Abort is only effective while bar progress is running, it means -// remove bar now without waiting for its completion. If bar is already -// completed, there is nothing to abort. If you need to remove bar -// after completion, use BarRemoveOnComplete BarOption. -func (p *Progress) Abort(b *Bar, remove bool) { +func (p *Progress) dropBar(b *Bar) { select { case p.operateState <- func(s *pState) { if b.index < 0 { return } - if remove { - s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil - } - s.barShutdownQueue = append(s.barShutdownQueue, b.shutdown) + s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil }: case <-p.done: } } -// UpdateBarPriority provides a way to change bar's order position. -// Zero is highest priority, i.e. bar will be on top. +// UpdateBarPriority is deprecated. Please use *Bar.SetOrder. func (p *Progress) UpdateBarPriority(b *Bar, priority int) { + p.setBarOrder(b, priority) +} + +func (p *Progress) setBarOrder(b *Bar, order int) { select { - case p.operateState <- func(s *pState) { s.bHeap.update(b, priority) }: + case p.operateState <- func(s *pState) { s.bHeap.update(b, order) }: case <-p.done: } } @@ -271,7 +264,7 @@ // shutdown at next flush, in other words decrement underlying WaitGroup // only after the bar with completed state has been flushed. this // ensures no bar ends up with less than 100% rendered. - s.barShutdownQueue = append(s.barShutdownQueue, bar.shutdown) + s.barShutdownQueue = append(s.barShutdownQueue, bar.cancel) if parkedBar := s.parkedBars[bar]; parkedBar != nil { heap.Push(s.bHeap, parkedBar) s.heapUpdated = true @@ -289,7 +282,7 @@ } for i := len(s.barShutdownQueue) - 1; i >= 0; i-- { - close(s.barShutdownQueue[i]) + s.barShutdownQueue[i]() s.barShutdownQueue = s.barShutdownQueue[:i] } diff --git a/progress_test.go b/progress_test.go index 7fe0f92..141be24 100644 --- a/progress_test.go +++ b/progress_test.go @@ -38,7 +38,7 @@ t.Errorf("BarCount want: %q, got: %q\n", 1, count) } - p.Abort(b, true) + b.Abort(true) p.Wait() } @@ -52,9 +52,9 @@ b := p.AddBar(100) bars[i] = b go func(n int) { - for i := 0; i < 100; i++ { - if n == 0 && i == 33 { - p.Abort(b, true) + for i := 0; !b.Completed(); i++ { + if n == 0 && i >= 33 { + b.Abort(true) wg.Done() } b.Increment() @@ -68,8 +68,8 @@ if count != 2 { t.Errorf("BarCount want: %q, got: %q\n", 2, count) } - p.Abort(bars[1], true) - p.Abort(bars[2], true) + bars[1].Abort(true) + bars[2].Abort(true) p.Wait() }