diff --git a/bar.go b/bar.go index 8b9e856..a1c3a38 100644 --- a/bar.go +++ b/bar.go @@ -384,7 +384,7 @@ func (b *Bar) refreshNowTillShutdown() { for { select { - case b.container.forceRefresh <- time.Now(): + case b.container.refreshCh <- time.Now(): case <-b.done: return } diff --git a/options.go b/options.go index a0ead16..6b34fb3 100644 --- a/options.go +++ b/options.go @@ -43,7 +43,7 @@ // Refresh will occur upon receive value from provided ch. func WithManualRefresh(ch <-chan time.Time) ContainerOption { return func(s *pState) { - s.manualRefresh = ch + s.refreshSrc = ch } } @@ -71,7 +71,7 @@ func WithOutput(w io.Writer) ContainerOption { return func(s *pState) { if w == nil { - s.manualRefresh = make(chan time.Time) + s.refreshSrc = make(chan time.Time) s.output = ioutil.Discard return } diff --git a/progress.go b/progress.go index 52e27ed..fb3892d 100644 --- a/progress.go +++ b/progress.go @@ -30,7 +30,7 @@ bwg *sync.WaitGroup operateState chan func(*pState) done chan struct{} - forceRefresh chan time.Time + refreshCh chan time.Time once sync.Once dlogger *log.Logger } @@ -49,7 +49,7 @@ popCompleted bool rr time.Duration uwg *sync.WaitGroup - manualRefresh <-chan time.Time + refreshSrc <-chan time.Time renderDelay <-chan struct{} shutdownNotifier chan struct{} parkedBars map[*Bar]*Bar @@ -88,7 +88,6 @@ cwg: new(sync.WaitGroup), bwg: new(sync.WaitGroup), operateState: make(chan func(*pState)), - forceRefresh: make(chan time.Time), done: make(chan struct{}), dlogger: log.New(s.debugOut, "[mpb] ", log.Lshortfile), } @@ -206,25 +205,18 @@ func (p *Progress) serve(s *pState, cw *cwriter.Writer) { defer p.cwg.Done() - manualOrTickCh, cleanUp := s.manualOrTick() - defer cleanUp() - - refreshCh := fanInRefreshSrc(p.done, s.renderDelay, p.forceRefresh, manualOrTickCh) + p.refreshCh = s.newTicker(p.done) for { select { case op := <-p.operateState: op(s) - case _, ok := <-refreshCh: - if !ok { - if s.shutdownNotifier != nil { - close(s.shutdownNotifier) - } - return - } + case <-p.refreshCh: if err := s.render(cw); err != nil { - p.dlogger.Println(err) - } + go p.dlogger.Println(err) + } + case <-s.shutdownNotifier: + return } } } @@ -304,12 +296,31 @@ return cw.Flush(lineCount) } -func (s *pState) manualOrTick() (<-chan time.Time, func()) { - if s.manualRefresh != nil { - return s.manualRefresh, func() {} - } - ticker := time.NewTicker(s.rr) - return ticker.C, ticker.Stop +func (s *pState) newTicker(done <-chan struct{}) chan time.Time { + ch := make(chan time.Time) + if s.shutdownNotifier == nil { + s.shutdownNotifier = make(chan struct{}) + } + go func() { + if s.renderDelay != nil { + <-s.renderDelay + } + if s.refreshSrc == nil { + ticker := time.NewTicker(s.rr) + defer ticker.Stop() + s.refreshSrc = ticker.C + } + for { + select { + case tick := <-s.refreshSrc: + ch <- tick + case <-done: + close(s.shutdownNotifier) + return + } + } + }() + return ch } func (s *pState) updateSyncMatrix() { @@ -374,53 +385,6 @@ } } -func fanInRefreshSrc(done, delay <-chan struct{}, channels ...<-chan time.Time) <-chan time.Time { - var wg sync.WaitGroup - multiplexedStream := make(chan time.Time) - start := make(chan struct{}) - - multiplex := func(c <-chan time.Time) { - defer wg.Done() - <-start - // source channels are never closed (time.Ticker never closes associated - // channel), so we cannot simply range over a c, instead we use select - // inside infinite loop - for { - select { - case v := <-c: - select { - case multiplexedStream <- v: - case <-done: - return - } - case <-done: - return - } - } - } - - if delay != nil { - go func() { - <-delay - close(start) - }() - } else { - close(start) - } - - wg.Add(len(channels)) - for _, c := range channels { - go multiplex(c) - } - - go func() { - wg.Wait() - close(multiplexedStream) - }() - - return multiplexedStream -} - func extractBaseFiller(f Filler) Filler { if f, ok := f.(Wrapper); ok { return extractBaseFiller(f.Base())