Codebase list golang-github-vbauerster-mpb / 47f184f
SetOrder and Abort on bar side Vladimir Bauer 7 years ago
7 changed file(s) with 87 addition(s) and 67 deletion(s). Raw diff Collapse all Expand all
1919 defer cancel()
2020
2121 var wg sync.WaitGroup
22 p := mpb.New(
23 mpb.WithWaitGroup(&wg),
24 mpb.WithContext(ctx),
25 )
22 p := mpb.NewWithContext(ctx, mpb.WithWaitGroup(&wg))
2623 total := 300
2724 numBars := 3
2825 wg.Add(numBars)
3532 decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncSpace),
3633 ),
3734 mpb.AppendDecorators(
38 decor.Percentage(decor.WC{W: 5}),
35 // note that OnComplete will not be fired, because of cancel
36 decor.OnComplete(decor.Percentage(decor.WC{W: 5}), "done"),
3937 ),
4038 )
4139
3333 go func() {
3434 defer wg.Done()
3535 max := 100 * time.Millisecond
36 for i := 0; i < total; i++ {
36 for i := 0; !b.Completed(); i++ {
3737 start := time.Now()
38 if b.ID() == 2 && i == 42 {
39 p.Abort(b, true)
40 return
38 if b.ID() == 2 && i >= 42 {
39 // aborting and removing while bar is running
40 b.Abort(true)
4141 }
4242 time.Sleep(time.Duration(rand.Intn(10)+1) * max / 10)
4343 // ewma based decorators require work duration measurement
3838 }
3939 // wait for our bar to complete and flush
4040 p.Wait()
41 p.Wait()
4142 }
4141 frameCh chan io.Reader
4242 syncTableCh chan [][]chan int
4343 completed chan bool
44 forceRefresh chan<- time.Time
45
46 // shutdown is closed when bar completed and flushed
47 shutdown chan struct{}
48 // done is closed after cacheState is written
44
45 // concel is called either by user or on complete event
46 cancel func()
47 // done is closed after cacheState is assigned
4948 done chan struct{}
5049 // cacheState is populated, right after close(shutdown)
5150 cacheState *bState
5554 current int64
5655 }
5756
58 dlogger *log.Logger
59 bpanic interface{}
57 container *Progress
58 dlogger *log.Logger
59 recoveredPanic interface{}
6060 }
6161
6262 type bState struct {
8383 dropOnComplete bool
8484 // runningBar is a key for *pState.parkedBars
8585 runningBar *Bar
86 }
87
88 func newBar(ctx context.Context, wg *sync.WaitGroup, bs *bState) *Bar {
86
87 debugOut io.Writer
88 }
89
90 func newBar(container *Progress, bs *bState) *Bar {
8991
9092 bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width))
9193 bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width))
9496 bs.bufE = bytes.NewBuffer(make([]byte, 0, bs.width))
9597 }
9698
99 logPrefix := fmt.Sprintf("%sbar#%02d ", container.dlogger.Prefix(), bs.id)
100 ctx, cancel := context.WithCancel(container.ctx)
97101 bar := &Bar{
102 container: container,
98103 priority: bs.priority,
99104 dropOnComplete: bs.dropOnComplete,
100105 operateState: make(chan func(*bState)),
102107 syncTableCh: make(chan [][]chan int),
103108 completed: make(chan bool),
104109 done: make(chan struct{}),
105 shutdown: make(chan struct{}),
106 }
107
108 go bar.serve(ctx, wg, bs)
110 cancel: cancel,
111 dlogger: log.New(bs.debugOut, logPrefix, log.Lshortfile),
112 }
113
114 go bar.serve(ctx, bs)
109115 return bar
110116 }
111117
221227 }
222228 }
223229
230 // SetOrder changes bar's order among multiple bars. Zero is highest
231 // priority, i.e. bar will be on top. If you don't need to set order
232 // dynamically, better use BarPriority option.
233 func (b *Bar) SetOrder(order int) {
234 select {
235 case <-b.done:
236 default:
237 b.container.setBarOrder(b, order)
238 }
239 }
240
241 // Abort interrupts bar's running goroutine. Call this, if you'd like
242 // to stop/remove bar before completion event. It has no effect after
243 // completion event. If drop is true bar will be removed as well.
244 func (b *Bar) Abort(drop bool) {
245 select {
246 case <-b.done:
247 default:
248 if drop {
249 b.container.dropBar(b)
250 }
251 b.cancel()
252 }
253 }
254
224255 // Completed reports whether the bar is in completed state.
225256 func (b *Bar) Completed() bool {
226 // omit select here, because primary usage of the method is for loop
227 // condition, like for !bar.Completed() {...} so when toComplete=true
228 // it is called once (at which time, the bar is still alive), then
229 // quits the loop and never suppose to be called afterwards.
230 return <-b.completed
257 select {
258 case completed := <-b.completed:
259 return completed
260 case <-b.done:
261 return true
262 }
231263 }
232264
233265 func (b *Bar) wSyncTable() [][]chan int {
239271 }
240272 }
241273
242 func (b *Bar) serve(ctx context.Context, wg *sync.WaitGroup, s *bState) {
243 defer wg.Done()
244 cancel := ctx.Done()
274 func (b *Bar) serve(ctx context.Context, s *bState) {
275 defer b.container.bwg.Done()
245276 for {
246277 select {
247278 case op := <-b.operateState:
248279 op(s)
249280 case b.completed <- s.toComplete:
250 case <-cancel:
251 s.toComplete = true
252 cancel = nil
253 case <-b.shutdown:
281 case <-ctx.Done():
254282 b.cacheState = s
255283 close(b.done)
256284 // Notifying decorators about shutdown event
263291 }
264292
265293 func (b *Bar) render(tw int) {
266 if b.bpanic != nil {
294 if b.recoveredPanic != nil {
267295 b.toShutdown = false
268296 b.frameCh <- b.panicToFrame(tw)
269297 return
274302 // recovering if user defined decorator panics for example
275303 if p := recover(); p != nil {
276304 b.dlogger.Println(p)
277 b.bpanic = p
305 b.recoveredPanic = p
278306 b.toShutdown = !s.completeFlushed
279307 b.frameCh <- b.panicToFrame(tw)
280308 }
306334 }
307335
308336 func (b *Bar) panicToFrame(termWidth int) io.Reader {
309 return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%dv\n", termWidth), b.bpanic))
337 return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%dv\n", termWidth), b.recoveredPanic))
310338 }
311339
312340 func (s *bState) draw(termWidth int) io.Reader {
374402 func (b *Bar) refreshNowTillShutdown() {
375403 for {
376404 select {
377 case b.forceRefresh <- time.Now():
378 case <-b.shutdown:
405 case b.container.forceRefresh <- time.Now():
406 case <-b.done:
379407 return
380408 }
381409 }
4949 t.Errorf("Expected bar id: %d, got %d\n", wantID, gotID)
5050 }
5151
52 p.Abort(bar, true)
52 bar.Abort(true)
5353 p.Wait()
5454 }
5555
22 import (
33 "container/heap"
44 "context"
5 "fmt"
65 "io"
76 "io/ioutil"
87 "log"
3837 heapUpdated bool
3938 pMatrix map[int][]chan int
4039 aMatrix map[int][]chan int
41 barShutdownQueue []chan struct{}
40 barShutdownQueue []func()
4241
4342 // following are provided/overrided by user
4443 idCount int
125124 priority: ps.idCount,
126125 id: ps.idCount,
127126 width: ps.width,
127 debugOut: ps.debugOut,
128128 }
129129 for _, opt := range options {
130130 if opt != nil {
131131 opt(bs)
132132 }
133133 }
134 bar := newBar(p.ctx, p.bwg, bs)
135 bar.forceRefresh = p.forceRefresh
136 prefix := fmt.Sprintf("%sbar#%02d ", p.dlogger.Prefix(), bs.id)
137 bar.dlogger = log.New(ps.debugOut, prefix, log.Lshortfile)
134 bar := newBar(p, bs)
138135 if bs.runningBar != nil {
139136 if bar.priority == ps.idCount {
140137 bar.priority = bs.runningBar.priority
154151 }
155152 }
156153
157 // Abort is only effective while bar progress is running, it means
158 // remove bar now without waiting for its completion. If bar is already
159 // completed, there is nothing to abort. If you need to remove bar
160 // after completion, use BarRemoveOnComplete BarOption.
161 func (p *Progress) Abort(b *Bar, remove bool) {
154 func (p *Progress) dropBar(b *Bar) {
162155 select {
163156 case p.operateState <- func(s *pState) {
164157 if b.index < 0 {
165158 return
166159 }
167 if remove {
168 s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil
169 }
170 s.barShutdownQueue = append(s.barShutdownQueue, b.shutdown)
160 s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil
171161 }:
172162 case <-p.done:
173163 }
174164 }
175165
176 // UpdateBarPriority provides a way to change bar's order position.
177 // Zero is highest priority, i.e. bar will be on top.
166 // UpdateBarPriority is deprecated. Please use *Bar.SetOrder.
178167 func (p *Progress) UpdateBarPriority(b *Bar, priority int) {
168 p.setBarOrder(b, priority)
169 }
170
171 func (p *Progress) setBarOrder(b *Bar, order int) {
179172 select {
180 case p.operateState <- func(s *pState) { s.bHeap.update(b, priority) }:
173 case p.operateState <- func(s *pState) { s.bHeap.update(b, order) }:
181174 case <-p.done:
182175 }
183176 }
270263 // shutdown at next flush, in other words decrement underlying WaitGroup
271264 // only after the bar with completed state has been flushed. this
272265 // ensures no bar ends up with less than 100% rendered.
273 s.barShutdownQueue = append(s.barShutdownQueue, bar.shutdown)
266 s.barShutdownQueue = append(s.barShutdownQueue, bar.cancel)
274267 if parkedBar := s.parkedBars[bar]; parkedBar != nil {
275268 heap.Push(s.bHeap, parkedBar)
276269 s.heapUpdated = true
288281 }
289282
290283 for i := len(s.barShutdownQueue) - 1; i >= 0; i-- {
291 close(s.barShutdownQueue[i])
284 s.barShutdownQueue[i]()
292285 s.barShutdownQueue = s.barShutdownQueue[:i]
293286 }
294287
3737 t.Errorf("BarCount want: %q, got: %q\n", 1, count)
3838 }
3939
40 p.Abort(b, true)
40 b.Abort(true)
4141 p.Wait()
4242 }
4343
5151 b := p.AddBar(100)
5252 bars[i] = b
5353 go func(n int) {
54 for i := 0; i < 100; i++ {
55 if n == 0 && i == 33 {
56 p.Abort(b, true)
54 for i := 0; !b.Completed(); i++ {
55 if n == 0 && i >= 33 {
56 b.Abort(true)
5757 wg.Done()
5858 }
5959 b.Increment()
6767 if count != 2 {
6868 t.Errorf("BarCount want: %q, got: %q\n", 2, count)
6969 }
70 p.Abort(bars[1], true)
71 p.Abort(bars[2], true)
70 bars[1].Abort(true)
71 bars[2].Abort(true)
7272 p.Wait()
7373 }
7474