diff --git a/bar.go b/bar.go index 5b2efea..c66f995 100644 --- a/bar.go +++ b/bar.go @@ -32,21 +32,15 @@ priority int index int - // pointer to running bar, which this bar should replace - runningBar *Bar - - // completed is set from master Progress goroutine only - completed bool - - removeOnComplete bool - - operateState chan func(*bState) + runningBar *Bar + cacheState *bState + operateState chan func(*bState) + frameReaderCh chan io.Reader + // done is closed by Bar's goroutine, after cacheState is written done chan struct{} // shutdown is closed from master Progress goroutine only shutdown chan struct{} - - cacheState *bState } type ( @@ -63,6 +57,7 @@ trimRightSpace bool toComplete bool dynamic bool + removeOnComplete bool barClearOnComplete bool completeFlushed bool startTime time.Time @@ -77,18 +72,17 @@ panicMsg string // following options are assigned to the *Bar - priority int - removeOnComplete bool - runningBar *Bar + priority int + runningBar *Bar } refill struct { char rune till int64 } - bFrame struct { - bar *Bar - reader io.Reader - toComplete bool + frameReader struct { + io.Reader + toShutdown bool + removeOnComplete bool } ) @@ -117,12 +111,12 @@ s.bufA = bytes.NewBuffer(make([]byte, 0, s.width)) b := &Bar{ - priority: s.priority, - removeOnComplete: s.removeOnComplete, - runningBar: s.runningBar, - operateState: make(chan func(*bState)), - done: make(chan struct{}), - shutdown: make(chan struct{}), + priority: s.priority, + runningBar: s.runningBar, + operateState: make(chan func(*bState)), + frameReaderCh: make(chan io.Reader, 1), + done: make(chan struct{}), + shutdown: make(chan struct{}), } if b.runningBar != nil { @@ -173,7 +167,7 @@ // NumOfAppenders returns current number of append decorators func (b *Bar) NumOfAppenders() int { - result := make(chan int, 1) + result := make(chan int) select { case b.operateState <- func(s *bState) { result <- len(s.aDecorators) }: return <-result @@ -184,7 +178,7 @@ // NumOfPrependers returns current number of prepend decorators func (b *Bar) NumOfPrependers() int { - result := make(chan int, 1) + result := make(chan int) select { case b.operateState <- func(s *bState) { result <- len(s.pDecorators) }: return <-result @@ -195,7 +189,7 @@ // ID returs id of the bar func (b *Bar) ID() int { - result := make(chan int, 1) + result := make(chan int) select { case b.operateState <- func(s *bState) { result <- s.id }: return <-result @@ -206,7 +200,7 @@ // Current returns bar's current number, in other words sum of all increments. func (b *Bar) Current() int64 { - result := make(chan int64, 1) + result := make(chan int64) select { case b.operateState <- func(s *bState) { result <- s.current }: return <-result @@ -217,7 +211,7 @@ // Total returns bar's total number. func (b *Bar) Total() int64 { - result := make(chan int64, 1) + result := make(chan int64) select { case b.operateState <- func(s *bState) { result <- s.total }: return <-result @@ -247,7 +241,12 @@ func (b *Bar) StartBlock() { now := time.Now() select { - case b.operateState <- func(s *bState) { s.blockStartTime = now }: + case b.operateState <- func(s *bState) { + if s.current == 0 { + s.startTime = now + } + s.blockStartTime = now + }: case <-b.done: } } @@ -276,7 +275,7 @@ // Completed reports whether the bar is in completed state func (b *Bar) Completed() bool { - result := make(chan bool, 1) + result := make(chan bool) select { case b.operateState <- func(s *bState) { result <- s.toComplete }: return <-result @@ -304,42 +303,41 @@ } } -func (b *Bar) render(debugOut io.Writer, tw int, pSyncer, aSyncer *widthSyncer) <-chan *bFrame { - ch := make(chan *bFrame, 1) - - go func() { - select { - case b.operateState <- func(s *bState) { - var r io.Reader - defer func() { - // recovering if external decorators panic - if p := recover(); p != nil { - s.panicMsg = fmt.Sprintf("panic: %v", p) - s.pDecorators = nil - s.aDecorators = nil - s.toComplete = true - // truncate panic msg to one tw line, if necessary - r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg)) - fmt.Fprintf(debugOut, "%s %s bar id %02d %v\n", "[mpb]", time.Now(), s.id, s.panicMsg) - } - ch <- &bFrame{b, r, s.toComplete && !(s.barClearOnComplete && !s.completeFlushed)} - s.completeFlushed = s.toComplete - }() +func (b *Bar) render(debugOut io.Writer, tw int, pSyncer, aSyncer *widthSyncer) { + var r io.Reader + select { + case b.operateState <- func(s *bState) { + defer func() { + // recovering if external decorators panic + if p := recover(); p != nil { + s.panicMsg = fmt.Sprintf("panic: %v", p) + s.pDecorators = nil + s.aDecorators = nil + s.toComplete = true + // truncate panic msg to one tw line, if necessary + r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg)) + fmt.Fprintf(debugOut, "%s %s bar id %02d %v\n", "[mpb]", time.Now(), s.id, s.panicMsg) + } + b.frameReaderCh <- &frameReader{ + Reader: r, + toShutdown: s.toComplete && !s.completeFlushed, + removeOnComplete: s.removeOnComplete, + } + s.completeFlushed = s.toComplete + }() + r = s.draw(tw, pSyncer, aSyncer) + }: + case <-b.done: + s := b.cacheState + if s.panicMsg != "" { + r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg)) + } else { r = s.draw(tw, pSyncer, aSyncer) - }: - case <-b.done: - s := b.cacheState - var r io.Reader - if s.panicMsg != "" { - r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg)) - } else { - r = s.draw(tw, pSyncer, aSyncer) - } - ch <- &bFrame{b, r, true} - } - }() - - return ch + } + b.frameReaderCh <- &frameReader{ + Reader: r, + } + } } func (s *bState) draw(termWidth int, pSyncer, aSyncer *widthSyncer) io.Reader { diff --git a/progress.go b/progress.go index 7c39249..552572f 100644 --- a/progress.go +++ b/progress.go @@ -2,6 +2,7 @@ import ( "container/heap" + "fmt" "io" "io/ioutil" "os" @@ -204,7 +205,7 @@ return ws } -func (s *pState) writeAndFlush(tw, numP, numA int) (err error) { +func (s *pState) render(tw, numP, numA int) { timeout := make(chan struct{}) pSyncer := newWidthSyncer(timeout, s.bHeap.Len(), numP) aSyncer := newWidthSyncer(timeout, s.bHeap.Len(), numA) @@ -212,26 +213,40 @@ close(timeout) }) - for _, ch := range s.renderByPriority(tw, pSyncer, aSyncer) { - bf := <-ch - _, err = s.cw.ReadFrom(bf.reader) - if !bf.bar.completed && bf.toComplete { - // shutdown at next flush, in other words decrement underlying WaitGroup - // only after the bar with completed state has been flushed. - // this ensures no bar ends up with less than 100% rendered. - defer func() { - s.shutdownPending = append(s.shutdownPending, bf.bar) - }() - if bf.bar.removeOnComplete { - s.heapUpdated = heap.Remove(s.bHeap, bf.bar.index) != nil + for i := 0; i < s.bHeap.Len(); i++ { + bar := (*s.bHeap)[i] + go bar.render(s.debugOut, tw, pSyncer, aSyncer) + } + + if err := s.flush(); err != nil { + fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err) + } +} + +func (s *pState) flush() (err error) { + for s.bHeap.Len() > 0 { + bar := heap.Pop(s.bHeap).(*Bar) + reader := <-bar.frameReaderCh + _, err = s.cw.ReadFrom(reader) + frame := reader.(*frameReader) + defer func() { + if frame.toShutdown { + // shutdown at next flush, in other words decrement underlying WaitGroup + // only after the bar with completed state has been flushed. + // this ensures no bar ends up with less than 100% rendered. + s.shutdownPending = append(s.shutdownPending, bar) + if replacementBar, ok := s.waitBars[bar]; ok { + heap.Push(s.bHeap, replacementBar) + s.heapUpdated = true + delete(s.waitBars, bar) + } + if frame.removeOnComplete { + s.heapUpdated = true + return + } } - if replacementBar, ok := s.waitBars[bf.bar]; ok { - heap.Push(s.bHeap, replacementBar) - s.heapUpdated = true - delete(s.waitBars, bf.bar) - } - bf.bar.completed = true - } + heap.Push(s.bHeap, bar) + }() } for _, interceptor := range s.interceptors { @@ -247,16 +262,6 @@ s.shutdownPending = s.shutdownPending[:i] } return -} - -func (s *pState) renderByPriority(tw int, pSyncer, aSyncer *widthSyncer) []<-chan *bFrame { - bff := make([]<-chan *bFrame, 0, s.bHeap.Len()) - for s.bHeap.Len() > 0 { - b := heap.Pop(s.bHeap).(*Bar) - defer heap.Push(s.bHeap, b) - bff = append(bff, b.render(s.debugOut, tw, pSyncer, aSyncer)) - } - return bff } func calcMax(slice []int) int { diff --git a/progress_posix.go b/progress_posix.go index 44aa61a..b40c670 100644 --- a/progress_posix.go +++ b/progress_posix.go @@ -3,7 +3,6 @@ package mpb import ( - "fmt" "os" "os/signal" "syscall" @@ -41,10 +40,7 @@ s.heapUpdated = false } tw, _, _ := cwriter.TermSize() - err := s.writeAndFlush(tw, numP, numA) - if err != nil { - fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err) - } + s.render(tw, numP, numA) case <-winch: if s.heapUpdated { numP = s.bHeap.maxNumP() @@ -52,10 +48,7 @@ s.heapUpdated = false } tw, _, _ := cwriter.TermSize() - err := s.writeAndFlush(tw-tw/8, numP, numA) - if err != nil { - fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err) - } + s.render(tw-tw/8, numP, numA) if timer != nil && timer.Reset(resumeDelay) { break } diff --git a/progress_windows.go b/progress_windows.go index 9890ee1..65cbfd8 100644 --- a/progress_windows.go +++ b/progress_windows.go @@ -3,9 +3,6 @@ package mpb import ( - "fmt" - "time" - "github.com/vbauerster/mpb/cwriter" ) @@ -30,10 +27,7 @@ s.heapUpdated = false } tw, _, _ := cwriter.TermSize() - err := s.writeAndFlush(tw, numP, numA) - if err != nil { - fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err) - } + s.render(tw, numP, numA) } } }