Codebase list golang-github-vbauerster-mpb / 8967b98
UpdateBarPriority Vladimir Bauer 8 years ago
7 changed file(s) with 161 addition(s) and 153 deletion(s). Raw diff Collapse all Expand all
2828
2929 // Bar represents a progress Bar
3030 type Bar struct {
31 priority int
32 index int
3133 // quit channel to request b.server to quit
3234 quit chan struct{}
3335 // done channel is receiveable after b.server has been quit
34 done chan struct{}
35 ops chan func(*bState)
36
37 // following are used after b.done is receiveable
36 done chan struct{}
37 operateState chan func(*bState)
38
39 // cacheState is used after b.done is receiveable
3840 cacheState *bState
3941
4042 once sync.Once
7173 }
7274 bufReader struct {
7375 io.Reader
74 complete bool
76 completed bool
7577 }
7678 )
7779
9597 s.bufA = bytes.NewBuffer(make([]byte, 0, s.width/2))
9698
9799 b := &Bar{
98 quit: make(chan struct{}),
99 done: make(chan struct{}),
100 ops: make(chan func(*bState)),
101 }
102
103 go b.server(s, wg, cancel)
100 priority: id,
101 quit: make(chan struct{}),
102 done: make(chan struct{}),
103 operateState: make(chan func(*bState)),
104 }
105
106 go b.serve(s, wg, cancel)
104107 return b
105108 }
106109
107110 // RemoveAllPrependers removes all prepend functions
108111 func (b *Bar) RemoveAllPrependers() {
109112 select {
110 case b.ops <- func(s *bState) {
113 case b.operateState <- func(s *bState) {
111114 s.prependFuncs = nil
112115 }:
113116 case <-b.quit:
117120 // RemoveAllAppenders removes all append functions
118121 func (b *Bar) RemoveAllAppenders() {
119122 select {
120 case b.ops <- func(s *bState) {
123 case b.operateState <- func(s *bState) {
121124 s.appendFuncs = nil
122125 }:
123126 case <-b.quit:
140143 return
141144 }
142145 select {
143 case b.ops <- func(s *bState) {
146 case b.operateState <- func(s *bState) {
144147 next := time.Now()
145148 if s.current == 0 {
146149 s.startTime = next
172175 return
173176 }
174177 select {
175 case b.ops <- func(s *bState) {
178 case b.operateState <- func(s *bState) {
176179 s.refill = &refill{r, till}
177180 }:
178181 case <-b.quit:
183186 func (b *Bar) NumOfAppenders() int {
184187 result := make(chan int, 1)
185188 select {
186 case b.ops <- func(s *bState) { result <- len(s.appendFuncs) }:
189 case b.operateState <- func(s *bState) { result <- len(s.appendFuncs) }:
187190 return <-result
188191 case <-b.done:
189192 return len(b.cacheState.appendFuncs)
194197 func (b *Bar) NumOfPrependers() int {
195198 result := make(chan int, 1)
196199 select {
197 case b.ops <- func(s *bState) { result <- len(s.prependFuncs) }:
200 case b.operateState <- func(s *bState) { result <- len(s.prependFuncs) }:
198201 return <-result
199202 case <-b.done:
200203 return len(b.cacheState.prependFuncs)
205208 func (b *Bar) ID() int {
206209 result := make(chan int, 1)
207210 select {
208 case b.ops <- func(s *bState) { result <- s.id }:
211 case b.operateState <- func(s *bState) { result <- s.id }:
209212 return <-result
210213 case <-b.done:
211214 return b.cacheState.id
216219 func (b *Bar) Current() int64 {
217220 result := make(chan int64, 1)
218221 select {
219 case b.ops <- func(s *bState) { result <- s.current }:
222 case b.operateState <- func(s *bState) { result <- s.current }:
220223 return <-result
221224 case <-b.done:
222225 return b.cacheState.current
227230 func (b *Bar) Total() int64 {
228231 result := make(chan int64, 1)
229232 select {
230 case b.ops <- func(s *bState) { result <- s.total }:
233 case b.operateState <- func(s *bState) { result <- s.total }:
231234 return <-result
232235 case <-b.done:
233236 return b.cacheState.total
238241 // in other words you should set it to true when total is determined.
239242 func (b *Bar) SetTotal(total int64, final bool) {
240243 select {
241 case b.ops <- func(s *bState) {
244 case b.operateState <- func(s *bState) {
242245 s.total = total
243246 s.dynamic = !final
244247 }:
269272 close(b.quit)
270273 }
271274
272 func (b *Bar) server(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) {
275 func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) {
273276 defer func() {
274277 b.cacheState = s
275278 close(b.done)
278281
279282 for {
280283 select {
281 case op := <-b.ops:
284 case op := <-b.operateState:
282285 op(s)
283286 case <-cancel:
284287 s.aborted = true
295298
296299 go func() {
297300 select {
298 case b.ops <- func(s *bState) {
301 case b.operateState <- func(s *bState) {
299302 defer func() {
300303 // recovering if external decorators panic
301304 if p := recover(); p != nil {
22 import (
33 "fmt"
44 "math/rand"
5 "sort"
65 "sync"
76 "time"
87
1413 maxBlockSize = 12
1514 )
1615
17 type barSlice []*mpb.Bar
18
19 func (bs barSlice) Len() int { return len(bs) }
20
21 func (bs barSlice) Less(i, j int) bool {
22 ip := decor.CalcPercentage(bs[i].Total(), bs[i].Current(), 100)
23 jp := decor.CalcPercentage(bs[j].Total(), bs[j].Current(), 100)
24 return ip < jp
25 }
26
27 func (bs barSlice) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] }
28
29 func sortByProgressFunc() mpb.BeforeRender {
30 return func(bars []*mpb.Bar) {
31 sort.Sort(sort.Reverse(barSlice(bars)))
32 }
33 }
34
3516 func main() {
36
3717 var wg sync.WaitGroup
38 p := mpb.New(
39 mpb.WithWaitGroup(&wg),
40 mpb.WithBeforeRenderFunc(sortByProgressFunc()),
41 )
18 p := mpb.New(mpb.WithWaitGroup(&wg))
4219 total := 100
4320 numBars := 3
4421 wg.Add(numBars)
5128 b := p.AddBar(int64(total),
5229 mpb.PrependDecorators(
5330 decor.StaticName(name, 0, decor.DwidthSync),
54 decor.Counters("%d / %d", 0, 10, decor.DSyncSpace),
31 decor.CountersNoUnit("%d / %d", 10, decor.DSyncSpace),
5532 ),
5633 mpb.AppendDecorators(
5734 decor.ETA(3, 0),
5936 )
6037 go func() {
6138 defer wg.Done()
62 blockSize := rand.Intn(maxBlockSize) + 1
63 for i := 0; i < total; i++ {
39 for blockSize, i := 0, 0; i < total; i++ {
40 blockSize = rand.Intn(maxBlockSize) + 1
41 if i&1 == 1 {
42 priority := total - int(b.Current())
43 p.UpdateBarPriority(b, priority)
44 }
45 b.Increment()
6446 sleep(blockSize)
65 b.Incr(1)
66 blockSize = rand.Intn(maxBlockSize) + 1
6747 }
6848 }()
6949 }
5050 }
5151 }
5252
53 // WithBeforeRenderFunc provided BeforeRender func,
54 // will be called before each render cycle.
55 func WithBeforeRenderFunc(f BeforeRender) ProgressOption {
56 return func(s *pState) {
57 s.beforeRender = f
58 }
59 }
60
6153 // WithCancel provide your cancel channel,
6254 // which you plan to close at some point.
6355 func WithCancel(ch <-chan struct{}) ProgressOption {
0 package mpb
1
2 import "container/heap"
3
4 // A priorityQueue implements heap.Interface and holds Items.
5 type priorityQueue []*Bar
6
7 func (pq priorityQueue) Len() int { return len(pq) }
8
9 func (pq priorityQueue) Less(i, j int) bool {
10 return pq[i].priority < pq[j].priority
11 }
12
13 func (pq priorityQueue) Swap(i, j int) {
14 pq[i], pq[j] = pq[j], pq[i]
15 pq[i].index = i
16 pq[j].index = j
17 }
18
19 func (pq *priorityQueue) Push(x interface{}) {
20 n := len(*pq)
21 bar := x.(*Bar)
22 bar.index = n
23 *pq = append(*pq, bar)
24 }
25
26 func (pq *priorityQueue) Pop() interface{} {
27 old := *pq
28 n := len(old)
29 bar := old[n-1]
30 bar.index = -1 // for safety
31 *pq = old[0 : n-1]
32 return bar
33 }
34
35 // update modifies the priority of an Bar in the queue.
36 func (pq *priorityQueue) update(bar *Bar, priority int) {
37 bar.priority = priority
38 heap.Fix(pq, bar.index)
39 }
00 package mpb
11
22 import (
3 "container/heap"
34 "io"
45 "os"
56 "sync"
67 "time"
78
89 "github.com/vbauerster/mpb/cwriter"
9 )
10
11 type (
12 // BeforeRender is a func, which gets called before each rendering cycle
13 BeforeRender func([]*Bar)
14
15 widthSync struct {
16 Listen []chan int
17 Result []chan int
18 }
19
20 // progress state, which may contain several bars
21 pState struct {
22 bars []*Bar
23
24 idCounter int
25 width int
26 format string
27 rr time.Duration
28 ewg *sync.WaitGroup
29 cw *cwriter.Writer
30 ticker *time.Ticker
31 beforeRender BeforeRender
32 interceptors []func(io.Writer)
33
34 shutdownNotifier chan struct{}
35 cancel <-chan struct{}
36 }
3710 )
3811
3912 const (
5528 // quit channel to request p.server to quit
5629 quit chan struct{}
5730 // done channel is receiveable after p.server has been quit
58 done chan struct{}
59 ops chan func(*pState)
60 }
31 done chan struct{}
32 operateState chan func(*pState)
33 }
34
35 type (
36 // progress state, which may contain several bars
37 pState struct {
38 bHeap *priorityQueue
39 idCounter int
40 width int
41 format string
42 rr time.Duration
43 ewg *sync.WaitGroup
44 cw *cwriter.Writer
45 ticker *time.Ticker
46 interceptors []func(io.Writer)
47
48 shutdownNotifier chan struct{}
49 cancel <-chan struct{}
50 }
51 widthSync struct {
52 Listen []chan int
53 Result []chan int
54 }
55 renderedBar struct {
56 bar *Bar
57 pipe <-chan *bufReader
58 }
59 )
6160
6261 // New creates new Progress instance, which orchestrates bars rendering process.
6362 // Accepts mpb.ProgressOption funcs for customization.
6463 func New(options ...ProgressOption) *Progress {
64 pq := make(priorityQueue, 0)
65 heap.Init(&pq)
6566 s := &pState{
66 bars: make([]*Bar, 0, 3),
67 bHeap: &pq,
6768 width: pwidth,
6869 format: pformat,
6970 cw: cwriter.New(os.Stdout),
7778 }
7879
7980 p := &Progress{
80 ewg: s.ewg,
81 wg: new(sync.WaitGroup),
82 done: make(chan struct{}),
83 ops: make(chan func(*pState)),
84 quit: make(chan struct{}),
85 }
86 go p.server(s)
81 ewg: s.ewg,
82 wg: new(sync.WaitGroup),
83 done: make(chan struct{}),
84 operateState: make(chan func(*pState)),
85 quit: make(chan struct{}),
86 }
87 go p.serve(s)
8788 return p
8889 }
8990
9293 p.wg.Add(1)
9394 result := make(chan *Bar, 1)
9495 select {
95 case p.ops <- func(s *pState) {
96 case p.operateState <- func(s *pState) {
9697 options = append(options, barWidth(s.width), barFormat(s.format))
9798 b := newBar(s.idCounter, total, p.wg, s.cancel, options...)
98 s.bars = append(s.bars, b)
99 heap.Push(s.bHeap, b)
99100 s.idCounter++
100101 result <- b
101102 }:
109110 func (p *Progress) RemoveBar(b *Bar) bool {
110111 result := make(chan bool, 1)
111112 select {
112 case p.ops <- func(s *pState) {
113 var ok bool
114 for i, bar := range s.bars {
115 if bar == b {
116 bar.Complete()
117 s.bars = append(s.bars[:i], s.bars[i+1:]...)
118 ok = true
119 break
120 }
121 }
122 result <- ok
113 case p.operateState <- func(s *pState) {
114 b.Complete()
115 result <- heap.Remove(s.bHeap, b.index) != nil
123116 }:
124117 return <-result
125118 case <-p.quit:
126119 return false
120 }
121 }
122
123 // UpdateBarPriority provides a way to change bar's order position.
124 // Zero is highest priority, i.e. bar will be on top.
125 func (p *Progress) UpdateBarPriority(b *Bar, priority int) {
126 select {
127 case p.operateState <- func(s *pState) {
128 s.bHeap.update(b, priority)
129 }:
130 case <-p.quit:
127131 }
128132 }
129133
131135 func (p *Progress) BarCount() int {
132136 result := make(chan int, 1)
133137 select {
134 case p.ops <- func(s *pState) {
135 result <- len(s.bars)
138 case p.operateState <- func(s *pState) {
139 result <- s.bHeap.Len()
136140 }:
137141 return <-result
138142 case <-p.quit:
210214 if numP < 0 && numA < 0 {
211215 return
212216 }
213 if s.beforeRender != nil {
214 s.beforeRender(s.bars)
215 }
216217
217218 wSyncTimeout := make(chan struct{})
218219 time.AfterFunc(s.rr, func() {
219220 close(wSyncTimeout)
220221 })
221222
222 prependWs := newWidthSync(wSyncTimeout, len(s.bars), numP)
223 appendWs := newWidthSync(wSyncTimeout, len(s.bars), numA)
224
225 sequence := make([]<-chan *bufReader, len(s.bars))
226 for i, b := range s.bars {
227 sequence[i] = b.render(tw, prependWs, appendWs)
228 }
229
230 var i int
231 for r := range fanIn(sequence...) {
223 prependWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numP)
224 appendWs := newWidthSync(wSyncTimeout, s.bHeap.Len(), numA)
225
226 for _, b := range s.renderByPriority(tw, prependWs, appendWs) {
227 r := <-b.pipe
232228 _, err = s.cw.ReadFrom(r)
233 defer func(bar *Bar, complete bool) {
234 if complete {
235 bar.Complete()
236 }
237 }(s.bars[i], r.complete)
238 i++
229 if r.completed {
230 b.bar.Complete()
231 }
239232 }
240233
241234 for _, interceptor := range s.interceptors {
248241 return
249242 }
250243
251 func fanIn(inputs ...<-chan *bufReader) <-chan *bufReader {
252 ch := make(chan *bufReader)
253
254 go func() {
255 defer close(ch)
256 for _, input := range inputs {
257 ch <- <-input
258 }
259 }()
260
261 return ch
244 func (s *pState) renderByPriority(tw int, prependWs, appendWs *widthSync) []*renderedBar {
245 slice := make([]*renderedBar, 0, s.bHeap.Len())
246 for s.bHeap.Len() > 0 {
247 b := heap.Pop(s.bHeap).(*Bar)
248 defer heap.Push(s.bHeap, b)
249 slice = append(slice, &renderedBar{
250 bar: b,
251 pipe: b.render(tw, prependWs, appendWs),
252 })
253 }
254 return slice
262255 }
263256
264257 func max(slice []int) int {
1212 "github.com/vbauerster/mpb/cwriter"
1313 )
1414
15 func (p *Progress) server(s *pState) {
15 func (p *Progress) serve(s *pState) {
1616 winch := make(chan os.Signal, 1)
1717 signal.Notify(winch, syscall.SIGWINCH)
1818
3232
3333 for {
3434 select {
35 case op := <-p.ops:
35 case op := <-p.operateState:
3636 op(s)
3737 case <-s.ticker.C:
38 if len(s.bars) == 0 {
38 if s.bHeap.Len() == 0 {
3939 runtime.Gosched()
4040 break
4141 }
42 b0 := s.bars[0]
42 b0 := (*s.bHeap)[0]
4343 if numP == -1 {
4444 numP = b0.NumOfPrependers()
4545 }
99 "github.com/vbauerster/mpb/cwriter"
1010 )
1111
12 func (p *Progress) server(s *pState) {
12 func (p *Progress) serve(s *pState) {
1313 defer func() {
1414 if s.shutdownNotifier != nil {
1515 close(s.shutdownNotifier)
2121
2222 for {
2323 select {
24 case op := <-p.ops:
24 case op := <-p.operateState:
2525 op(s)
2626 case <-s.ticker.C:
27 if len(s.bars) == 0 {
27 if s.bHeap.Len() == 0 {
2828 runtime.Gosched()
2929 break
3030 }
31 b0 := s.bars[0]
31 b0 := (*s.bHeap)[0]
3232 if numP == -1 {
3333 numP = b0.NumOfPrependers()
3434 }