Codebase list golang-github-vbauerster-mpb / 07763ad
close flushed Vladimir Bauer 9 years ago
2 changed file(s) with 187 addition(s) and 277 deletion(s). Raw diff Collapse all Expand all
+171
-214
bar.go less more
00 package mpb
11
22 import (
3 "fmt"
34 "io"
45 "math"
56 "sync"
1516 rRight
1617 )
1718
18 type barFmtRunes [numFmtRunes]rune
19 type barFmtBytes [numFmtRunes][]byte
19 const (
20 formatLen = 5
21 etaAlpha = 0.25
22 )
23
24 type barFmtRunes [formatLen]rune
25 type barFmtBytes [formatLen][]byte
2026
2127 // Bar represents a progress Bar
2228 type Bar struct {
23 stateCh chan state
2429 incrCh chan incrReq
25 flushedCh chan struct{}
2630 completeReqCh chan struct{}
27 removeReqCh chan struct{}
2831 done chan struct{}
2932 inProgress chan struct{}
30 cancel <-chan struct{}
33 ops chan func(*state)
3134
3235 // following are used after (*Bar.done) is closed
3336 width int
4043 ID int
4144 Completed bool
4245 Aborted bool
43 Total int64
44 Current int64
46 Total int
47 Current int
4548 StartTime time.Time
4649 TimeElapsed time.Duration
4750 TimePerItemEstimate time.Duration
4851 }
4952
5053 // Refil is a struct for b.IncrWithReFill
51 type Refill struct {
52 Char rune
53 till int64
54 type refill struct {
55 char rune
56 till int
5457 }
5558
5659 // Eta returns exponential-weighted-moving-average ETA estimator
6164 type (
6265 incrReq struct {
6366 amount int64
64 refill *Refill
67 refill *refill
6568 }
6669 state struct {
6770 id int
6871 width int
6972 format barFmtRunes
7073 etaAlpha float64
71 total int64
72 current int64
74 total int
75 current int
7376 trimLeftSpace bool
7477 trimRightSpace bool
7578 completed bool
7679 aborted bool
7780 startTime time.Time
7881 timeElapsed time.Duration
82 blockStartTime time.Time
7983 timePerItem time.Duration
8084 appendFuncs []DecoratorFunc
8185 prependFuncs []DecoratorFunc
8286 simpleSpinner func() byte
83 refill *Refill
87 refill *refill
88 flushed chan struct{}
8489 }
8590 )
8691
87 func newBar(total int64, wg *sync.WaitGroup, cancel <-chan struct{}, options ...BarOption) *Bar {
92 func newBar(total int, wg *sync.WaitGroup, cancel <-chan struct{}, options ...BarOption) *Bar {
93 s := state{
94 total: total,
95 etaAlpha: etaAlpha,
96 }
97
98 if total <= 0 {
99 s.simpleSpinner = getSpinner()
100 }
101
102 for _, opt := range options {
103 opt(&s)
104 }
105
88106 b := &Bar{
89 // width: width,
90 stateCh: make(chan state),
91107 incrCh: make(chan incrReq),
92 flushedCh: make(chan struct{}),
93 removeReqCh: make(chan struct{}),
94108 completeReqCh: make(chan struct{}),
95109 done: make(chan struct{}),
96110 inProgress: make(chan struct{}),
97 }
98
99 s := state{
100 total: total,
101 etaAlpha: 0.25,
102 }
103
104 if total <= 0 {
105 s.simpleSpinner = getSpinner()
106 }
107
108 for _, opt := range options {
109 opt(&s)
110 }
111
111 ops: make(chan func(*state)),
112 }
112113 b.width = s.width
113114
114115 go b.server(s, wg, cancel)
115116 return b
116117 }
117118
118 // SetWidth overrides width of individual bar
119 func (b *Bar) SetWidth(n int) *Bar {
120 if n < 2 {
121 return b
122 }
123 b.updateState(func(s *state) {
124 s.width = n
125 })
126 return b
127 }
128
129 // TrimLeftSpace removes space befor LeftEnd charater
130 func (b *Bar) TrimLeftSpace() *Bar {
131 b.updateState(func(s *state) {
132 s.trimLeftSpace = true
133 })
134 return b
135 }
136
137 // TrimRightSpace removes space after RightEnd charater
138 func (b *Bar) TrimRightSpace() *Bar {
139 b.updateState(func(s *state) {
140 s.trimRightSpace = true
141 })
142 return b
143 }
144
145 // Format overrides format of individual bar
146 func (b *Bar) Format(format string) *Bar {
147 if utf8.RuneCountInString(format) != numFmtRunes {
148 return b
149 }
150 b.updateState(func(s *state) {
151 s.updateFormat(format)
152 })
153 return b
154 }
155
156 // SetEtaAlpha sets alfa for exponential-weighted-moving-average ETA estimator
157 // Defaults to 0.25
158 // Normally you shouldn't touch this
159 func (b *Bar) SetEtaAlpha(a float64) *Bar {
160 b.updateState(func(s *state) {
161 s.etaAlpha = a
162 })
163 return b
164 }
165
166 // PrependFunc prepends DecoratorFunc
167 func (b *Bar) PrependFunc(f DecoratorFunc) *Bar {
168 b.updateState(func(s *state) {
169 s.prependFuncs = append(s.prependFuncs, f)
170 })
171 return b
172 }
173
174 // AppendFunc appends DecoratorFunc
175 func (b *Bar) AppendFunc(f DecoratorFunc) *Bar {
176 b.updateState(func(s *state) {
177 s.appendFuncs = append(s.appendFuncs, f)
178 })
179 return b
180 }
181
182119 // RemoveAllPrependers removes all prepend functions
183120 func (b *Bar) RemoveAllPrependers() {
184 b.updateState(func(s *state) {
121 select {
122 case b.ops <- func(s *state) {
185123 s.prependFuncs = nil
186 })
124 }:
125 case <-b.done:
126 return
127 }
187128 }
188129
189130 // RemoveAllAppenders removes all append functions
190131 func (b *Bar) RemoveAllAppenders() {
191 b.updateState(func(s *state) {
132 select {
133 case b.ops <- func(s *state) {
192134 s.appendFuncs = nil
193 })
135 }:
136 case <-b.done:
137 return
138 }
194139 }
195140
196141 // ProxyReader wrapper for io operations, like io.Copy
200145
201146 // Incr increments progress bar
202147 func (b *Bar) Incr(n int) {
203 b.IncrWithReFill(n, nil)
204 }
205
206 // IncrWithReFill increments pb with different fill character
207 func (b *Bar) IncrWithReFill(n int, refill *Refill) {
208148 if n < 1 {
209149 return
210150 }
211151 select {
212 case b.incrCh <- incrReq{int64(n), refill}:
152 case b.ops <- func(s *state) {
153 defer func() {
154 if s.completed {
155 b.Complete()
156 }
157 s.blockStartTime = time.Now()
158 }()
159 if s.current == 0 {
160 s.startTime = time.Now()
161 s.blockStartTime = s.startTime
162 }
163 sum := s.current + n
164 s.timeElapsed = time.Since(s.startTime)
165 s.updateTimePerItemEstimate(n)
166 if s.total > 0 && sum >= s.total {
167 s.current = s.total
168 s.completed = true
169 return
170 }
171 s.current = sum
172 }:
173 case <-b.done:
174 return
175 }
176 }
177
178 // ResumeFill fills bar with different r rune,
179 // from 0 to till amount of progress.
180 func (b *Bar) ResumeFill(r rune, till int) {
181 if till < 1 {
182 return
183 }
184 select {
185 case b.ops <- func(s *state) {
186 s.refill = &refill{r, till}
187 }:
213188 case <-b.done:
214189 return
215190 }
216191 }
217192
218193 func (b *Bar) NumOfAppenders() int {
219 return len(b.getState().appendFuncs)
194 result := make(chan int, 1)
195 select {
196 case b.ops <- func(s *state) { result <- len(s.appendFuncs) }:
197 return <-result
198 case <-b.done:
199 return len(b.state.appendFuncs)
200 }
220201 }
221202
222203 func (b *Bar) NumOfPrependers() int {
223 return len(b.getState().prependFuncs)
224 }
225
226 // GetStatistics returs *Statistics, which contains information like
204 result := make(chan int, 1)
205 select {
206 case b.ops <- func(s *state) { result <- len(s.prependFuncs) }:
207 return <-result
208 case <-b.done:
209 return len(b.state.prependFuncs)
210 }
211 }
212
213 // Statistics returs *Statistics, which contains information like
227214 // Tottal, Current, TimeElapsed and TimePerItemEstimate
228 func (b *Bar) GetStatistics() *Statistics {
229 s := b.getState()
230 return newStatistics(&s)
215 func (b *Bar) Statistics() *Statistics {
216 result := make(chan *Statistics, 1)
217 select {
218 case b.ops <- func(s *state) { result <- newStatistics(s) }:
219 return <-result
220 case <-b.done:
221 return newStatistics(&b.state)
222 }
231223 }
232224
233225 // GetID returs id of the bar
234226 func (b *Bar) GetID() int {
235 return b.getState().id
227 result := make(chan int, 1)
228 select {
229 case b.ops <- func(s *state) { result <- s.id }:
230 return <-result
231 case <-b.done:
232 return b.state.id
233 }
236234 }
237235
238236 // InProgress returns true, while progress is running.
239237 // Can be used as condition in for loop
240238 func (b *Bar) InProgress() bool {
241239 select {
242 case <-b.inProgress:
240 case <-b.completeReqCh:
243241 return false
244242 default:
245243 return true
252250 // implicitly, upon p.Stop() call.
253251 func (b *Bar) Complete() {
254252 select {
255 case b.completeReqCh <- struct{}{}:
256 case <-b.done:
257 return
258 }
259 }
260
261 // Completed: deprecated! Use b.Complete()
262 func (b *Bar) Completed() {
263 b.Complete()
264 }
265
266 func (b *Bar) flushed() {
267 select {
268 case b.flushedCh <- struct{}{}:
269 case <-b.done:
270 return
271 }
272 }
273
274 func (b *Bar) remove() {
275 select {
276 case b.removeReqCh <- struct{}{}:
277 case <-b.done:
278 return
279 }
280 }
281
282 func (b *Bar) getState() state {
283 select {
284 case s := <-b.stateCh:
285 return s
286 case <-b.done:
287 return b.state
288 }
289 }
290
291 func (b *Bar) updateState(cb func(*state)) {
292 s := b.getState()
293 cb(&s)
294 select {
295 case b.stateCh <- s:
296 case <-b.done:
297 return
298 }
299 }
253 case <-b.completeReqCh:
254 return
255 default:
256 close(b.completeReqCh)
257 }
258 }
259
260 // func (b *Bar) getState() state {
261 // result := make(chan state, 1)
262 // select {
263 // case b.ops <- func(s *state) { result <- *s }:
264 // return <-result
265 // case <-b.done:
266 // return b.state
267 // }
268 // }
300269
301270 func (b *Bar) server(s state, wg *sync.WaitGroup, cancel <-chan struct{}) {
302 var incrStartTime time.Time
303271
304272 defer func() {
305273 b.state = s
274 close(b.done)
275 <-s.flushed
276 // fmt.Fprintf(os.Stderr, "Bar:%d flushed\n", s.id)
306277 wg.Done()
307 close(b.done)
308278 }()
309279
310280 for {
311281 select {
312 case b.stateCh <- s:
313 case s = <-b.stateCh:
314 case r := <-b.incrCh:
315 if s.current == 0 {
316 incrStartTime = time.Now()
317 s.startTime = incrStartTime
318 }
319 n := s.current + r.amount
320 if s.total > 0 && n > s.total {
321 s.current = s.total
322 s.completed = true
323 break // break out of select
324 }
325 s.timeElapsed = time.Since(s.startTime)
326 s.updateTimePerItemEstimate(incrStartTime, r.amount)
327 if n == s.total {
328 s.completed = true
329 close(b.inProgress)
330 }
331 s.current = n
332 if r.refill != nil {
333 r.refill.till = n
334 s.refill = r.refill
335 }
336 incrStartTime = time.Now()
337 case <-b.flushedCh:
338 if s.completed {
339 return
340 }
282 case op := <-b.ops:
283 op(&s)
341284 case <-b.completeReqCh:
342285 s.completed = true
343286 return
344 case <-b.removeReqCh:
345 return
346 case <-b.cancel:
287 case <-cancel:
347288 s.aborted = true
348 close(b.inProgress)
349 return
350 }
351 }
352 }
353
354 func (b *Bar) render(rFn func(chan []byte), termWidth int, prependWs, appendWs *widthSync) <-chan []byte {
289 cancel = nil
290 b.Complete()
291 }
292 }
293 }
294
295 func (b *Bar) render(tw int, flushed chan struct{}, prependWs, appendWs *widthSync) <-chan []byte {
355296 ch := make(chan []byte)
356297
357298 go func() {
358 defer rFn(ch)
359 s := b.getState()
360 buf := draw(&s, termWidth, prependWs, appendWs)
299 defer func() {
300 // recovering if external decorators panic
301 if p := recover(); p != nil {
302 ch <- []byte(fmt.Sprintln(p))
303 }
304 close(ch)
305 }()
306 var st state
307 result := make(chan state, 1)
308 select {
309 case b.ops <- func(s *state) {
310 s.flushed = flushed
311 result <- *s
312 }:
313 st = <-result
314 case <-b.done:
315 st = b.state
316 }
317 buf := draw(&st, tw, prependWs, appendWs)
361318 buf = append(buf, '\n')
362319 ch <- buf
363320 }()
372329 }
373330 }
374331
375 func (s *state) updateTimePerItemEstimate(incrStartTime time.Time, amount int64) {
376 lastBlockTime := time.Since(incrStartTime) // shorthand for time.Now().Sub(t)
332 func (s *state) updateTimePerItemEstimate(amount int) {
333 lastBlockTime := time.Since(s.blockStartTime) // shorthand for time.Now().Sub(t)
377334 lastItemEstimate := float64(lastBlockTime) / float64(amount)
378335 s.timePerItem = time.Duration((s.etaAlpha * lastItemEstimate) + (1-s.etaAlpha)*float64(s.timePerItem))
379336 }
446403 return buf
447404 }
448405
449 func fillBar(total, current int64, width int, fmtBytes barFmtBytes, rf *Refill) []byte {
406 func fillBar(total, current, width int, fmtBytes barFmtBytes, rf *refill) []byte {
450407 if width < 2 || total <= 0 {
451408 return []byte{}
452409 }
461418
462419 if rf != nil {
463420 till := percentage(total, rf.till, barWidth)
464 rbytes := make([]byte, utf8.RuneLen(rf.Char))
465 utf8.EncodeRune(rbytes, rf.Char)
421 rbytes := make([]byte, utf8.RuneLen(rf.char))
422 utf8.EncodeRune(rbytes, rf.char)
466423 // append refill rune
467424 for i := 0; i < till; i++ {
468425 buf = append(buf, rbytes...)
514471 return fmtBytes
515472 }
516473
517 func percentage(total, current int64, ratio int) int {
474 func percentage(total, current, ratio int) int {
518475 if total == 0 || current > total {
519476 return 0
520477 }
00 package mpb
11
22 import (
3 "fmt"
4 "io"
53 "os"
64 "sync"
75 "time"
8 "unicode/utf8"
96
107 "github.com/vbauerster/mpb/cwriter"
118 )
4239 pwidth = 80
4340 // default format
4441 pformat = "[=>-]"
45 // number of format runes for bar
46 numFmtRunes = 5
4742 )
4843
4944 // Progress represents the container that renders Progress bars
8681 return p
8782 }
8883
89 // WithCancel Deprecated, use mpb.WithCancel
90 func (p *Progress) WithCancel(ch <-chan struct{}) *Progress {
91 if ch == nil {
92 panic("nil cancel channel")
93 }
94 return updateConf(p, func(c *pConf) {
95 c.cancel = ch
96 })
97 }
98
99 // SetWidth Deprecated, use mpb.WithWidth
100 func (p *Progress) SetWidth(width int) *Progress {
101 if width < 2 {
102 return p
103 }
104 return updateConf(p, func(c *pConf) {
105 c.width = width
106 })
107 }
108
109 // SetOut Deprecated, use mpb.Output
110 func (p *Progress) SetOut(w io.Writer) *Progress {
111 if w == nil {
112 return p
113 }
114 return updateConf(p, func(c *pConf) {
115 c.cw.Flush()
116 c.cw = cwriter.New(w)
117 })
118 }
119
12084 // AddBar creates a new progress bar and adds to the container.
121 func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
85 func (p *Progress) AddBar(total int, options ...BarOption) *Bar {
12286 result := make(chan *Bar, 1)
12387 op := func(c *pConf) {
12488 options = append(options, barWidth(c.width), barFormat(c.format))
142106 var ok bool
143107 for i, bar := range c.bars {
144108 if bar == b {
109 // bar.remove()
110 bar.Complete()
145111 c.bars = append(c.bars[:i], c.bars[i+1:]...)
146 bar.remove()
147112 ok = true
148113 break
149114 }
172137 }
173138 }
174139
175 // Format Deprecated, use mpb.WithFormat
176 func (p *Progress) Format(format string) *Progress {
177 if utf8.RuneCountInString(format) != numFmtRunes {
178 return p
179 }
180 return updateConf(p, func(c *pConf) {
181 c.format = format
182 })
183 }
184
185140 // Stop shutdowns Progress' goroutine.
186141 // Should be called only after each bar's work done, i.e. bar has reached its
187142 // 100 %. It is NOT for cancelation. Use WithContext or WithCancel for
194149 // complete Total unknown bars
195150 p.ops <- func(c *pConf) {
196151 for _, b := range c.bars {
197 s := b.getState()
198 if !s.completed && !s.aborted {
152 s := b.Statistics()
153 if !s.Completed && !s.Aborted {
199154 b.Complete()
200155 }
201156 }
220175 close(p.done)
221176 }()
222177
223 recoverFn := func(ch chan []byte) {
224 if p := recover(); p != nil {
225 ch <- []byte(fmt.Sprintln(p))
226 }
227 close(ch)
228 }
178 // recoverFn := func(ch chan []byte) {
179 // if p := recover(); p != nil {
180 // ch <- []byte(fmt.Sprintln(p))
181 // }
182 // close(ch)
183 // }
229184
230185 for {
231186 select {
250205 prependWs := newWidthSync(quitWidthSyncCh, numBars, b0.NumOfPrependers())
251206 appendWs := newWidthSync(quitWidthSyncCh, numBars, b0.NumOfAppenders())
252207
253 width, _, _ := cwriter.GetTermSize()
254
208 tw, _, _ := cwriter.GetTermSize()
209
210 flushed := make(chan struct{})
255211 sequence := make([]<-chan []byte, numBars)
256212 for i, b := range conf.bars {
257 sequence[i] = b.render(recoverFn, width, prependWs, appendWs)
213 sequence[i] = b.render(tw, flushed, prependWs, appendWs)
258214 }
259215
260216 ch := fanIn(sequence...)
264220 }
265221
266222 conf.cw.Flush()
267
268 for _, b := range conf.bars {
269 b.flushed()
270 }
223 close(flushed)
271224 case <-conf.cancel:
272225 conf.ticker.Stop()
273226 conf.cancel = nil