diff --git a/bar.go b/bar.go index de6c89c..567a748 100644 --- a/bar.go +++ b/bar.go @@ -61,9 +61,10 @@ recoveredPanic interface{} } +type extFunc func(in io.Reader, tw int, st *decor.Statistics) (out io.Reader, lines int) + type bState struct { filler Filler - extender Filler id int width int total int64 @@ -71,7 +72,6 @@ trimSpace bool toComplete bool completeFlushed bool - noBufBOnComplete bool noPop bool aDecorators []decor.Decorator pDecorators []decor.Decorator @@ -80,7 +80,7 @@ shutdownListeners []decor.ShutdownListener averageAdjusters []decor.AverageAdjuster bufP, bufB, bufA *bytes.Buffer - bufE *bytes.Buffer + extender extFunc // priority overrides *Bar's priority, if set priority int @@ -93,16 +93,9 @@ } func newBar(container *Progress, bs *bState) *Bar { - - bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width)) - bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width)) - bs.bufA = bytes.NewBuffer(make([]byte, 0, bs.width)) - if bs.extender != nil { - bs.bufE = bytes.NewBuffer(make([]byte, 0, bs.width)) - } - logPrefix := fmt.Sprintf("%sbar#%02d ", container.dlogger.Prefix(), bs.id) ctx, cancel := context.WithCancel(container.ctx) + bar := &Bar{ container: container, priority: bs.priority, @@ -299,15 +292,6 @@ return <-b.completed case <-b.done: return true - } -} - -func (b *Bar) wSyncTable() [][]chan int { - select { - case b.operateState <- func(s *bState) { b.syncTableCh <- s.wSyncTable() }: - return <-b.syncTableCh - case <-b.done: - return b.cacheState.wSyncTable() } } @@ -347,27 +331,19 @@ } }() - frame := s.draw(tw) - - if s.extender != nil { - s.extender.Fill(s.bufE, tw, newStatistics(s)) - b.extendedLines = countLines(s.bufE.Bytes()) - frame = io.MultiReader(frame, s.bufE) - } + st := newStatistics(s) + frame := s.draw(tw, st) + frame, b.extendedLines = s.extender(frame, tw, st) b.toShutdown = s.toComplete && !s.completeFlushed s.completeFlushed = s.toComplete - b.frameCh <- frame }: case <-b.done: s := b.cacheState - frame := s.draw(tw) - if s.extender != nil { - s.extender.Fill(s.bufE, tw, newStatistics(s)) - b.extendedLines = countLines(s.bufE.Bytes()) - frame = io.MultiReader(frame, s.bufE) - } + st := newStatistics(s) + frame := s.draw(tw, st) + frame, b.extendedLines = s.extender(frame, tw, st) b.frameCh <- frame } } @@ -376,10 +352,48 @@ return strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%dv\n", termWidth), b.recoveredPanic)) } -func (s *bState) draw(termWidth int) io.Reader { - - stat := newStatistics(s) - +func (b *Bar) subscribeDecorators() { + var amountReceivers []decor.AmountReceiver + var shutdownListeners []decor.ShutdownListener + var averageAdjusters []decor.AverageAdjuster + b.TraverseDecorators(func(d decor.Decorator) { + if d, ok := d.(decor.AmountReceiver); ok { + amountReceivers = append(amountReceivers, d) + } + if d, ok := d.(decor.ShutdownListener); ok { + shutdownListeners = append(shutdownListeners, d) + } + if d, ok := d.(decor.AverageAdjuster); ok { + averageAdjusters = append(averageAdjusters, d) + } + }) + b.operateState <- func(s *bState) { + s.amountReceivers = amountReceivers + s.shutdownListeners = shutdownListeners + s.averageAdjusters = averageAdjusters + } +} + +func (b *Bar) refreshNowTillShutdown() { + for { + select { + case b.container.forceRefresh <- time.Now(): + case <-b.done: + return + } + } +} + +func (b *Bar) wSyncTable() [][]chan int { + select { + case b.operateState <- func(s *bState) { b.syncTableCh <- s.wSyncTable() }: + return <-b.syncTableCh + case <-b.done: + return b.cacheState.wSyncTable() + } +} + +func (s *bState) draw(termWidth int, stat *decor.Statistics) io.Reader { for _, d := range s.pDecorators { s.bufP.WriteString(d.Decor(stat)) } @@ -389,9 +403,6 @@ } s.bufA.WriteByte('\n') - if s.noBufBOnComplete && s.completeFlushed { - return io.MultiReader(s.bufP, s.bufA) - } prependCount := utf8.RuneCount(s.bufP.Bytes()) appendCount := utf8.RuneCount(s.bufA.Bytes()) - 1 @@ -434,16 +445,6 @@ return table } -func (b *Bar) refreshNowTillShutdown() { - for { - select { - case b.container.forceRefresh <- time.Now(): - case <-b.done: - return - } - } -} - func newStatistics(s *bState) *decor.Statistics { return &decor.Statistics{ ID: s.id, @@ -452,7 +453,3 @@ Current: s.current, } } - -func countLines(b []byte) int { - return bytes.Count(b, []byte("\n")) -} diff --git a/bar_option.go b/bar_option.go index 3887648..4374cb5 100644 --- a/bar_option.go +++ b/bar_option.go @@ -1,6 +1,9 @@ package mpb import ( + "bytes" + "io" + "github.com/vbauerster/mpb/v4/decor" ) @@ -78,7 +81,17 @@ // BarClearOnComplete clears bar part of bar line on complete event. func BarClearOnComplete() BarOption { return func(s *bState) { - s.noBufBOnComplete = true + s.filler = makeClearOnCompleteFiller(s.filler) + } +} + +func makeClearOnCompleteFiller(filler Filler) FillerFunc { + return func(w io.Writer, width int, st *decor.Statistics) { + if st.Completed { + w.Write([]byte{}) + } else { + filler.Fill(w, width, st) + } } } @@ -94,8 +107,20 @@ // BarExtender is an option to extend bar to the next new line, with // arbitrary output. func BarExtender(extender Filler) BarOption { - return func(s *bState) { - s.extender = extender + if extender == nil { + return nil + } + return func(s *bState) { + s.extender = makeExtFunc(extender) + } +} + +func makeExtFunc(extender Filler) extFunc { + buf := new(bytes.Buffer) + nl := []byte("\n") + return func(r io.Reader, tw int, st *decor.Statistics) (io.Reader, int) { + extender.Fill(buf, tw, st) + return io.MultiReader(r, buf), bytes.Count(buf.Bytes(), nl) } } diff --git a/draw_test.go b/draw_test.go index f87753a..9a3dff4 100644 --- a/draw_test.go +++ b/draw_test.go @@ -395,7 +395,7 @@ } } tmpBuf.Reset() - tmpBuf.ReadFrom(s.draw(termWidth)) + tmpBuf.ReadFrom(s.draw(termWidth, newStatistics(s))) by := tmpBuf.Bytes() by = by[:len(by)-1] @@ -413,7 +413,7 @@ func newTestState() *bState { s := &bState{ - filler: newDefaultBarFiller(), + filler: NewBarFiller(), bufP: new(bytes.Buffer), bufB: new(bytes.Buffer), bufA: new(bytes.Buffer), diff --git a/progress.go b/progress.go index d2dc553..3f7fab3 100644 --- a/progress.go +++ b/progress.go @@ -1,6 +1,7 @@ package mpb import ( + "bytes" "container/heap" "context" "io" @@ -65,7 +66,6 @@ // context. It's not possible to reuse instance after *Progress.Wait() // method has been called. func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { - s := &pState{ bHeap: priorityQueue{}, width: pwidth, @@ -91,6 +91,7 @@ done: make(chan struct{}), dlogger: log.New(s.debugOut, "[mpb] ", log.Lshortfile), } + p.cwg.Add(1) go p.serve(s, cwriter.New(s.output)) return p @@ -116,19 +117,7 @@ result := make(chan *Bar) select { case p.operateState <- func(ps *pState) { - bs := &bState{ - total: total, - filler: filler, - priority: ps.idCount, - id: ps.idCount, - width: ps.width, - debugOut: ps.debugOut, - } - for _, opt := range options { - if opt != nil { - opt(bs) - } - } + bs := ps.makeBarState(total, filler, options...) bar := newBar(p, bs) if bs.runningBar != nil { bs.runningBar.noPop = true @@ -140,26 +129,8 @@ ps.idCount++ result <- bar }: - var amountReceivers []decor.AmountReceiver - var shutdownListeners []decor.ShutdownListener - var averageAdjusters []decor.AverageAdjuster bar := <-result - bar.TraverseDecorators(func(d decor.Decorator) { - if d, ok := d.(decor.AmountReceiver); ok { - amountReceivers = append(amountReceivers, d) - } - if d, ok := d.(decor.ShutdownListener); ok { - shutdownListeners = append(shutdownListeners, d) - } - if d, ok := d.(decor.AverageAdjuster); ok { - averageAdjusters = append(averageAdjusters, d) - } - }) - bar.operateState <- func(s *bState) { - s.amountReceivers = amountReceivers - s.shutdownListeners = shutdownListeners - s.averageAdjusters = averageAdjusters - } + bar.subscribeDecorators() return bar case <-p.done: p.bwg.Done() @@ -358,6 +329,32 @@ } } +func (s *pState) makeBarState(total int64, filler Filler, options ...BarOption) *bState { + bs := &bState{ + total: total, + filler: filler, + priority: s.idCount, + id: s.idCount, + width: s.width, + debugOut: s.debugOut, + extender: func(r io.Reader, tw int, st *decor.Statistics) (io.Reader, int) { + return r, 0 + }, + } + + for _, opt := range options { + if opt != nil { + opt(bs) + } + } + + bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width)) + bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width)) + bs.bufA = bytes.NewBuffer(make([]byte, 0, bs.width)) + + return bs +} + func syncWidth(matrix map[int][]chan int) { for _, column := range matrix { column := column