diff --git a/bar.go b/bar.go index 646cb47..7b75cd2 100644 --- a/bar.go +++ b/bar.go @@ -24,24 +24,18 @@ toDrop bool noPop bool hasEwmaDecorators bool + frameCh chan *frame operateState chan func(*bState) - frameCh chan *frame - - // cancel is called either by user or on complete event - cancel func() - // done is closed after cacheState is assigned - done chan struct{} - // cacheState is populated, right after close(b.done) - cacheState *bState - - container *Progress - recoveredPanic interface{} + done chan struct{} + cacheState *bState + container *Progress + cancel func() + recoveredPanic interface{} } type extenderFunc func(in io.Reader, reqWidth int, st decor.Statistics) (out io.Reader, lines int) -// bState is actual bar state. It gets passed to *Bar.serve(...) monitor -// goroutine. +// bState is actual bar's state. type bState struct { id int priority int @@ -66,11 +60,10 @@ filler BarFiller middleware func(BarFiller) BarFiller extender extenderFunc - - // runningBar is a key for *pState.parkedBars - runningBar *Bar - - debugOut io.Writer + debugOut io.Writer + + // afterBar is a key for *pState.parkedBars + afterBar *Bar } type frame struct { @@ -78,22 +71,38 @@ lines int } -func newBar(container *Progress, bs *bState) *Bar { +func newBar(container *Progress, bs *bState) (*Bar, func()) { ctx, cancel := context.WithCancel(container.ctx) bar := &Bar{ - container: container, priority: bs.priority, toDrop: bs.dropOnComplete, noPop: bs.noPop, + frameCh: make(chan *frame, 1), operateState: make(chan func(*bState)), - frameCh: make(chan *frame, 1), done: make(chan struct{}), + container: container, cancel: cancel, } - go bar.serve(ctx, bs) - return bar + bar.subscribeDecorators(bs) + + serve := func() { + defer container.bwg.Done() + for { + select { + case op := <-bar.operateState: + op(bs) + case <-ctx.Done(): + bs.decoratorShutdownNotify() + bar.cacheState = bs + close(bar.done) + return + } + } + } + + return bar, serve } // ProxyReader wraps r with metrics required for progress tracking. @@ -319,21 +328,6 @@ } } -func (b *Bar) serve(ctx context.Context, s *bState) { - defer b.container.bwg.Done() - for { - select { - case op := <-b.operateState: - op(s) - case <-ctx.Done(): - s.decoratorShutdownNotify() - b.cacheState = s - close(b.done) - return - } - } -} - func (b *Bar) render(tw int) { select { case b.operateState <- func(s *bState) { @@ -371,29 +365,23 @@ } } -func (b *Bar) subscribeDecorators() { - var averageDecorators []decor.AverageDecorator - var ewmaDecorators []decor.EwmaDecorator - var shutdownListeners []decor.ShutdownListener - b.TraverseDecorators(func(d decor.Decorator) { - if d, ok := d.(decor.AverageDecorator); ok { - averageDecorators = append(averageDecorators, d) - } - if d, ok := d.(decor.EwmaDecorator); ok { - ewmaDecorators = append(ewmaDecorators, d) - } - if d, ok := d.(decor.ShutdownListener); ok { - shutdownListeners = append(shutdownListeners, d) - } - }) - b.hasEwmaDecorators = len(ewmaDecorators) != 0 - select { - case b.operateState <- func(s *bState) { - s.averageDecorators = averageDecorators - s.ewmaDecorators = ewmaDecorators - s.shutdownListeners = shutdownListeners - }: - case <-b.done: +func (b *Bar) subscribeDecorators(bs *bState) { + for _, decorators := range [...][]decor.Decorator{ + bs.pDecorators, + bs.aDecorators, + } { + for _, d := range decorators { + d = extractBaseDecorator(d) + if d, ok := d.(decor.AverageDecorator); ok { + bs.averageDecorators = append(bs.averageDecorators, d) + } + if d, ok := d.(decor.EwmaDecorator); ok { + bs.ewmaDecorators = append(bs.ewmaDecorators, d) + } + if d, ok := d.(decor.ShutdownListener); ok { + bs.shutdownListeners = append(bs.shutdownListeners, d) + } + } } } diff --git a/bar_option.go b/bar_option.go index 4ba4905..dc75c49 100644 --- a/bar_option.go +++ b/bar_option.go @@ -66,7 +66,7 @@ return nil } return func(s *bState) { - s.runningBar = runningBar + s.afterBar = bar } } diff --git a/progress.go b/progress.go index 123af17..fdb774e 100644 --- a/progress.go +++ b/progress.go @@ -20,8 +20,7 @@ prr = 150 * time.Millisecond ) -// Progress represents a container that renders one or more progress -// bars. +// Progress represents a container that renders one or more progress bars. type Progress struct { ctx context.Context uwg *sync.WaitGroup @@ -33,8 +32,7 @@ once sync.Once } -// pState holds bars in its priorityQueue. It gets passed to -// *Progress.serve(...) monitor goroutine. +// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. type pState struct { bHeap priorityQueue heapUpdated bool @@ -52,9 +50,14 @@ externalRefresh <-chan interface{} renderDelay <-chan struct{} shutdownNotifier chan struct{} - parkedBars map[*Bar]*Bar + queueBars map[*Bar]queueBar output io.Writer debugOut io.Writer +} + +type queueBar struct { + bar *Bar + serve func() } // New creates new Progress container instance. It's not possible to @@ -68,10 +71,10 @@ // method has been called. func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { s := &pState{ - bHeap: priorityQueue{}, - rr: prr, - parkedBars: make(map[*Bar]*Bar), - output: os.Stdout, + bHeap: priorityQueue{}, + rr: prr, + queueBars: make(map[*Bar]queueBar), + output: os.Stdout, } for _, opt := range options { @@ -121,19 +124,18 @@ select { case p.operateState <- func(ps *pState) { bs := ps.makeBarState(total, filler, options...) - bar := newBar(p, bs) - if bs.runningBar != nil { - bs.runningBar.noPop = true - ps.parkedBars[bs.runningBar] = bar + bar, serve := newBar(p, bs) + if bs.afterBar != nil { + ps.queueBars[bs.afterBar] = queueBar{bar, serve} } else { heap.Push(&ps.bHeap, bar) ps.heapUpdated = true + go serve() } ps.idCount++ result <- bar }: bar := <-result - bar.subscribeDecorators() return bar case <-p.done: p.bwg.Done() @@ -341,13 +343,13 @@ } for _, b := range s.barShutdownQueue { - if parkedBar := s.parkedBars[b]; parkedBar != nil { - parkedBar.priority = b.priority - heap.Push(&s.bHeap, parkedBar) - delete(s.parkedBars, b) + if qb, ok := s.queueBars[b]; ok { + qb.bar.priority = b.priority + heap.Push(&s.bHeap, qb.bar) + delete(s.queueBars, b) b.toDrop = true - } - if s.popCompleted && !b.noPop { + go qb.serve() + } else if s.popCompleted && !b.noPop { totalLines -= bm[b] b.toDrop = true }