diff --git a/bar.go b/bar.go index c1bffa6..c9f2227 100644 --- a/bar.go +++ b/bar.go @@ -36,6 +36,8 @@ // following are used after b.done is receiveable cacheState state + + once sync.Once } type ( @@ -64,6 +66,11 @@ prependFuncs []decor.DecoratorFunc refill *refill bufP, bufB, bufA *bytes.Buffer + panic string + } + writeBuf struct { + buf []byte + completeAfterFlush bool } ) @@ -251,22 +258,11 @@ // of process completion. If you don't call this method, it will be called // implicitly, upon p.Stop() call. func (b *Bar) Complete() { - select { - case <-b.quit: - default: - close(b.quit) - } -} - -func (b *Bar) complete() { - select { - case b.ops <- func(s *state) { - if !s.completed { - b.Complete() - } - }: - case <-time.After(prr): - } + b.once.Do(b.shutdown) +} + +func (b *Bar) shutdown() { + close(b.quit) } func (b *Bar) server(s state, wg *sync.WaitGroup, cancel <-chan struct{}) { @@ -285,48 +281,75 @@ cancel = nil b.Complete() case <-b.quit: - s.completed = true return } } } -func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan []byte { - ch := make(chan []byte, 1) +func (b *Bar) render(tw int, prependWs, appendWs *widthSync) <-chan *writeBuf { + ch := make(chan *writeBuf, 1) go func() { - var st state - defer func() { - // recovering if external decorators panic - if p := recover(); p != nil { - ch <- []byte(fmt.Sprintf("bar%02d panic: %q\n", st.id, p)) + select { + case b.ops <- func(s *state) { + defer func() { + // recovering if external decorators panic + if p := recover(); p != nil { + s.panic = fmt.Sprintf("b#%02d panic: %v\n", s.id, p) + s.prependFuncs = nil + s.appendFuncs = nil + + ch <- &writeBuf{[]byte(s.panic), true} + } + close(ch) + }() + s.draw(tw, prependWs, appendWs) + ch <- &writeBuf{s.toBytes(), s.isFull()} + }: + case <-b.done: + s := b.cacheState + var buf []byte + if s.panic != "" { + buf = []byte(s.panic) + } else { + s.draw(tw, prependWs, appendWs) + buf = s.toBytes() } + ch <- &writeBuf{buf, true} close(ch) - }() - result := make(chan state, 1) - select { - case b.ops <- func(s *state) { result <- *s }: - st = <-result - if st.completed { - b.Complete() - } - case <-b.done: - st = b.cacheState - } - st.draw(tw, prependWs, appendWs) - buf := make([]byte, 0, st.bufP.Len()+st.bufB.Len()+st.bufA.Len()) - buf = concatenateBlocks(buf, st.bufP.Bytes(), st.bufB.Bytes(), st.bufA.Bytes()) - buf = append(buf, '\n') - ch <- buf + } }() return ch +} + +func (s *state) toBytes() []byte { + buf := make([]byte, 0, s.bufP.Len()+s.bufB.Len()+s.bufA.Len()) + buf = concatenateBlocks(buf, s.bufP.Bytes(), s.bufB.Bytes(), s.bufA.Bytes()) + return buf } func (s *state) updateTimePerItemEstimate(amount int) { lastBlockTime := time.Since(s.blockStartTime) // shorthand for time.Now().Sub(t) lastItemEstimate := float64(lastBlockTime) / float64(amount) s.timePerItem = time.Duration((s.etaAlpha * lastItemEstimate) + (1-s.etaAlpha)*float64(s.timePerItem)) +} + +func (s *state) isFull() bool { + if !s.completed { + return false + } + bar := s.bufB.Bytes() + var r rune + var n int + for i := 0; len(bar) > 0; i++ { + r, n = utf8.DecodeLastRune(bar) + bar = bar[:len(bar)-n] + if i == 1 { + break + } + } + return r == s.format[rFill] } func (s *state) draw(termWidth int, prependWs, appendWs *widthSync) { @@ -366,6 +389,7 @@ shrinkWidth := termWidth - prependCount - appendCount s.fillBar(shrinkWidth) } + s.bufA.WriteByte('\n') } func (s *state) fillBar(width int) { diff --git a/bar_test.go b/bar_test.go index 48bc62f..f1eab47 100644 --- a/bar_test.go +++ b/bar_test.go @@ -242,7 +242,9 @@ out := bytes.Split(removeLastRune(buf.Bytes()), []byte("\n")) gotPanic := out[len(out)-1] - if string(gotPanic) != fmt.Sprintf("bar02 panic: %q", wantPanic) { + wantPanic = fmt.Sprintf("b#%02d panic: %v", 2, wantPanic) + + if string(gotPanic) != wantPanic { t.Errorf("Want: %q, got: %q\n", wantPanic, gotPanic) } } diff --git a/progress.go b/progress.go index 5d5a5e1..291305e 100644 --- a/progress.go +++ b/progress.go @@ -1,6 +1,7 @@ package mpb import ( + "fmt" "io" "os" "runtime" @@ -72,6 +73,7 @@ cw: cwriter.New(os.Stdout), rr: prr, ticker: time.NewTicker(prr), + cancel: make(chan struct{}), } for _, opt := range options { @@ -156,12 +158,6 @@ case <-p.quit: return default: - // complete Total unknown bars - p.ops <- func(c *pConf) { - for _, b := range c.bars { - b.complete() - } - } // wait for all bars to quit p.wg.Wait() // request p.server to quit @@ -181,54 +177,35 @@ // server monitors underlying channels and renders any progress bars func (p *Progress) server(conf pConf) { - defer func() { if conf.shutdownNotifier != nil { close(conf.shutdownNotifier) } close(p.done) }() + + numP, numA := -1, -1 for { select { case op := <-p.ops: op(&conf) case <-conf.ticker.C: - numBars := len(conf.bars) - if numBars == 0 { + if len(conf.bars) == 0 { runtime.Gosched() break } - - if conf.beforeRender != nil { - conf.beforeRender(conf.bars) - } - - wSyncTimeout := make(chan struct{}) - time.AfterFunc(conf.rr, func() { - close(wSyncTimeout) - }) - b0 := conf.bars[0] - prependWs := newWidthSync(wSyncTimeout, numBars, b0.NumOfPrependers()) - appendWs := newWidthSync(wSyncTimeout, numBars, b0.NumOfAppenders()) - - tw, _, _ := cwriter.GetTermSize() - - sequence := make([]<-chan []byte, numBars) - for i, b := range conf.bars { - sequence[i] = b.render(tw, prependWs, appendWs) - } - - for buf := range fanIn(sequence...) { - conf.cw.Write(buf) - } - - for _, interceptor := range conf.interceptors { - interceptor(conf.cw) - } - - conf.cw.Flush() + if numP == -1 { + numP = b0.NumOfPrependers() + } + if numA == -1 { + numA = b0.NumOfAppenders() + } + err := conf.writeAndFlush(numP, numA) + if err != nil { + fmt.Fprintln(os.Stderr, err) + } case <-conf.cancel: conf.ticker.Stop() conf.cancel = nil @@ -278,8 +255,49 @@ return ws } -func fanIn(inputs ...<-chan []byte) <-chan []byte { - ch := make(chan []byte) +func (p *pConf) writeAndFlush(numP, numA int) (err error) { + if p.beforeRender != nil { + p.beforeRender(p.bars) + } + + wSyncTimeout := make(chan struct{}) + time.AfterFunc(p.rr, func() { + close(wSyncTimeout) + }) + + prependWs := newWidthSync(wSyncTimeout, len(p.bars), numP) + appendWs := newWidthSync(wSyncTimeout, len(p.bars), numA) + + tw, _, _ := cwriter.TermSize() + + sequence := make([]<-chan *writeBuf, len(p.bars)) + for i, b := range p.bars { + sequence[i] = b.render(tw, prependWs, appendWs) + } + + var i int + for b := range fanIn(sequence...) { + _, err = p.cw.Write(b.buf) + defer func(bar *Bar, complete bool) { + if complete { + bar.Complete() + } + }(p.bars[i], b.completeAfterFlush) + i++ + } + + for _, interceptor := range p.interceptors { + interceptor(p.cw) + } + + if e := p.cw.Flush(); err == nil { + err = e + } + return +} + +func fanIn(inputs ...<-chan *writeBuf) <-chan *writeBuf { + ch := make(chan *writeBuf) go func() { defer close(ch)