Codebase list golang-github-vbauerster-mpb / cf3dc7a
Improve complete after flush algorithm Vladimir Bauer 8 years ago
3 changed file(s) with 125 addition(s) and 81 deletion(s). Raw diff Collapse all Expand all
3535
3636 // following are used after b.done is receiveable
3737 cacheState state
38
39 once sync.Once
3840 }
3941
4042 type (
6365 prependFuncs []decor.DecoratorFunc
6466 refill *refill
6567 bufP, bufB, bufA *bytes.Buffer
68 panic string
69 }
70 writeBuf struct {
71 buf []byte
72 completeAfterFlush bool
6673 }
6774 )
6875
250257 // of process completion. If you don't call this method, it will be called
251258 // implicitly, upon p.Stop() call.
252259 func (b *Bar) Complete() {
253 select {
254 case <-b.quit:
255 default:
256 close(b.quit)
257 }
258 }
259
260 func (b *Bar) complete() {
261 select {
262 case b.ops <- func(s *state) {
263 if !s.completed {
264 b.Complete()
265 }
266 }:
267 case <-time.After(prr):
268 }
260 b.once.Do(b.shutdown)
261 }
262
263 func (b *Bar) shutdown() {
264 close(b.quit)
269265 }
270266
271267 func (b *Bar) server(s state, wg *sync.WaitGroup, cancel <-chan struct{}) {
284280 cancel = nil
285281 b.Complete()
286282 case <-b.quit:
287 s.completed = true
288283 return
289284 }
290285 }
291286 }
292287
293 func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan []byte {
294 ch := make(chan []byte, 1)
288 func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan *writeBuf {
289 ch := make(chan *writeBuf, 1)
295290
296291 go func() {
297 var st state
298 defer func() {
299 // recovering if external decorators panic
300 if p := recover(); p != nil {
301 ch <- []byte(fmt.Sprintf("bar%02d panic: %q\n", st.id, p))
292 select {
293 case b.ops <- func(s *state) {
294 defer func() {
295 // recovering if external decorators panic
296 if p := recover(); p != nil {
297 s.panic = fmt.Sprintf("b#%02d panic: %v\n", s.id, p)
298 s.prependFuncs = nil
299 s.appendFuncs = nil
300
301 ch <- &writeBuf{[]byte(s.panic), true}
302 }
303 close(ch)
304 }()
305 s.draw(tw, prependWs, appendWs)
306 ch <- &writeBuf{s.toBytes(), s.isFull()}
307 }:
308 case <-b.done:
309 s := b.cacheState
310 var buf []byte
311 if s.panic != "" {
312 buf = []byte(s.panic)
313 } else {
314 s.draw(tw, prependWs, appendWs)
315 buf = s.toBytes()
302316 }
317 ch <- &writeBuf{buf, true}
303318 close(ch)
304 }()
305 result := make(chan state, 1)
306 select {
307 case b.ops <- func(s *state) { result <- *s }:
308 st = <-result
309 if st.completed {
310 b.Complete()
311 }
312 case <-b.done:
313 st = b.cacheState
314 }
315 st.draw(tw, prependWs, appendWs)
316 buf := make([]byte, 0, st.bufP.Len()+st.bufB.Len()+st.bufA.Len())
317 buf = concatenateBlocks(buf, st.bufP.Bytes(), st.bufB.Bytes(), st.bufA.Bytes())
318 buf = append(buf, '\n')
319 ch <- buf
319 }
320320 }()
321321
322322 return ch
323 }
324
325 func (s *state) toBytes() []byte {
326 buf := make([]byte, 0, s.bufP.Len()+s.bufB.Len()+s.bufA.Len())
327 buf = concatenateBlocks(buf, s.bufP.Bytes(), s.bufB.Bytes(), s.bufA.Bytes())
328 return buf
323329 }
324330
325331 func (s *state) updateTimePerItemEstimate(amount int) {
326332 lastBlockTime := time.Since(s.blockStartTime) // shorthand for time.Now().Sub(t)
327333 lastItemEstimate := float64(lastBlockTime) / float64(amount)
328334 s.timePerItem = time.Duration((s.etaAlpha * lastItemEstimate) + (1-s.etaAlpha)*float64(s.timePerItem))
335 }
336
337 func (s *state) isFull() bool {
338 if !s.completed {
339 return false
340 }
341 bar := s.bufB.Bytes()
342 var r rune
343 var n int
344 for i := 0; len(bar) > 0; i++ {
345 r, n = utf8.DecodeLastRune(bar)
346 bar = bar[:len(bar)-n]
347 if i == 1 {
348 break
349 }
350 }
351 return r == s.format[rFill]
329352 }
330353
331354 func (s *state) draw(termWidth int, prependWs, appendWs *widthSync) {
365388 shrinkWidth := termWidth - prependCount - appendCount
366389 s.fillBar(shrinkWidth)
367390 }
391 s.bufA.WriteByte('\n')
368392 }
369393
370394 func (s *state) fillBar(width int) {
241241
242242 out := bytes.Split(removeLastRune(buf.Bytes()), []byte("\n"))
243243 gotPanic := out[len(out)-1]
244 if string(gotPanic) != fmt.Sprintf("bar02 panic: %q", wantPanic) {
244 wantPanic = fmt.Sprintf("b#%02d panic: %v", 2, wantPanic)
245
246 if string(gotPanic) != wantPanic {
245247 t.Errorf("Want: %q, got: %q\n", wantPanic, gotPanic)
246248 }
247249 }
00 package mpb
11
22 import (
3 "fmt"
34 "io"
45 "os"
56 "runtime"
7172 cw: cwriter.New(os.Stdout),
7273 rr: prr,
7374 ticker: time.NewTicker(prr),
75 cancel: make(chan struct{}),
7476 }
7577
7678 for _, opt := range options {
155157 case <-p.quit:
156158 return
157159 default:
158 // complete Total unknown bars
159 p.ops <- func(c *pConf) {
160 for _, b := range c.bars {
161 b.complete()
162 }
163 }
164160 // wait for all bars to quit
165161 p.wg.Wait()
166162 // request p.server to quit
180176
181177 // server monitors underlying channels and renders any progress bars
182178 func (p *Progress) server(conf pConf) {
183
184179 defer func() {
185180 if conf.shutdownNotifier != nil {
186181 close(conf.shutdownNotifier)
187182 }
188183 close(p.done)
189184 }()
185
186 numP, numA := -1, -1
190187
191188 for {
192189 select {
193190 case op := <-p.ops:
194191 op(&conf)
195192 case <-conf.ticker.C:
196 numBars := len(conf.bars)
197 if numBars == 0 {
193 if len(conf.bars) == 0 {
198194 runtime.Gosched()
199195 break
200196 }
201
202 if conf.beforeRender != nil {
203 conf.beforeRender(conf.bars)
204 }
205
206 wSyncTimeout := make(chan struct{})
207 time.AfterFunc(conf.rr, func() {
208 close(wSyncTimeout)
209 })
210
211197 b0 := conf.bars[0]
212 prependWs := newWidthSync(wSyncTimeout, numBars, b0.NumOfPrependers())
213 appendWs := newWidthSync(wSyncTimeout, numBars, b0.NumOfAppenders())
214
215 tw, _, _ := cwriter.GetTermSize()
216
217 sequence := make([]<-chan []byte, numBars)
218 for i, b := range conf.bars {
219 sequence[i] = b.render(tw, prependWs, appendWs)
220 }
221
222 for buf := range fanIn(sequence...) {
223 conf.cw.Write(buf)
224 }
225
226 for _, interceptor := range conf.interceptors {
227 interceptor(conf.cw)
228 }
229
230 conf.cw.Flush()
198 if numP == -1 {
199 numP = b0.NumOfPrependers()
200 }
201 if numA == -1 {
202 numA = b0.NumOfAppenders()
203 }
204 err := conf.writeAndFlush(numP, numA)
205 if err != nil {
206 fmt.Fprintln(os.Stderr, err)
207 }
231208 case <-conf.cancel:
232209 conf.ticker.Stop()
233210 conf.cancel = nil
277254 return ws
278255 }
279256
280 func fanIn(inputs ...<-chan []byte) <-chan []byte {
281 ch := make(chan []byte)
257 func (p *pConf) writeAndFlush(numP, numA int) (err error) {
258 if p.beforeRender != nil {
259 p.beforeRender(p.bars)
260 }
261
262 wSyncTimeout := make(chan struct{})
263 time.AfterFunc(p.rr, func() {
264 close(wSyncTimeout)
265 })
266
267 prependWs := newWidthSync(wSyncTimeout, len(p.bars), numP)
268 appendWs := newWidthSync(wSyncTimeout, len(p.bars), numA)
269
270 tw, _, _ := cwriter.TermSize()
271
272 sequence := make([]<-chan *writeBuf, len(p.bars))
273 for i, b := range p.bars {
274 sequence[i] = b.render(tw, prependWs, appendWs)
275 }
276
277 var i int
278 for b := range fanIn(sequence...) {
279 _, err = p.cw.Write(b.buf)
280 defer func(bar *Bar, complete bool) {
281 if complete {
282 bar.Complete()
283 }
284 }(p.bars[i], b.completeAfterFlush)
285 i++
286 }
287
288 for _, interceptor := range p.interceptors {
289 interceptor(p.cw)
290 }
291
292 if e := p.cw.Flush(); err == nil {
293 err = e
294 }
295 return
296 }
297
298 func fanIn(inputs ...<-chan *writeBuf) <-chan *writeBuf {
299 ch := make(chan *writeBuf)
282300
283301 go func() {
284302 defer close(ch)