Codebase list golang-github-vbauerster-mpb / ec4a7f0
Single point of bar complete Vladimir Bauer 8 years ago
6 changed file(s) with 135 addition(s) and 147 deletion(s). Raw diff Collapse all Expand all
3131 priority int
3232 index int
3333
34 // the flag is set from Progress monitor goroutine only
35 completed bool
36
3437 operateState chan func(*bState)
3538 done chan struct{}
3639 shutdown chan struct{}
37 once sync.Once
3840
3941 // cacheState is used after done is closed
4042 cacheState *bState
5355 trimLeftSpace bool
5456 trimRightSpace bool
5557 completed bool
56 aborted bool
58 removed bool
5759 dynamic bool
5860 startTime time.Time
5961 timeElapsed time.Duration
6365 prependFuncs []decor.DecoratorFunc
6466 refill *refill
6567 bufP, bufB, bufA *bytes.Buffer
66 panic string
68 panicMsg string
6769 }
6870 refill struct {
6971 char rune
7072 till int64
7173 }
72 bufReader struct {
74 toRenderReader struct {
7375 io.Reader
74 completed bool
76 toComplete bool
77 toRemove bool
7578 }
7679 )
7780
147150 }
148151 select {
149152 case b.operateState <- func(s *bState) {
153 if s.completed {
154 return
155 }
150156 next := time.Now()
151157 if s.current == 0 {
152158 s.startTime = next
252258 }
253259 }
254260
255 // InProgress returns true, while progress is running.
256 // Can be used as condition in for loop
257 func (b *Bar) InProgress() bool {
258 select {
261 // Completed reports whether the bar is in completed state
262 func (b *Bar) Completed() bool {
263 result := make(chan bool, 1)
264 select {
265 case b.operateState <- func(s *bState) { result <- s.completed }:
266 return <-result
267 case <-b.done:
268 return b.cacheState.completed
269 }
270 }
271
272 // Complete stops bar's progress tracking, but doesn't remove the bar from rendering queue.
273 // If you need to remove, invoke Progress.RemoveBar(*Bar) instead.
274 func (b *Bar) Complete() {
275 b.askToComplete(false)
276 }
277
278 func (b *Bar) askToComplete(toRemove bool) bool {
279 result := make(chan bool, 1)
280 select {
281 case b.operateState <- func(s *bState) {
282 s.removed = toRemove
283 s.completed = true
284 result <- true
285 }:
286 return <-result
259287 case <-b.done:
260288 return false
261 default:
262 return true
263 }
264 }
265
266 // Complete stops bar's progress tracking, but doesn't remove the bar.
267 // If you need to remove, call Progress.RemoveBar(*Bar) instead.
268 func (b *Bar) Complete() {
269 b.once.Do(func() {
270 close(b.shutdown)
271 })
289 }
272290 }
273291
274292 func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) {
275 defer func() {
276 b.cacheState = s
277 close(b.done)
278 wg.Done()
279 }()
280293 for {
281294 select {
282295 case op := <-b.operateState:
283296 op(s)
284297 case <-cancel:
285 s.aborted = true
298 s.completed = true
299 cancel = nil
300 case <-b.shutdown:
301 b.cacheState = s
302 close(b.done)
303 wg.Done()
286304 return
287 case <-b.shutdown:
288 return
289 }
290 }
291 }
292
293 func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan *bufReader {
294 ch := make(chan *bufReader, 1)
305 }
306 }
307 }
308
309 func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan *toRenderReader {
310 ch := make(chan *toRenderReader, 1)
295311
296312 go func() {
297313 select {
298314 case b.operateState <- func(s *bState) {
315 var r io.Reader
299316 defer func() {
300317 // recovering if external decorators panic
301318 if p := recover(); p != nil {
302 s.panic = fmt.Sprintf("b#%02d panic: %v\n", s.id, p)
319 s.panicMsg = fmt.Sprintf("b#%02d panic: %v\n", s.id, p)
303320 s.prependFuncs = nil
304321 s.appendFuncs = nil
305
306 ch <- &bufReader{strings.NewReader(s.panic), true}
322 s.completed = true
323 r = strings.NewReader(s.panicMsg)
307324 }
308 close(ch)
325 ch <- &toRenderReader{r, s.completed, s.removed}
309326 }()
310327 s.draw(tw, prependWs, appendWs)
311 ch <- &bufReader{io.MultiReader(s.bufP, s.bufB, s.bufA), s.completed}
328 r = io.MultiReader(s.bufP, s.bufB, s.bufA)
312329 }:
313330 case <-b.done:
314331 s := b.cacheState
315332 var r io.Reader
316 if s.panic != "" {
317 r = strings.NewReader(s.panic)
333 if s.panicMsg != "" {
334 r = strings.NewReader(s.panicMsg)
318335 } else {
319336 s.draw(tw, prependWs, appendWs)
320337 r = io.MultiReader(s.bufP, s.bufB, s.bufA)
321338 }
322 ch <- &bufReader{r, false}
323 close(ch)
339 ch <- &toRenderReader{r, s.completed, s.removed}
324340 }
325341 }()
326342
424440 return &decor.Statistics{
425441 ID: s.id,
426442 Completed: s.completed,
427 Aborted: s.aborted,
443 Removed: s.removed,
428444 Total: s.total,
429445 Current: s.current,
430446 StartTime: s.startTime,
3030 type Statistics struct {
3131 ID int
3232 Completed bool
33 Aborted bool
33 Removed bool
3434 Total int64
3535 Current int64
3636 StartTime time.Time
5454 Listen []chan int
5555 Result []chan int
5656 }
57 renderedBar struct {
57 toRenderSnapshot struct {
5858 bar *Bar
59 pipe <-chan *bufReader
59 pipe <-chan *toRenderReader
6060 }
6161 )
6262
7272 cw: cwriter.New(os.Stdout),
7373 rr: prr,
7474 ticker: time.NewTicker(prr),
75 cancel: make(chan struct{}),
7675 }
7776
7877 for _, opt := range options {
110109 }
111110 }
112111
113 // RemoveBar removes bar at any time.
112 // RemoveBar removes the bar at next render cycle
114113 func (p *Progress) RemoveBar(b *Bar) bool {
115 result := make(chan bool, 1)
116 select {
117 case p.operateState <- func(s *pState) {
118 if heap.Remove(s.bHeap, b.index) != nil {
119 s.heapUpdated = true
120 b.Complete()
121 result <- true
122 } else {
123 result <- false
124 }
125 }:
126 return <-result
127 case <-p.done:
128 return false
129 }
114 return b.askToComplete(true)
130115 }
131116
132117 // UpdateBarPriority provides a way to change bar's order position.
215200 prependWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numP)
216201 appendWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numA)
217202
218 for _, b := range s.renderByPriority(tw, prependWs, appendWs) {
219 r := <-b.pipe
203 for _, trs := range s.renderByPriority(tw, prependWs, appendWs) {
204 r := <-trs.pipe
220205 _, err = s.cw.ReadFrom(r)
221 if r.completed {
222 b.bar.Complete()
206 if !trs.bar.completed && r.toComplete {
207 trs.bar.completed = true
208 close(trs.bar.shutdown)
209 }
210 if r.toRemove {
211 heap.Remove(s.bHeap, trs.bar.index)
223212 }
224213 }
225214
233222 return
234223 }
235224
236 func (s *pState) renderByPriority(tw int, prependWs, appendWs *widthSync) []*renderedBar {
237 slice := make([]*renderedBar, 0, s.bHeap.Len())
225 func (s *pState) renderByPriority(tw int, prependWs, appendWs *widthSync) []*toRenderSnapshot {
226 slice := make([]*toRenderSnapshot, 0, s.bHeap.Len())
238227 for s.bHeap.Len() > 0 {
239228 b := heap.Pop(s.bHeap).(*Bar)
240229 defer heap.Push(s.bHeap, b)
241 slice = append(slice, &renderedBar{
230 slice = append(slice, &toRenderSnapshot{
242231 bar: b,
243232 pipe: b.render(tw, prependWs, appendWs),
244233 })
1313 )
1414
1515 func (p *Progress) serve(s *pState) {
16 winch := make(chan os.Signal, 1)
16 winch := make(chan os.Signal, 2)
1717 signal.Notify(winch, syscall.SIGWINCH)
18
19 defer func() {
20 s.ticker.Stop()
21 signal.Stop(winch)
22 p.cacheHeap = s.bHeap
23 close(p.done)
24 if s.shutdownNotifier != nil {
25 close(s.shutdownNotifier)
26 }
27 }()
2818
2919 var numP, numA int
3020 var timer *time.Timer
6555 case <-resumeTicker:
6656 s.ticker = time.NewTicker(s.rr)
6757 resumeTicker = nil
68 case <-s.cancel:
69 return
7058 case <-p.shutdown:
59 s.ticker.Stop()
60 signal.Stop(winch)
61 p.cacheHeap = s.bHeap
62 close(p.done)
63 if s.shutdownNotifier != nil {
64 close(s.shutdownNotifier)
65 }
7166 return
7267 }
7368 }
1818 }
1919
2020 func TestAddBar(t *testing.T) {
21 p := mpb.New()
21 p := mpb.New(mpb.Output(ioutil.Discard))
2222
23 var wg sync.WaitGroup
24 wg.Add(1)
25 b := p.AddBar(80)
26 go func() {
27 for i := 0; i < 80; i++ {
28 if i == 33 {
29 wg.Done()
30 }
31 b.Increment()
32 time.Sleep(randomDuration(80 * time.Millisecond))
33 }
34 }()
35
36 wg.Wait()
2337 count := p.BarCount()
24 if count != 0 {
25 t.Errorf("BarCount want: %q, got: %q\n", 0, count)
26 }
27
28 bar := p.AddBar(100)
29
30 count = p.BarCount()
3138 if count != 1 {
3239 t.Errorf("BarCount want: %q, got: %q\n", 1, count)
3340 }
3441
35 bar.Complete()
36 p.Stop()
37 }
38
39 func TestRemoveBar(t *testing.T) {
40 p := mpb.New()
41
42 bar := p.AddBar(10)
43
44 if !p.RemoveBar(bar) {
45 t.Error("RemoveBar failure")
46 }
47
48 count := p.BarCount()
49 if count != 0 {
50 t.Errorf("BarCount want: %q, got: %q\n", 0, count)
51 }
52
53 bar.Complete()
42 b.Complete()
5443 p.Stop()
5544 }
5645
8776 }
8877
8978 func TestWithCancel(t *testing.T) {
90 var wg sync.WaitGroup
9179 cancel := make(chan struct{})
92 shutdown := make(chan struct{})
9380 p := mpb.New(
9481 mpb.Output(ioutil.Discard),
9582 mpb.WithCancel(cancel),
96 mpb.WithShutdownNotifier(shutdown),
97 mpb.WithWaitGroup(&wg),
9883 )
9984
100 total := 100
10185 numBars := 3
102 wg.Add(numBars)
10386
87 type sample struct {
88 id int
89 total int64
90 current int64
91 }
92
93 resStream := make(chan *sample, numBars)
10494 for i := 0; i < numBars; i++ {
105 name := fmt.Sprintf("Bar#%d:", i)
106 bar := p.AddBar(int64(total), mpb.BarID(i),
107 mpb.PrependDecorators(decor.StaticName(name, len(name), 0)))
95 bar := p.AddBar(int64(200),
96 mpb.BarID(i),
97 mpb.PrependDecorators(
98 func(s *decor.Statistics, _ chan<- int, _ <-chan int) string {
99 if s.Completed {
100 resStream <- &sample{
101 id: s.ID,
102 total: s.Total,
103 current: s.Current,
104 }
105 }
106 return ""
107 },
108 ))
108109
109110 go func() {
110 defer wg.Done()
111 for i := 0; i < total; i++ {
112 select {
113 case <-cancel:
114 return
115 default:
116 }
111 for bar.InProgress() {
112 bar.Increment()
117113 time.Sleep(randomDuration(80 * time.Millisecond))
118 bar.Increment()
119114 }
120115 }()
121116 }
122117
123 time.AfterFunc(300*time.Millisecond, func() {
118 time.AfterFunc(100*time.Millisecond, func() {
124119 close(cancel)
125120 })
126121
127122 p.Stop()
128
129 select {
130 case <-shutdown:
131 case <-time.After(300 * time.Millisecond):
132 t.Error("ProgressBar didn't stop")
123 close(resStream)
124 for res := range resStream {
125 if res.current >= res.total {
126 t.Errorf("bar %d: total = %d, current = %d\n", res.id, res.total, res.current)
127 }
133128 }
134129 }
135130
1010 )
1111
1212 func (p *Progress) serve(s *pState) {
13
14 defer func() {
15 s.ticker.Stop()
16 p.cacheHeap = s.bHeap
17 close(p.done)
18 if s.shutdownNotifier != nil {
19 close(s.shutdownNotifier)
20 }
21 }()
22
2313 var numP, numA int
24
2514 for {
2615 select {
2716 case op := <-p.operateState:
4130 if err != nil {
4231 fmt.Fprintln(os.Stderr, err)
4332 }
44 case <-s.cancel:
45 return
4633 case <-p.shutdown:
34 s.ticker.Stop()
35 p.cacheHeap = s.bHeap
36 close(p.done)
37 if s.shutdownNotifier != nil {
38 close(s.shutdownNotifier)
39 }
4740 return
4841 }
4942 }