diff --git a/bar.go b/bar.go index ebbb16c..e1e10bc 100644 --- a/bar.go +++ b/bar.go @@ -29,13 +29,15 @@ // Bar represents a progress Bar type Bar struct { + priority int + index int // quit channel to request b.server to quit quit chan struct{} // done channel is receiveable after b.server has been quit - done chan struct{} - ops chan func(*bState) - - // following are used after b.done is receiveable + done chan struct{} + operateState chan func(*bState) + + // cacheState is used after b.done is receiveable cacheState *bState once sync.Once @@ -72,7 +74,7 @@ } bufReader struct { io.Reader - complete bool + completed bool } ) @@ -96,19 +98,20 @@ s.bufA = bytes.NewBuffer(make([]byte, 0, s.width/2)) b := &Bar{ - quit: make(chan struct{}), - done: make(chan struct{}), - ops: make(chan func(*bState)), - } - - go b.server(s, wg, cancel) + priority: id, + quit: make(chan struct{}), + done: make(chan struct{}), + operateState: make(chan func(*bState)), + } + + go b.serve(s, wg, cancel) return b } // RemoveAllPrependers removes all prepend functions func (b *Bar) RemoveAllPrependers() { select { - case b.ops <- func(s *bState) { + case b.operateState <- func(s *bState) { s.prependFuncs = nil }: case <-b.quit: @@ -118,7 +121,7 @@ // RemoveAllAppenders removes all append functions func (b *Bar) RemoveAllAppenders() { select { - case b.ops <- func(s *bState) { + case b.operateState <- func(s *bState) { s.appendFuncs = nil }: case <-b.quit: @@ -141,7 +144,7 @@ return } select { - case b.ops <- func(s *bState) { + case b.operateState <- func(s *bState) { next := time.Now() if s.current == 0 { s.startTime = next @@ -173,7 +176,7 @@ return } select { - case b.ops <- func(s *bState) { + case b.operateState <- func(s *bState) { s.refill = &refill{r, till} }: case <-b.quit: @@ -184,7 +187,7 @@ func (b *Bar) NumOfAppenders() int { result := make(chan int, 1) select { - case b.ops <- func(s *bState) { result <- len(s.appendFuncs) }: + case b.operateState <- func(s *bState) { result <- len(s.appendFuncs) }: return <-result case <-b.done: return len(b.cacheState.appendFuncs) @@ -195,7 +198,7 @@ func (b *Bar) NumOfPrependers() int { result := make(chan int, 1) select { - case b.ops <- func(s *bState) { result <- len(s.prependFuncs) }: + case b.operateState <- func(s *bState) { result <- len(s.prependFuncs) }: return <-result case <-b.done: return len(b.cacheState.prependFuncs) @@ -206,7 +209,7 @@ func (b *Bar) ID() int { result := make(chan int, 1) select { - case b.ops <- func(s *bState) { result <- s.id }: + case b.operateState <- func(s *bState) { result <- s.id }: return <-result case <-b.done: return b.cacheState.id @@ -217,7 +220,7 @@ func (b *Bar) Current() int64 { result := make(chan int64, 1) select { - case b.ops <- func(s *bState) { result <- s.current }: + case b.operateState <- func(s *bState) { result <- s.current }: return <-result case <-b.done: return b.cacheState.current @@ -228,7 +231,7 @@ func (b *Bar) Total() int64 { result := make(chan int64, 1) select { - case b.ops <- func(s *bState) { result <- s.total }: + case b.operateState <- func(s *bState) { result <- s.total }: return <-result case <-b.done: return b.cacheState.total @@ -239,7 +242,7 @@ // in other words you should set it to true when total is determined. func (b *Bar) SetTotal(total int64, final bool) { select { - case b.ops <- func(s *bState) { + case b.operateState <- func(s *bState) { s.total = total s.dynamic = !final }: @@ -270,7 +273,7 @@ close(b.quit) } -func (b *Bar) server(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) { +func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) { defer func() { b.cacheState = s close(b.done) @@ -279,7 +282,7 @@ for { select { - case op := <-b.ops: + case op := <-b.operateState: op(s) case <-cancel: s.aborted = true @@ -296,7 +299,7 @@ go func() { select { - case b.ops <- func(s *bState) { + case b.operateState <- func(s *bState) { defer func() { // recovering if external decorators panic if p := recover(); p != nil { diff --git a/examples/sort/main.go b/examples/sort/main.go index 7e89642..3bdda2e 100644 --- a/examples/sort/main.go +++ b/examples/sort/main.go @@ -3,7 +3,6 @@ import ( "fmt" "math/rand" - "sort" "sync" "time" @@ -15,31 +14,9 @@ maxBlockSize = 12 ) -type barSlice []*mpb.Bar - -func (bs barSlice) Len() int { return len(bs) } - -func (bs barSlice) Less(i, j int) bool { - ip := decor.CalcPercentage(bs[i].Total(), bs[i].Current(), 100) - jp := decor.CalcPercentage(bs[j].Total(), bs[j].Current(), 100) - return ip < jp -} - -func (bs barSlice) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] } - -func sortByProgressFunc() mpb.BeforeRender { - return func(bars []*mpb.Bar) { - sort.Sort(sort.Reverse(barSlice(bars))) - } -} - func main() { - var wg sync.WaitGroup - p := mpb.New( - mpb.WithWaitGroup(&wg), - mpb.WithBeforeRenderFunc(sortByProgressFunc()), - ) + p := mpb.New(mpb.WithWaitGroup(&wg)) total := 100 numBars := 3 wg.Add(numBars) @@ -52,7 +29,7 @@ b := p.AddBar(int64(total), mpb.PrependDecorators( decor.StaticName(name, 0, decor.DwidthSync), - decor.Counters("%d / %d", 0, 10, decor.DSyncSpace), + decor.CountersNoUnit("%d / %d", 10, decor.DSyncSpace), ), mpb.AppendDecorators( decor.ETA(3, 0), @@ -60,11 +37,14 @@ ) go func() { defer wg.Done() - blockSize := rand.Intn(maxBlockSize) + 1 - for i := 0; i < total; i++ { + for blockSize, i := 0, 0; i < total; i++ { + blockSize = rand.Intn(maxBlockSize) + 1 + if i&1 == 1 { + priority := total - int(b.Current()) + p.UpdateBarPriority(b, priority) + } + b.Increment() sleep(blockSize) - b.Incr(1) - blockSize = rand.Intn(maxBlockSize) + 1 } }() } diff --git a/options.go b/options.go index 0800657..60a5b45 100644 --- a/options.go +++ b/options.go @@ -51,14 +51,6 @@ } } -// WithBeforeRenderFunc provided BeforeRender func, -// will be called before each render cycle. -func WithBeforeRenderFunc(f BeforeRender) ProgressOption { - return func(s *pState) { - s.beforeRender = f - } -} - // WithCancel provide your cancel channel, // which you plan to close at some point. func WithCancel(ch <-chan struct{}) ProgressOption { diff --git a/priority-queue.go b/priority-queue.go new file mode 100644 index 0000000..99a9234 --- /dev/null +++ b/priority-queue.go @@ -0,0 +1,40 @@ +package mpb + +import "container/heap" + +// A priorityQueue implements heap.Interface and holds Items. +type priorityQueue []*Bar + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + return pq[i].priority < pq[j].priority +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + bar := x.(*Bar) + bar.index = n + *pq = append(*pq, bar) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + bar := old[n-1] + bar.index = -1 // for safety + *pq = old[0 : n-1] + return bar +} + +// update modifies the priority of an Bar in the queue. +func (pq *priorityQueue) update(bar *Bar, priority int) { + bar.priority = priority + heap.Fix(pq, bar.index) +} diff --git a/progress.go b/progress.go index 1d3d6ba..150d274 100644 --- a/progress.go +++ b/progress.go @@ -1,40 +1,13 @@ package mpb import ( + "container/heap" "io" "os" "sync" "time" "github.com/vbauerster/mpb/cwriter" -) - -type ( - // BeforeRender is a func, which gets called before each rendering cycle - BeforeRender func([]*Bar) - - widthSync struct { - Listen []chan int - Result []chan int - } - - // progress state, which may contain several bars - pState struct { - bars []*Bar - - idCounter int - width int - format string - rr time.Duration - ewg *sync.WaitGroup - cw *cwriter.Writer - ticker *time.Ticker - beforeRender BeforeRender - interceptors []func(io.Writer) - - shutdownNotifier chan struct{} - cancel <-chan struct{} - } ) const ( @@ -56,15 +29,43 @@ // quit channel to request p.server to quit quit chan struct{} // done channel is receiveable after p.server has been quit - done chan struct{} - ops chan func(*pState) -} + done chan struct{} + operateState chan func(*pState) +} + +type ( + // progress state, which may contain several bars + pState struct { + bHeap *priorityQueue + idCounter int + width int + format string + rr time.Duration + ewg *sync.WaitGroup + cw *cwriter.Writer + ticker *time.Ticker + interceptors []func(io.Writer) + + shutdownNotifier chan struct{} + cancel <-chan struct{} + } + widthSync struct { + Listen []chan int + Result []chan int + } + renderedBar struct { + bar *Bar + pipe <-chan *bufReader + } +) // New creates new Progress instance, which orchestrates bars rendering process. // Accepts mpb.ProgressOption funcs for customization. func New(options ...ProgressOption) *Progress { + pq := make(priorityQueue, 0) + heap.Init(&pq) s := &pState{ - bars: make([]*Bar, 0, 3), + bHeap: &pq, width: pwidth, format: pformat, cw: cwriter.New(os.Stdout), @@ -78,13 +79,13 @@ } p := &Progress{ - ewg: s.ewg, - wg: new(sync.WaitGroup), - done: make(chan struct{}), - ops: make(chan func(*pState)), - quit: make(chan struct{}), - } - go p.server(s) + ewg: s.ewg, + wg: new(sync.WaitGroup), + done: make(chan struct{}), + operateState: make(chan func(*pState)), + quit: make(chan struct{}), + } + go p.serve(s) return p } @@ -93,10 +94,10 @@ p.wg.Add(1) result := make(chan *Bar, 1) select { - case p.ops <- func(s *pState) { + case p.operateState <- func(s *pState) { options = append(options, barWidth(s.width), barFormat(s.format)) b := newBar(s.idCounter, total, p.wg, s.cancel, options...) - s.bars = append(s.bars, b) + heap.Push(s.bHeap, b) s.idCounter++ result <- b }: @@ -110,21 +111,24 @@ func (p *Progress) RemoveBar(b *Bar) bool { result := make(chan bool, 1) select { - case p.ops <- func(s *pState) { - var ok bool - for i, bar := range s.bars { - if bar == b { - bar.Complete() - s.bars = append(s.bars[:i], s.bars[i+1:]...) - ok = true - break - } - } - result <- ok + case p.operateState <- func(s *pState) { + b.Complete() + result <- heap.Remove(s.bHeap, b.index) != nil }: return <-result case <-p.quit: return false + } +} + +// UpdateBarPriority provides a way to change bar's order position. +// Zero is highest priority, i.e. bar will be on top. +func (p *Progress) UpdateBarPriority(b *Bar, priority int) { + select { + case p.operateState <- func(s *pState) { + s.bHeap.update(b, priority) + }: + case <-p.quit: } } @@ -132,8 +136,8 @@ func (p *Progress) BarCount() int { result := make(chan int, 1) select { - case p.ops <- func(s *pState) { - result <- len(s.bars) + case p.operateState <- func(s *pState) { + result <- s.bHeap.Len() }: return <-result case <-p.quit: @@ -211,32 +215,21 @@ if numP < 0 && numA < 0 { return } - if s.beforeRender != nil { - s.beforeRender(s.bars) - } wSyncTimeout := make(chan struct{}) time.AfterFunc(s.rr, func() { close(wSyncTimeout) }) - prependWs := newWidthSync(wSyncTimeout, len(s.bars), numP) - appendWs := newWidthSync(wSyncTimeout, len(s.bars), numA) - - sequence := make([]<-chan *bufReader, len(s.bars)) - for i, b := range s.bars { - sequence[i] = b.render(tw, prependWs, appendWs) - } - - var i int - for r := range fanIn(sequence...) { + prependWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numP) + appendWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numA) + + for _, b := range s.renderByPriority(tw, prependWs, appendWs) { + r := <-b.pipe _, err = s.cw.ReadFrom(r) - defer func(bar *Bar, complete bool) { - if complete { - bar.Complete() - } - }(s.bars[i], r.complete) - i++ + if r.completed { + b.bar.Complete() + } } for _, interceptor := range s.interceptors { @@ -249,17 +242,17 @@ return } -func fanIn(inputs ...<-chan *bufReader) <-chan *bufReader { - ch := make(chan *bufReader) - - go func() { - defer close(ch) - for _, input := range inputs { - ch <- <-input - } - }() - - return ch +func (s *pState) renderByPriority(tw int, prependWs, appendWs *widthSync) []*renderedBar { + slice := make([]*renderedBar, 0, s.bHeap.Len()) + for s.bHeap.Len() > 0 { + b := heap.Pop(s.bHeap).(*Bar) + defer heap.Push(s.bHeap, b) + slice = append(slice, &renderedBar{ + bar: b, + pipe: b.render(tw, prependWs, appendWs), + }) + } + return slice } func max(slice []int) int { diff --git a/progress_posix.go b/progress_posix.go index 39cb68f..1a3db11 100644 --- a/progress_posix.go +++ b/progress_posix.go @@ -13,7 +13,7 @@ "github.com/vbauerster/mpb/cwriter" ) -func (p *Progress) server(s *pState) { +func (p *Progress) serve(s *pState) { winch := make(chan os.Signal, 1) signal.Notify(winch, syscall.SIGWINCH) @@ -33,14 +33,14 @@ for { select { - case op := <-p.ops: + case op := <-p.operateState: op(s) case <-s.ticker.C: - if len(s.bars) == 0 { + if s.bHeap.Len() == 0 { runtime.Gosched() break } - b0 := s.bars[0] + b0 := (*s.bHeap)[0] if numP == -1 { numP = b0.NumOfPrependers() } diff --git a/progress_windows.go b/progress_windows.go index 931ae1c..9b16ff6 100644 --- a/progress_windows.go +++ b/progress_windows.go @@ -10,7 +10,7 @@ "github.com/vbauerster/mpb/cwriter" ) -func (p *Progress) server(s *pState) { +func (p *Progress) serve(s *pState) { defer func() { if s.shutdownNotifier != nil { close(s.shutdownNotifier) @@ -22,14 +22,14 @@ for { select { - case op := <-p.ops: + case op := <-p.operateState: op(s) case <-s.ticker.C: - if len(s.bars) == 0 { + if s.bHeap.Len() == 0 { runtime.Gosched() break } - b0 := s.bars[0] + b0 := (*s.bHeap)[0] if numP == -1 { numP = b0.NumOfPrependers() }