diff --git a/progress.go b/progress.go index a5b0d83..ca938ad 100644 --- a/progress.go +++ b/progress.go @@ -22,8 +22,9 @@ // Progress represents the container that renders Progress bars type Progress struct { - wg *sync.WaitGroup uwg *sync.WaitGroup + cwg *sync.WaitGroup + bwg *sync.WaitGroup operateState chan func(*pState) done chan struct{} } @@ -32,7 +33,6 @@ bHeap *priorityQueue shutdownPending []*Bar heapUpdated bool - zeroWait bool idCounter int width int format string @@ -40,6 +40,7 @@ cw *cwriter.Writer pMatrix map[int][]chan int aMatrix map[int][]chan int + forceRefreshCh chan time.Time // following are provided/overrided by user ctx context.Context @@ -56,13 +57,14 @@ pq := make(priorityQueue, 0) heap.Init(&pq) s := &pState{ - ctx: context.Background(), - bHeap: &pq, - width: pwidth, - cw: cwriter.New(os.Stdout), - rr: prr, - waitBars: make(map[*Bar]*Bar), - debugOut: ioutil.Discard, + ctx: context.Background(), + bHeap: &pq, + width: pwidth, + cw: cwriter.New(os.Stdout), + rr: prr, + waitBars: make(map[*Bar]*Bar), + debugOut: ioutil.Discard, + forceRefreshCh: make(chan time.Time), } for _, opt := range options { @@ -73,10 +75,12 @@ p := &Progress{ uwg: s.uwg, - wg: new(sync.WaitGroup), + cwg: new(sync.WaitGroup), + bwg: new(sync.WaitGroup), operateState: make(chan func(*pState)), done: make(chan struct{}), } + p.cwg.Add(1) go p.serve(s) return p } @@ -97,11 +101,11 @@ // Add creates a bar which renders itself by provided filler. func (p *Progress) Add(total int64, filler Filler, options ...BarOption) *Bar { - p.wg.Add(1) + p.bwg.Add(1) result := make(chan *Bar) select { case p.operateState <- func(s *pState) { - b := newBar(s.ctx, p.wg, filler, s.idCounter, s.width, total, options...) + b := newBar(s.ctx, p.bwg, filler, s.idCounter, s.width, total, options...) if b.runningBar != nil { s.waitBars[b.runningBar] = b } else { @@ -113,7 +117,7 @@ }: return <-result case <-p.done: - p.wg.Done() + p.bwg.Done() return nil } } @@ -157,43 +161,53 @@ } } -// Wait first waits for user provided *sync.WaitGroup, if any, then -// waits far all bars to complete and finally shutdowns master goroutine. +// Wait waits far all bars to complete and finally shutdowns container. // After this method has been called, there is no way to reuse *Progress // instance. func (p *Progress) Wait() { if p.uwg != nil { + // wait for user wg p.uwg.Wait() } - p.wg.Wait() - - select { - case p.operateState <- func(s *pState) { s.zeroWait = true }: - <-p.done - case <-p.done: - } -} - -func (s *pState) updateSyncMatrix() { - s.pMatrix = make(map[int][]chan int) - s.aMatrix = make(map[int][]chan int) - for i := 0; i < s.bHeap.Len(); i++ { - bar := (*s.bHeap)[i] - table := bar.wSyncTable() - pRow, aRow := table[0], table[1] - - for i, ch := range pRow { - s.pMatrix[i] = append(s.pMatrix[i], ch) - } - - for i, ch := range aRow { - s.aMatrix[i] = append(s.aMatrix[i], ch) - } - } -} - -func (s *pState) render(tw int) { + // wait for bars to quit, if any + p.bwg.Wait() + + close(p.done) + + // wait for container to quit + p.cwg.Wait() +} + +func (p *Progress) serve(s *pState) { + defer p.cwg.Done() + + manualOrTickCh, cleanUp := s.manualOrTick() + defer cleanUp() + + refreshCh := fanInRefreshSrc(p.done, s.forceRefreshCh, manualOrTickCh) + + for { + select { + case op := <-p.operateState: + op(s) + case _, ok := <-refreshCh: + if !ok { + if s.shutdownNotifier != nil { + close(s.shutdownNotifier) + } + return + } + tw, err := s.cw.GetWidth() + if err != nil { + tw = s.width + } + s.render(p.done, tw) + } + } +} + +func (s *pState) render(done <-chan struct{}, tw int) { if s.heapUpdated { s.updateSyncMatrix() s.heapUpdated = false @@ -206,17 +220,25 @@ go bar.render(s.debugOut, tw) } - if err := s.flush(s.bHeap.Len()); err != nil { + if err := s.flush(done, s.bHeap.Len()); err != nil { fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err) } } -func (s *pState) flush(lineCount int) error { +func (s *pState) flush(done <-chan struct{}, lineCount int) error { for s.bHeap.Len() > 0 { bar := heap.Pop(s.bHeap).(*Bar) frameReader := <-bar.frameReaderCh defer func() { if frameReader.toShutdown { + // force next refresh asap, without waiting for ticker + go func() { + select { + case s.forceRefreshCh <- time.Now(): + case <-done: + return + } + }() // shutdown at next flush, in other words decrement underlying WaitGroup // only after the bar with completed state has been flushed. this // ensures no bar ends up with less than 100% rendered. @@ -245,6 +267,32 @@ return s.cw.Flush(lineCount) } +func (s *pState) manualOrTick() (<-chan time.Time, func()) { + if s.manualRefreshCh != nil { + return s.manualRefreshCh, func() {} + } + ticker := time.NewTicker(s.rr) + return ticker.C, ticker.Stop +} + +func (s *pState) updateSyncMatrix() { + s.pMatrix = make(map[int][]chan int) + s.aMatrix = make(map[int][]chan int) + for i := 0; i < s.bHeap.Len(); i++ { + bar := (*s.bHeap)[i] + table := bar.wSyncTable() + pRow, aRow := table[0], table[1] + + for i, ch := range pRow { + s.pMatrix[i] = append(s.pMatrix[i], ch) + } + + for i, ch := range aRow { + s.aMatrix[i] = append(s.aMatrix[i], ch) + } + } +} + func syncWidth(matrix map[int][]chan int) { for _, column := range matrix { column := column @@ -262,3 +310,32 @@ }() } } + +func fanInRefreshSrc(done <-chan struct{}, channels ...<-chan time.Time) <-chan time.Time { + var wg sync.WaitGroup + multiplexedStream := make(chan time.Time) + + multiplex := func(c <-chan time.Time) { + defer wg.Done() + for { + select { + case v := <-c: + multiplexedStream <- v + case <-done: + return + } + } + } + + wg.Add(len(channels)) + for _, c := range channels { + go multiplex(c) + } + + go func() { + wg.Wait() + close(multiplexedStream) + }() + + return multiplexedStream +} diff --git a/progress_posix.go b/progress_posix.go deleted file mode 100644 index 545245a..0000000 --- a/progress_posix.go +++ /dev/null @@ -1,70 +0,0 @@ -// +build !windows - -package mpb - -import ( - "os" - "os/signal" - "syscall" - "time" -) - -func (p *Progress) serve(s *pState) { - - var ticker *time.Ticker - var refreshCh <-chan time.Time - var winch chan os.Signal - var resumeTimer *time.Timer - var resumeEvent <-chan time.Time - winchIdleDur := s.rr * 2 - - if s.manualRefreshCh == nil { - ticker = time.NewTicker(s.rr) - refreshCh = ticker.C - winch = make(chan os.Signal, 2) - signal.Notify(winch, syscall.SIGWINCH) - } else { - refreshCh = s.manualRefreshCh - } - - for { - select { - case op := <-p.operateState: - op(s) - case <-refreshCh: - if s.zeroWait { - if s.manualRefreshCh == nil { - signal.Stop(winch) - ticker.Stop() - } - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - close(p.done) - return - } - tw, err := s.cw.GetWidth() - if err != nil { - tw = s.width - } - s.render(tw) - case <-winch: - tw, err := s.cw.GetWidth() - if err != nil { - tw = s.width - } - s.render(tw - tw/8) - if resumeTimer != nil && resumeTimer.Reset(winchIdleDur) { - break - } - ticker.Stop() - resumeTimer = time.NewTimer(winchIdleDur) - resumeEvent = resumeTimer.C - case <-resumeEvent: - ticker = time.NewTicker(s.rr) - refreshCh = ticker.C - resumeEvent = nil - resumeTimer = nil - } - } -} diff --git a/progress_windows.go b/progress_windows.go deleted file mode 100644 index cab03d3..0000000 --- a/progress_windows.go +++ /dev/null @@ -1,43 +0,0 @@ -// +build windows - -package mpb - -import ( - "time" -) - -func (p *Progress) serve(s *pState) { - - var ticker *time.Ticker - var refreshCh <-chan time.Time - - if s.manualRefreshCh == nil { - ticker = time.NewTicker(s.rr) - refreshCh = ticker.C - } else { - refreshCh = s.manualRefreshCh - } - - for { - select { - case op := <-p.operateState: - op(s) - case <-refreshCh: - if s.zeroWait { - if s.manualRefreshCh == nil { - ticker.Stop() - } - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - close(p.done) - return - } - tw, err := s.cw.GetWidth() - if err != nil { - tw = s.width - } - s.render(tw) - } - } -}