Codebase list golang-github-vbauerster-mpb / 75e13fc
queued bar is blocking Vladimir Bauer 4 years ago
3 changed file(s) with 72 addition(s) and 82 deletion(s). Raw diff Collapse all Expand all
2323 toDrop bool
2424 noPop bool
2525 hasEwmaDecorators bool
26 frameCh chan *frame
2627 operateState chan func(*bState)
27 frameCh chan *frame
28
29 // cancel is called either by user or on complete event
30 cancel func()
31 // done is closed after cacheState is assigned
32 done chan struct{}
33 // cacheState is populated, right after close(b.done)
34 cacheState *bState
35
36 container *Progress
37 recoveredPanic interface{}
28 done chan struct{}
29 cacheState *bState
30 container *Progress
31 cancel func()
32 recoveredPanic interface{}
3833 }
3934
4035 type extenderFunc func(in io.Reader, reqWidth int, st decor.Statistics) (out io.Reader, lines int)
4136
42 // bState is actual bar state. It gets passed to *Bar.serve(...) monitor
43 // goroutine.
37 // bState is actual bar's state.
4438 type bState struct {
4539 id int
4640 priority int
6559 filler BarFiller
6660 middleware func(BarFiller) BarFiller
6761 extender extenderFunc
68
69 // runningBar is a key for *pState.parkedBars
70 runningBar *Bar
71
72 debugOut io.Writer
62 debugOut io.Writer
63
64 // afterBar is a key for *pState.parkedBars
65 afterBar *Bar
7366 }
7467
7568 type frame struct {
7770 lines int
7871 }
7972
80 func newBar(container *Progress, bs *bState) *Bar {
73 func newBar(container *Progress, bs *bState) (*Bar, func()) {
8174 ctx, cancel := context.WithCancel(container.ctx)
8275
8376 bar := &Bar{
84 container: container,
8577 priority: bs.priority,
8678 toDrop: bs.dropOnComplete,
8779 noPop: bs.noPop,
80 frameCh: make(chan *frame, 1),
8881 operateState: make(chan func(*bState)),
89 frameCh: make(chan *frame, 1),
9082 done: make(chan struct{}),
83 container: container,
9184 cancel: cancel,
9285 }
9386
94 go bar.serve(ctx, bs)
95 return bar
87 bar.subscribeDecorators(bs)
88
89 serve := func() {
90 defer container.bwg.Done()
91 for {
92 select {
93 case op := <-bar.operateState:
94 op(bs)
95 case <-ctx.Done():
96 bs.decoratorShutdownNotify()
97 bar.cacheState = bs
98 close(bar.done)
99 return
100 }
101 }
102 }
103
104 return bar, serve
96105 }
97106
98107 // ProxyReader wraps r with metrics required for progress tracking.
318327 }
319328 }
320329
321 func (b *Bar) serve(ctx context.Context, s *bState) {
322 defer b.container.bwg.Done()
323 for {
324 select {
325 case op := <-b.operateState:
326 op(s)
327 case <-ctx.Done():
328 s.decoratorShutdownNotify()
329 b.cacheState = s
330 close(b.done)
331 return
332 }
333 }
334 }
335
336330 func (b *Bar) render(tw int) {
337331 select {
338332 case b.operateState <- func(s *bState) {
370364 }
371365 }
372366
373 func (b *Bar) subscribeDecorators() {
374 var averageDecorators []decor.AverageDecorator
375 var ewmaDecorators []decor.EwmaDecorator
376 var shutdownListeners []decor.ShutdownListener
377 b.TraverseDecorators(func(d decor.Decorator) {
378 if d, ok := d.(decor.AverageDecorator); ok {
379 averageDecorators = append(averageDecorators, d)
380 }
381 if d, ok := d.(decor.EwmaDecorator); ok {
382 ewmaDecorators = append(ewmaDecorators, d)
383 }
384 if d, ok := d.(decor.ShutdownListener); ok {
385 shutdownListeners = append(shutdownListeners, d)
386 }
387 })
388 b.hasEwmaDecorators = len(ewmaDecorators) != 0
389 select {
390 case b.operateState <- func(s *bState) {
391 s.averageDecorators = averageDecorators
392 s.ewmaDecorators = ewmaDecorators
393 s.shutdownListeners = shutdownListeners
394 }:
395 case <-b.done:
367 func (b *Bar) subscribeDecorators(bs *bState) {
368 for _, decorators := range [...][]decor.Decorator{
369 bs.pDecorators,
370 bs.aDecorators,
371 } {
372 for _, d := range decorators {
373 d = extractBaseDecorator(d)
374 if d, ok := d.(decor.AverageDecorator); ok {
375 bs.averageDecorators = append(bs.averageDecorators, d)
376 }
377 if d, ok := d.(decor.EwmaDecorator); ok {
378 bs.ewmaDecorators = append(bs.ewmaDecorators, d)
379 }
380 if d, ok := d.(decor.ShutdownListener); ok {
381 bs.shutdownListeners = append(bs.shutdownListeners, d)
382 }
383 }
396384 }
397385 }
398386
6565 return nil
6666 }
6767 return func(s *bState) {
68 s.runningBar = runningBar
68 s.afterBar = bar
6969 }
7070 }
7171
1919 prr = 150 * time.Millisecond
2020 )
2121
22 // Progress represents a container that renders one or more progress
23 // bars.
22 // Progress represents a container that renders one or more progress bars.
2423 type Progress struct {
2524 ctx context.Context
2625 uwg *sync.WaitGroup
3231 once sync.Once
3332 }
3433
35 // pState holds bars in its priorityQueue. It gets passed to
36 // *Progress.serve(...) monitor goroutine.
34 // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
3735 type pState struct {
3836 bHeap priorityQueue
3937 heapUpdated bool
5149 externalRefresh <-chan interface{}
5250 renderDelay <-chan struct{}
5351 shutdownNotifier chan struct{}
54 parkedBars map[*Bar]*Bar
52 queueBars map[*Bar]queueBar
5553 output io.Writer
5654 debugOut io.Writer
55 }
56
57 type queueBar struct {
58 bar *Bar
59 serve func()
5760 }
5861
5962 // New creates new Progress container instance. It's not possible to
6770 // method has been called.
6871 func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
6972 s := &pState{
70 bHeap: priorityQueue{},
71 rr: prr,
72 parkedBars: make(map[*Bar]*Bar),
73 output: os.Stdout,
73 bHeap: priorityQueue{},
74 rr: prr,
75 queueBars: make(map[*Bar]queueBar),
76 output: os.Stdout,
7477 }
7578
7679 for _, opt := range options {
120123 select {
121124 case p.operateState <- func(ps *pState) {
122125 bs := ps.makeBarState(total, filler, options...)
123 bar := newBar(p, bs)
124 if bs.runningBar != nil {
125 bs.runningBar.noPop = true
126 ps.parkedBars[bs.runningBar] = bar
126 bar, serve := newBar(p, bs)
127 if bs.afterBar != nil {
128 ps.queueBars[bs.afterBar] = queueBar{bar, serve}
127129 } else {
128130 heap.Push(&ps.bHeap, bar)
129131 ps.heapUpdated = true
132 go serve()
130133 }
131134 ps.idCount++
132135 result <- bar
133136 }:
134137 bar := <-result
135 bar.subscribeDecorators()
136138 return bar
137139 case <-p.done:
138140 p.bwg.Done()
340342 }
341343
342344 for _, b := range s.barShutdownQueue {
343 if parkedBar := s.parkedBars[b]; parkedBar != nil {
344 parkedBar.priority = b.priority
345 heap.Push(&s.bHeap, parkedBar)
346 delete(s.parkedBars, b)
345 if qb, ok := s.queueBars[b]; ok {
346 qb.bar.priority = b.priority
347 heap.Push(&s.bHeap, qb.bar)
348 delete(s.queueBars, b)
347349 b.toDrop = true
348 }
349 if s.popCompleted && !b.noPop {
350 go qb.serve()
351 } else if s.popCompleted && !b.noPop {
350352 totalLines -= bm[b]
351353 b.toDrop = true
352354 }