diff --git a/bar.go b/bar.go index b71e6b4..22a22ee 100644 --- a/bar.go +++ b/bar.go @@ -458,7 +458,7 @@ var anyOtherRunning bool b.container.traverseBars(func(bar *Bar) bool { anyOtherRunning = b != bar && bar.IsRunning() - return !anyOtherRunning + return anyOtherRunning }) if !anyOtherRunning { for { diff --git a/heap_manager.go b/heap_manager.go new file mode 100644 index 0000000..fe2152d --- /dev/null +++ b/heap_manager.go @@ -0,0 +1,110 @@ +package mpb + +import ( + "container/heap" +) + +type heapManager chan heapRequest + +type heapCmd int + +const ( + h_sync heapCmd = iota + h_push + h_iter + h_pop_all + h_fix +) + +type heapRequest struct { + cmd heapCmd + data interface{} +} + +type iterData struct { + iter chan *Bar + drop chan struct{} +} + +type pushData struct { + bar *Bar + sync bool +} + +func (m heapManager) run() { + var bHeap priorityQueue + var pMatrix map[int][]chan int + var aMatrix map[int][]chan int + + var l int + var sync bool + + for req := range m { + switch req.cmd { + case h_push: + data := req.data.(*pushData) + heap.Push(&bHeap, data.bar) + sync = data.sync + case h_sync: + if sync || l != bHeap.Len() { + pMatrix = make(map[int][]chan int) + aMatrix = make(map[int][]chan int) + for _, b := range bHeap { + table := b.wSyncTable() + for i, ch := range table[0] { + pMatrix[i] = append(pMatrix[i], ch) + } + for i, ch := range table[1] { + aMatrix[i] = append(aMatrix[i], ch) + } + } + } + l = bHeap.Len() + syncWidth(pMatrix) + syncWidth(aMatrix) + case h_iter: + data := req.data.(*iterData) + for _, b := range bHeap { + select { + case data.iter <- b: + case <-data.drop: + } + } + close(data.iter) + case h_pop_all: + data := req.data.(*iterData) + for bHeap.Len() != 0 { + select { + case data.iter <- heap.Pop(&bHeap).(*Bar): + case <-data.drop: + } + } + close(data.iter) + case h_fix: + heap.Fix(&bHeap, req.data.(int)) + } + } +} + +func (m heapManager) sync() { + m <- heapRequest{cmd: h_sync} +} + +func (m heapManager) push(b *Bar, sync bool) { + data := &pushData{b, sync} + m <- heapRequest{cmd: h_push, data: data} +} + +func (m heapManager) iter(iter chan *Bar, drop chan struct{}) { + data := &iterData{iter, drop} + m <- heapRequest{cmd: h_iter, data: data} +} + +func (m heapManager) popAll(iter chan *Bar, drop chan struct{}) { + data := &iterData{iter, drop} + m <- heapRequest{cmd: h_pop_all, data: data} +} + +func (m heapManager) fix(index int) { + m <- heapRequest{cmd: h_push, data: index} +} diff --git a/progress.go b/progress.go index 7f47e0d..0782f41 100644 --- a/progress.go +++ b/progress.go @@ -2,7 +2,6 @@ import ( "bytes" - "container/heap" "context" "fmt" "io" @@ -35,11 +34,8 @@ // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine. type pState struct { - bHeap priorityQueue - heapUpdated bool - pMatrix map[int][]chan int - aMatrix map[int][]chan int - rows []io.Reader + hm heapManager + rows []io.Reader // following are provided/overrided by user refreshRate time.Duration @@ -49,6 +45,7 @@ popCompleted bool outputDiscarded bool disableAutoRefresh bool + ignoreNotTTY bool manualRefresh chan interface{} renderDelay <-chan struct{} shutdownNotifier chan struct{} @@ -69,6 +66,7 @@ // method has been called. func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { s := &pState{ + hm: make(heapManager), rows: make([]io.Reader, 32), refreshRate: defaultRefreshRate, popPriority: math.MinInt32, @@ -137,8 +135,7 @@ if bs.wait.bar != nil { ps.queueBars[bs.wait.bar] = bar } else { - heap.Push(&ps.bHeap, bar) - ps.heapUpdated = true + ps.hm.push(bar, true) } ps.idCount++ result <- bar @@ -152,18 +149,18 @@ } func (p *Progress) traverseBars(cb func(b *Bar) bool) { - sync := make(chan struct{}) + iter := make(chan *Bar) + drop := make(chan struct{}) select { case p.operateState <- func(s *pState) { - defer close(sync) - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - if !cb(bar) { + s.hm.iter(iter, drop) + }: + for b := range iter { + if cb(b) { + close(drop) break } } - }: - <-sync case <-p.done: } } @@ -176,22 +173,22 @@ return } b.priority = priority - heap.Fix(&s.bHeap, b.index) + s.hm.fix(b.index) }: case <-p.done: } } // BarCount returns bars count. -func (p *Progress) BarCount() int { - result := make(chan int) - select { - case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }: - return <-result - case <-p.done: - return 0 - } -} +// func (p *Progress) BarCount() int { +// result := make(chan int) +// select { +// case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }: +// return <-result +// case <-p.done: +// return 0 +// } +// } // Write is implementation of io.Writer. // Writing to `*mpb.Progress` will print lines above a running bar. @@ -268,11 +265,8 @@ } func (p *Progress) serve(s *pState, cw *cwriter.Writer) { - defer close(p.shutdown) - - render := func() error { - return s.render(cw) - } + + go s.hm.run() refreshCh := p.newTicker(s) @@ -283,61 +277,57 @@ case fn := <-p.interceptIo: fn(cw) case <-refreshCh: - err := render() + err := s.render(cw) if err != nil { - s.heapUpdated = false - render = func() error { return nil } + refreshCh = nil _, _ = fmt.Fprintln(s.debugOut, err.Error()) p.cancel() // cancel all bars } case <-p.done: - for s.heapUpdated { - err := render() - if err != nil { - _, _ = fmt.Fprintln(s.debugOut, err.Error()) - return - } - } + close(s.hm) + close(p.shutdown) return } } } func (s *pState) render(cw *cwriter.Writer) error { - var wg sync.WaitGroup - if s.heapUpdated { - s.updateSyncMatrix() - s.heapUpdated = false - } - syncWidth(&wg, s.pMatrix) - syncWidth(&wg, s.aMatrix) - width, height, err := cw.GetTermSize() if err != nil { + if !s.ignoreNotTTY { + return err + } width = s.reqWidth - height = s.bHeap.Len() - } - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - go bar.render(width) - } - - err = s.flush(&wg, cw, height) + height = 100 + } + + s.hm.sync() + + iter := make(chan *Bar) + s.hm.iter(iter, nil) + for b := range iter { + go b.render(width) + } + + wg := new(sync.WaitGroup) + err = s.flush(wg, cw, height) wg.Wait() return err } func (s *pState) flush(wg *sync.WaitGroup, cw *cwriter.Writer, height int) error { var popCount int - pool := make([]*Bar, 0, s.bHeap.Len()) s.rows = s.rows[:0] - for s.bHeap.Len() > 0 { - b := heap.Pop(&s.bHeap).(*Bar) + iter := make(chan *Bar) + drop := make(chan struct{}) + s.hm.popAll(iter, drop) + + for b := range iter { frame := <-b.frameCh if frame.err != nil { - // b.frameCh is buffered it's ok to return here - return frame.err + close(drop) + return frame.err // b.frameCh is buffered it's ok to return here } var usedRows int for i := len(frame.rows) - 1; i >= 0; i-- { @@ -345,11 +335,7 @@ s.rows = append(s.rows, row) usedRows++ } else { - wg.Add(1) - go func() { - _, _ = io.Copy(io.Discard, row) - wg.Done() - }() + _, _ = io.Copy(io.Discard, row) } } if frame.shutdown { @@ -357,8 +343,11 @@ if qb, ok := s.queueBars[b]; ok { delete(s.queueBars, b) qb.priority = b.priority - pool = append(pool, qb) - s.heapUpdated = true + wg.Add(1) + go func(b *Bar) { + s.hm.push(b, true) + wg.Done() + }(qb) continue } if s.popCompleted && !b.bs.noPop { @@ -369,26 +358,18 @@ default: if b.bs.dropOnComplete { popCount += usedRows - s.heapUpdated = true continue } } } else if b.bs.dropOnComplete { - s.heapUpdated = true continue } } - pool = append(pool, b) - } - - if len(pool) != 0 { wg.Add(1) - go func() { - for _, b := range pool { - heap.Push(&s.bHeap, b) - } + go func(b *Bar) { + s.hm.push(b, false) wg.Done() - }() + }(b) } for i := len(s.rows) - 1; i >= 0; i-- { @@ -398,25 +379,7 @@ } } - err := cw.Flush(len(s.rows) - popCount) - return err -} - -func (s *pState) updateSyncMatrix() { - s.pMatrix = make(map[int][]chan int) - s.aMatrix = make(map[int][]chan int) - for i := 0; i < s.bHeap.Len(); i++ { - bar := s.bHeap[i] - table := bar.wSyncTable() - - for i, ch := range table[0] { - s.pMatrix[i] = append(s.pMatrix[i], ch) - } - - for i, ch := range table[1] { - s.aMatrix[i] = append(s.aMatrix[i], ch) - } - } + return cw.Flush(len(s.rows) - popCount) } func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState { @@ -453,14 +416,13 @@ return bs } -func syncWidth(wg *sync.WaitGroup, matrix map[int][]chan int) { +func syncWidth(matrix map[int][]chan int) { for _, column := range matrix { - wg.Add(1) - go maxWidthDistributor(wg, column) - } -} - -func maxWidthDistributor(wg *sync.WaitGroup, column []chan int) { + go maxWidthDistributor(column) + } +} + +func maxWidthDistributor(column []chan int) { var maxWidth int for _, ch := range column { if w := <-ch; w > maxWidth { @@ -470,5 +432,4 @@ for _, ch := range column { ch <- maxWidth } - wg.Done() -} +}