Codebase list golang-github-vbauerster-mpb / ad7e315
Refactoring: removeOnComplete option Vladimir Bauer 8 years ago
6 changed file(s) with 253 addition(s) and 286 deletion(s). Raw diff Collapse all Expand all
+162
-178
bar.go less more
44 "fmt"
55 "io"
66 "strings"
7 "sync"
78 "time"
89 "unicode/utf8"
910
3233
3334 // completed is set from master Progress goroutine only
3435 completed bool
36
37 removeOnComplete bool
3538
3639 operateState chan func(*bState)
3740 // done is closed by Bar's goroutine, after cacheState is written
5457 totalAutoIncrBy int64
5558 trimLeftSpace bool
5659 trimRightSpace bool
57 completed bool
58 removed bool
60 toComplete bool
61 removeOnComplete bool
5962 dynamic bool
6063 startTime time.Time
6164 timeElapsed time.Duration
7174 char rune
7275 till int64
7376 }
74 renderedReader struct {
75 io.Reader
77 renderedState struct {
78 bar *Bar
79 reader io.Reader
7680 toComplete bool
77 toRemove bool
7881 }
7982 )
8083
81 func newBar(id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar {
84 func newBar(wg *sync.WaitGroup, id int, total int64, cancel <-chan struct{}, options ...BarOption) *Bar {
8285 if total <= 0 {
8386 total = time.Now().Unix()
8487 }
98101 s.bufA = bytes.NewBuffer(make([]byte, 0, s.width/2))
99102
100103 b := &Bar{
101 priority: id,
102 operateState: make(chan func(*bState)),
103 done: make(chan struct{}),
104 shutdown: make(chan struct{}),
105 }
106
107 go b.serve(s, cancel)
104 priority: id,
105 removeOnComplete: s.removeOnComplete,
106 operateState: make(chan func(*bState)),
107 done: make(chan struct{}),
108 shutdown: make(chan struct{}),
109 }
110
111 go b.serve(wg, s, cancel)
108112 return b
109113 }
110114
132136 // Increment is a shorthand for b.IncrBy(1)
133137 func (b *Bar) Increment() {
134138 b.IncrBy(1)
139 }
140
141 // ResumeFill fills bar with different r rune,
142 // from 0 to till amount of progress.
143 func (b *Bar) ResumeFill(r rune, till int64) {
144 if till < 1 {
145 return
146 }
147 select {
148 case b.operateState <- func(s *bState) { s.refill = &refill{r, till} }:
149 case <-b.done:
150 }
151 }
152
153 // NumOfAppenders returns current number of append decorators
154 func (b *Bar) NumOfAppenders() int {
155 result := make(chan int, 1)
156 select {
157 case b.operateState <- func(s *bState) { result <- len(s.aDecorators) }:
158 return <-result
159 case <-b.done:
160 return len(b.cacheState.aDecorators)
161 }
162 }
163
164 // NumOfPrependers returns current number of prepend decorators
165 func (b *Bar) NumOfPrependers() int {
166 result := make(chan int, 1)
167 select {
168 case b.operateState <- func(s *bState) { result <- len(s.pDecorators) }:
169 return <-result
170 case <-b.done:
171 return len(b.cacheState.pDecorators)
172 }
173 }
174
175 // ID returs id of the bar
176 func (b *Bar) ID() int {
177 result := make(chan int, 1)
178 select {
179 case b.operateState <- func(s *bState) { result <- s.id }:
180 return <-result
181 case <-b.done:
182 return b.cacheState.id
183 }
184 }
185
186 // Current returns bar's current number, in other words sum of all increments.
187 func (b *Bar) Current() int64 {
188 result := make(chan int64, 1)
189 select {
190 case b.operateState <- func(s *bState) { result <- s.current }:
191 return <-result
192 case <-b.done:
193 return b.cacheState.current
194 }
195 }
196
197 // Total returns bar's total number.
198 func (b *Bar) Total() int64 {
199 result := make(chan int64, 1)
200 select {
201 case b.operateState <- func(s *bState) { result <- s.total }:
202 return <-result
203 case <-b.done:
204 return b.cacheState.total
205 }
206 }
207
208 // SetTotal sets total dynamically. The final param indicates the very last set,
209 // in other words you should set it to true when total is determined.
210 func (b *Bar) SetTotal(total int64, final bool) {
211 select {
212 case b.operateState <- func(s *bState) {
213 s.total = total
214 s.dynamic = !final
215 }:
216 case <-b.done:
217 }
135218 }
136219
137220 // IncrBy increments progress bar by amount of n
141224 }
142225 select {
143226 case b.operateState <- func(s *bState) {
144 if s.completed {
227 if s.toComplete {
145228 return
146229 }
147230 next := time.Now()
161244 }
162245 } else if s.current >= s.total {
163246 s.current = s.total
164 s.completed = true
165 }
166 }:
167 case <-b.done:
168 }
169 }
170
171 // ResumeFill fills bar with different r rune,
172 // from 0 to till amount of progress.
173 func (b *Bar) ResumeFill(r rune, till int64) {
174 if till < 1 {
175 return
176 }
177 select {
178 case b.operateState <- func(s *bState) { s.refill = &refill{r, till} }:
179 case <-b.done:
180 }
181 }
182
183 // NumOfAppenders returns current number of append decorators
184 func (b *Bar) NumOfAppenders() int {
185 result := make(chan int, 1)
186 select {
187 case b.operateState <- func(s *bState) { result <- len(s.aDecorators) }:
188 return <-result
189 case <-b.done:
190 return len(b.cacheState.aDecorators)
191 }
192 }
193
194 // NumOfPrependers returns current number of prepend decorators
195 func (b *Bar) NumOfPrependers() int {
196 result := make(chan int, 1)
197 select {
198 case b.operateState <- func(s *bState) { result <- len(s.pDecorators) }:
199 return <-result
200 case <-b.done:
201 return len(b.cacheState.pDecorators)
202 }
203 }
204
205 // ID returs id of the bar
206 func (b *Bar) ID() int {
207 result := make(chan int, 1)
208 select {
209 case b.operateState <- func(s *bState) { result <- s.id }:
210 return <-result
211 case <-b.done:
212 return b.cacheState.id
213 }
214 }
215
216 // Current returns bar's current number, in other words sum of all increments.
217 func (b *Bar) Current() int64 {
218 result := make(chan int64, 1)
219 select {
220 case b.operateState <- func(s *bState) { result <- s.current }:
221 return <-result
222 case <-b.done:
223 return b.cacheState.current
224 }
225 }
226
227 // Total returns bar's total number.
228 func (b *Bar) Total() int64 {
229 result := make(chan int64, 1)
230 select {
231 case b.operateState <- func(s *bState) { result <- s.total }:
232 return <-result
233 case <-b.done:
234 return b.cacheState.total
235 }
236 }
237
238 // SetTotal sets total dynamically. The final param indicates the very last set,
239 // in other words you should set it to true when total is determined.
240 func (b *Bar) SetTotal(total int64, final bool) {
241 select {
242 case b.operateState <- func(s *bState) {
243 s.total = total
244 s.dynamic = !final
247 s.toComplete = true
248 }
245249 }:
246250 case <-b.done:
247251 }
251255 func (b *Bar) Completed() bool {
252256 result := make(chan bool, 1)
253257 select {
254 case b.operateState <- func(s *bState) { result <- s.completed }:
255 return <-result
256 case <-b.done:
257 return b.cacheState.completed
258 }
259 }
260
261 // Complete stops bar's progress tracking, but doesn't remove the bar from rendering queue.
262 // If you need to remove, invoke Progress.RemoveBar(*Bar) instead.
263 func (b *Bar) Complete() {
264 b.askToComplete(false)
265 }
266
267 func (b *Bar) askToComplete(toRemove bool) bool {
268 result := make(chan bool, 1)
269 select {
270 case b.operateState <- func(s *bState) {
271 s.removed = toRemove
272 s.completed = true
273 result <- true
274 }:
275 return <-result
276 case <-b.done:
277 return false
278 }
279 }
280
281 func (b *Bar) serve(s *bState, cancel <-chan struct{}) {
258 case b.operateState <- func(s *bState) { result <- s.toComplete }:
259 return <-result
260 case <-b.done:
261 return b.cacheState.toComplete
262 }
263 }
264
265 func (b *Bar) serve(wg *sync.WaitGroup, s *bState, cancel <-chan struct{}) {
266 defer wg.Done()
282267 for {
283268 select {
284269 case op := <-b.operateState:
285270 op(s)
286271 case <-cancel:
287 s.completed = true
272 s.toComplete = true
288273 cancel = nil
289274 case <-b.shutdown:
290275 b.cacheState = s
294279 }
295280 }
296281
297 func (b *Bar) render(tw int, pSyncer, aSyncer *widthSyncer) <-chan *renderedReader {
298 ch := make(chan *renderedReader, 1)
282 func (b *Bar) render(tw int, pSyncer, aSyncer *widthSyncer) <-chan *renderedState {
283 ch := make(chan *renderedState, 1)
299284
300285 go func() {
301286 select {
307292 s.panicMsg = fmt.Sprintf("b#%02d panic: %v\n", s.id, p)
308293 s.pDecorators = nil
309294 s.aDecorators = nil
310 s.completed = true
295 s.toComplete = true
311296 r = strings.NewReader(s.panicMsg)
312297 }
313 ch <- &renderedReader{r, s.completed, s.removed}
298 ch <- &renderedState{b, r, s.toComplete}
314299 }()
315 s.draw(tw, pSyncer, aSyncer)
316 r = io.MultiReader(s.bufP, s.bufB, s.bufA)
300 r = s.draw(tw, pSyncer, aSyncer)
317301 }:
318302 case <-b.done:
319303 s := b.cacheState
321305 if s.panicMsg != "" {
322306 r = strings.NewReader(s.panicMsg)
323307 } else {
324 s.draw(tw, pSyncer, aSyncer)
325 r = io.MultiReader(s.bufP, s.bufB, s.bufA)
308 r = s.draw(tw, pSyncer, aSyncer)
326309 }
327 ch <- &renderedReader{r, s.completed, s.removed}
310 ch <- &renderedState{b, r, s.toComplete}
328311 }
329312 }()
330313
331314 return ch
315 }
316
317 func (s *bState) draw(termWidth int, pSyncer, aSyncer *widthSyncer) io.Reader {
318 if termWidth <= 0 {
319 termWidth = s.width
320 }
321
322 stat := newStatistics(s)
323
324 // render prepend functions to the left of the bar
325 s.bufP.Reset()
326 for i, f := range s.pDecorators {
327 s.bufP.WriteString(f(stat, pSyncer.Accumulator[i], pSyncer.Distributor[i]))
328 }
329
330 if !s.trimLeftSpace {
331 s.bufP.WriteByte(' ')
332 }
333
334 // render append functions to the right of the bar
335 s.bufA.Reset()
336 if !s.trimRightSpace {
337 s.bufA.WriteByte(' ')
338 }
339
340 for i, f := range s.aDecorators {
341 s.bufA.WriteString(f(stat, aSyncer.Accumulator[i], aSyncer.Distributor[i]))
342 }
343
344 prependCount := utf8.RuneCount(s.bufP.Bytes())
345 appendCount := utf8.RuneCount(s.bufA.Bytes())
346
347 if termWidth > s.width {
348 s.fillBar(s.width)
349 } else {
350 s.fillBar(termWidth - prependCount - appendCount)
351 }
352 barCount := utf8.RuneCount(s.bufB.Bytes())
353 totalCount := prependCount + barCount + appendCount
354 if totalCount > termWidth {
355 s.fillBar(termWidth - prependCount - appendCount)
356 }
357 s.bufA.WriteByte('\n')
358
359 return io.MultiReader(s.bufP, s.bufB, s.bufA)
332360 }
333361
334362 func (s *bState) updateTimePerItemEstimate(amount int, now, next time.Time) {
336364 lastItemEstimate := float64(lastBlockTime) / float64(amount)
337365 s.timePerItem = time.Duration((s.etaAlpha * lastItemEstimate) + (1-s.etaAlpha)*float64(s.timePerItem))
338366 s.blockStartTime = next
339 }
340
341 func (s *bState) draw(termWidth int, pSyncer, aSyncer *widthSyncer) {
342 if termWidth <= 0 {
343 termWidth = s.width
344 }
345
346 stat := newStatistics(s)
347
348 // render prepend functions to the left of the bar
349 s.bufP.Reset()
350 for i, f := range s.pDecorators {
351 s.bufP.WriteString(f(stat, pSyncer.Accumulator[i], pSyncer.Distributor[i]))
352 }
353
354 if !s.trimLeftSpace {
355 s.bufP.WriteByte(' ')
356 }
357
358 // render append functions to the right of the bar
359 s.bufA.Reset()
360 if !s.trimRightSpace {
361 s.bufA.WriteByte(' ')
362 }
363
364 for i, f := range s.aDecorators {
365 s.bufA.WriteString(f(stat, aSyncer.Accumulator[i], aSyncer.Distributor[i]))
366 }
367
368 prependCount := utf8.RuneCount(s.bufP.Bytes())
369 appendCount := utf8.RuneCount(s.bufA.Bytes())
370
371 if termWidth > s.width {
372 s.fillBar(s.width)
373 } else {
374 s.fillBar(termWidth - prependCount - appendCount)
375 }
376 barCount := utf8.RuneCount(s.bufB.Bytes())
377 totalCount := prependCount + barCount + appendCount
378 if totalCount > termWidth {
379 s.fillBar(termWidth - prependCount - appendCount)
380 }
381 s.bufA.WriteByte('\n')
382367 }
383368
384369 func (s *bState) fillBar(width int) {
427412 func newStatistics(s *bState) *decor.Statistics {
428413 return &decor.Statistics{
429414 ID: s.id,
430 Completed: s.completed,
431 Removed: s.removed,
415 Completed: s.toComplete,
432416 Total: s.total,
433417 Current: s.current,
434418 StartTime: s.startTime,
7474 }
7575 }
7676
77 // BarRemoveOnComplete is a flag, which tells whether the intention is to remove the bar after completion.
78 func BarRemoveOnComplete() BarOption {
79 return func(s *bState) {
80 s.removeOnComplete = true
81 }
82 }
83
7784 func barWidth(w int) BarOption {
7885 return func(s *bState) {
7986 s.width = w
3030 type Statistics struct {
3131 ID int
3232 Completed bool
33 Removed bool
3433 Total int64
3534 Current int64
3635 StartTime time.Time
2020
2121 // Progress represents the container that renders Progress bars
2222 type Progress struct {
23 wg *sync.WaitGroup
24 uwg *sync.WaitGroup
2325 operateState chan func(*pState)
2426 done chan struct{}
2527 }
2729 type (
2830 // progress state, which may contain several bars
2931 pState struct {
30 bHeap *priorityQueue
31 heapUpdated bool
32 zeroWait bool
33 idCounter int
34 width int
35 format string
36 rr time.Duration
37 cw *cwriter.Writer
38 ticker *time.Ticker
32 bHeap *priorityQueue
33 shutdownPending []*Bar
34 heapUpdated bool
35 zeroWait bool
36 idCounter int
37 width int
38 format string
39 rr time.Duration
40 cw *cwriter.Writer
41 ticker *time.Ticker
3942
4043 // following are provided by user
4144 uwg *sync.WaitGroup
4750 // Public for easy testing
4851 Accumulator []chan int
4952 Distributor []chan int
50 }
51 barRendering struct {
52 bar *Bar
53 ready <-chan *renderedReader
5453 }
5554 )
5655
7372 }
7473
7574 p := &Progress{
75 uwg: s.uwg,
76 wg: new(sync.WaitGroup),
7677 operateState: make(chan func(*pState)),
7778 done: make(chan struct{}),
7879 }
8283
8384 // AddBar creates a new progress bar and adds to the container.
8485 func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
86 p.wg.Add(1)
8587 result := make(chan *Bar, 1)
8688 select {
8789 case p.operateState <- func(s *pState) {
8890 options = append(options, barWidth(s.width), barFormat(s.format))
89 b := newBar(s.idCounter, total, s.cancel, options...)
91 b := newBar(p.wg, s.idCounter, total, s.cancel, options...)
9092 heap.Push(s.bHeap, b)
9193 s.heapUpdated = true
9294 s.idCounter++
99101 }
100102 }
101103
102 // RemoveBar removes the bar at next render cycle
103 func (p *Progress) RemoveBar(b *Bar) bool {
104 result := b.askToComplete(true)
105 <-b.done
106 return result
104 // Abort is only effective while bar progress is running,
105 // it means remove bar now without waiting for its completion.
106 // If bar is already completed, there is nothing to abort.
107 // If you need to remove bar after completion, use BarRemoveOnComplete BarOption.
108 func (p *Progress) Abort(b *Bar) {
109 select {
110 case p.operateState <- func(s *pState) {
111 s.heapUpdated = heap.Remove(s.bHeap, b.index) != nil
112 s.shutdownPending = append(s.shutdownPending, b)
113 }:
114 case <-p.done:
115 }
107116 }
108117
109118 // UpdateBarPriority provides a way to change bar's order position.
130139 // It's optional to call, in other words if you don't call Progress.Wait(),
131140 // it's not guaranteed that all bars will be flushed completely to the underlying io.Writer.
132141 func (p *Progress) Wait() {
133 if p.BarCount() == 0 {
134 select {
135 case p.operateState <- func(s *pState) { s.zeroWait = true }:
136 case <-p.done:
137 }
138 return
139 }
140 <-p.done
142 if p.uwg != nil {
143 p.uwg.Wait()
144 }
145
146 p.wg.Wait()
147
148 select {
149 case p.operateState <- func(s *pState) { s.zeroWait = true }:
150 <-p.done
151 case <-p.done:
152 }
141153 }
142154
143155 func newWidthSyncer(timeout <-chan struct{}, numBars, numColumn int) *widthSyncer {
185197 close(timeout)
186198 })
187199
188 for _, br := range s.renderByPriority(tw, pSyncer, aSyncer) {
189 r := <-br.ready
190 if !r.toRemove {
191 _, err = s.cw.ReadFrom(r)
192 } else {
193 s.heapUpdated = heap.Remove(s.bHeap, br.bar.index) != nil
194 }
195 if !br.bar.completed && r.toComplete {
196 br.bar.completed = true
197 defer close(br.bar.shutdown)
200 for _, ch := range s.renderByPriority(tw, pSyncer, aSyncer) {
201 rs := <-ch
202 _, err = s.cw.ReadFrom(rs.reader)
203 if !rs.bar.completed && rs.toComplete {
204 rs.bar.completed = true
205 if rs.bar.removeOnComplete {
206 s.heapUpdated = heap.Remove(s.bHeap, rs.bar.index) != nil
207 }
208 defer func() {
209 s.shutdownPending = append(s.shutdownPending, rs.bar)
210 }()
198211 }
199212 }
200213
205218 if e := s.cw.Flush(); err == nil {
206219 err = e
207220 }
221
222 for i := len(s.shutdownPending) - 1; i >= 0; i-- {
223 close(s.shutdownPending[i].shutdown)
224 s.shutdownPending = s.shutdownPending[:i]
225 }
208226 return
209227 }
210228
211 func (s *pState) renderByPriority(tw int, pSyncer, aSyncer *widthSyncer) []*barRendering {
212 slice := make([]*barRendering, 0, s.bHeap.Len())
229 func (s *pState) renderByPriority(tw int, pSyncer, aSyncer *widthSyncer) []<-chan *renderedState {
230 pp := make([]<-chan *renderedState, 0, s.bHeap.Len())
213231 for s.bHeap.Len() > 0 {
214232 b := heap.Pop(s.bHeap).(*Bar)
215233 defer heap.Push(s.bHeap, b)
216 slice = append(slice, &barRendering{
217 bar: b,
218 ready: b.render(tw, pSyncer, aSyncer),
219 })
220 }
221 return slice
222 }
223
224 func (s *pState) waitAll() {
225 for s.bHeap.Len() > 0 {
226 b := heap.Pop(s.bHeap).(*Bar)
227 <-b.done
228 }
229 if s.uwg != nil {
230 s.uwg.Wait()
231 }
234 pp = append(pp, b.render(tw, pSyncer, aSyncer))
235 }
236 return pp
232237 }
233238
234239 func calcMax(slice []int) int {
55 "fmt"
66 "os"
77 "os/signal"
8 "runtime"
98 "syscall"
109 "time"
1110
1817
1918 var numP, numA int
2019 var timer *time.Timer
21 var resumeTicker <-chan time.Time
22 resumeDelay := 300 * time.Millisecond
20 var tickerResumer <-chan time.Time
21 resumeDelay := s.rr * 2
2322
2423 for {
2524 select {
2625 case op := <-p.operateState:
2726 op(s)
2827 case <-s.ticker.C:
29 if s.bHeap.Len() == 0 {
30 if s.zeroWait {
31 close(p.done)
32 return
28 if s.zeroWait {
29 s.ticker.Stop()
30 signal.Stop(winch)
31 if s.shutdownNotifier != nil {
32 close(s.shutdownNotifier)
3333 }
34 runtime.Gosched()
35 break
34 close(p.done)
35 return
3636 }
3737 if s.heapUpdated {
3838 numP = s.bHeap.maxNumP()
4444 if err != nil {
4545 fmt.Fprintln(os.Stderr, err)
4646 }
47 var completed int
48 for i := 0; i < s.bHeap.Len(); i++ {
49 b := (*s.bHeap)[i]
50 if b.completed {
51 completed++
52 }
47 case <-winch:
48 if s.heapUpdated {
49 numP = s.bHeap.maxNumP()
50 numA = s.bHeap.maxNumA()
51 s.heapUpdated = false
5352 }
54 if completed == s.bHeap.Len() {
55 s.ticker.Stop()
56 signal.Stop(winch)
57 s.waitAll()
58 if s.shutdownNotifier != nil {
59 close(s.shutdownNotifier)
60 }
61 close(p.done)
62 return
63 }
64 case <-winch:
6553 tw, _, _ := cwriter.TermSize()
6654 err := s.writeAndFlush(tw-tw/8, numP, numA)
6755 if err != nil {
7260 }
7361 s.ticker.Stop()
7462 timer = time.NewTimer(resumeDelay)
75 resumeTicker = timer.C
76 case <-resumeTicker:
63 tickerResumer = timer.C
64 case <-tickerResumer:
7765 s.ticker = time.NewTicker(s.rr)
78 resumeTicker = nil
66 tickerResumer = nil
67 timer = nil
7968 }
8069 }
8170 }
44 import (
55 "fmt"
66 "os"
7 "runtime"
87
98 "github.com/vbauerster/mpb/cwriter"
109 )
1615 case op := <-p.operateState:
1716 op(s)
1817 case <-s.ticker.C:
19 if s.bHeap.Len() == 0 {
20 if s.zeroWait {
21 close(p.done)
22 return
18 if s.zeroWait {
19 s.ticker.Stop()
20 if s.shutdownNotifier != nil {
21 close(s.shutdownNotifier)
2322 }
24 runtime.Gosched()
25 break
23 close(p.done)
24 return
2625 }
2726 if s.heapUpdated {
2827 numP = s.bHeap.maxNumP()
3433 if err != nil {
3534 fmt.Fprintln(os.Stderr, err)
3635 }
37 var completed int
38 for i := 0; i < s.bHeap.Len(); i++ {
39 b := (*s.bHeap)[i]
40 if b.completed {
41 completed++
42 }
43 }
44 if completed == s.bHeap.Len() {
45 s.ticker.Stop()
46 s.waitAll()
47 if s.shutdownNotifier != nil {
48 close(s.shutdownNotifier)
49 }
50 close(p.done)
51 return
52 }
5336 }
5437 }
5538 }