Codebase list golang-github-vbauerster-mpb / a27d844
heap manager Vladimir Bauer 3 years ago
3 changed file(s) with 179 addition(s) and 108 deletion(s). Raw diff Collapse all Expand all
457457 var anyOtherRunning bool
458458 b.container.traverseBars(func(bar *Bar) bool {
459459 anyOtherRunning = b != bar && bar.IsRunning()
460 return !anyOtherRunning
460 return anyOtherRunning
461461 })
462462 if !anyOtherRunning {
463463 for {
0 package mpb
1
2 import (
3 "container/heap"
4 )
5
6 type heapManager chan heapRequest
7
8 type heapCmd int
9
10 const (
11 h_sync heapCmd = iota
12 h_push
13 h_iter
14 h_pop_all
15 h_fix
16 )
17
18 type heapRequest struct {
19 cmd heapCmd
20 data interface{}
21 }
22
23 type iterData struct {
24 iter chan *Bar
25 drop chan struct{}
26 }
27
28 type pushData struct {
29 bar *Bar
30 sync bool
31 }
32
33 func (m heapManager) run() {
34 var bHeap priorityQueue
35 var pMatrix map[int][]chan int
36 var aMatrix map[int][]chan int
37
38 var l int
39 var sync bool
40
41 for req := range m {
42 switch req.cmd {
43 case h_push:
44 data := req.data.(*pushData)
45 heap.Push(&bHeap, data.bar)
46 sync = data.sync
47 case h_sync:
48 if sync || l != bHeap.Len() {
49 pMatrix = make(map[int][]chan int)
50 aMatrix = make(map[int][]chan int)
51 for _, b := range bHeap {
52 table := b.wSyncTable()
53 for i, ch := range table[0] {
54 pMatrix[i] = append(pMatrix[i], ch)
55 }
56 for i, ch := range table[1] {
57 aMatrix[i] = append(aMatrix[i], ch)
58 }
59 }
60 }
61 l = bHeap.Len()
62 syncWidth(pMatrix)
63 syncWidth(aMatrix)
64 case h_iter:
65 data := req.data.(*iterData)
66 for _, b := range bHeap {
67 select {
68 case data.iter <- b:
69 case <-data.drop:
70 }
71 }
72 close(data.iter)
73 case h_pop_all:
74 data := req.data.(*iterData)
75 for bHeap.Len() != 0 {
76 select {
77 case data.iter <- heap.Pop(&bHeap).(*Bar):
78 case <-data.drop:
79 }
80 }
81 close(data.iter)
82 case h_fix:
83 heap.Fix(&bHeap, req.data.(int))
84 }
85 }
86 }
87
88 func (m heapManager) sync() {
89 m <- heapRequest{cmd: h_sync}
90 }
91
92 func (m heapManager) push(b *Bar, sync bool) {
93 data := &pushData{b, sync}
94 m <- heapRequest{cmd: h_push, data: data}
95 }
96
97 func (m heapManager) iter(iter chan *Bar, drop chan struct{}) {
98 data := &iterData{iter, drop}
99 m <- heapRequest{cmd: h_iter, data: data}
100 }
101
102 func (m heapManager) popAll(iter chan *Bar, drop chan struct{}) {
103 data := &iterData{iter, drop}
104 m <- heapRequest{cmd: h_pop_all, data: data}
105 }
106
107 func (m heapManager) fix(index int) {
108 m <- heapRequest{cmd: h_push, data: index}
109 }
11
22 import (
33 "bytes"
4 "container/heap"
54 "context"
65 "fmt"
76 "io"
3433
3534 // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
3635 type pState struct {
37 bHeap priorityQueue
38 heapUpdated bool
39 pMatrix map[int][]chan int
40 aMatrix map[int][]chan int
41 rows []io.Reader
36 hm heapManager
37 rows []io.Reader
4238
4339 // following are provided/overrided by user
4440 refreshRate time.Duration
4844 popCompleted bool
4945 outputDiscarded bool
5046 disableAutoRefresh bool
47 ignoreNotTTY bool
5148 manualRefresh chan interface{}
5249 renderDelay <-chan struct{}
5350 shutdownNotifier chan struct{}
6865 // method has been called.
6966 func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
7067 s := &pState{
68 hm: make(heapManager),
7169 rows: make([]io.Reader, 32),
7270 refreshRate: defaultRefreshRate,
7371 popPriority: math.MinInt32,
136134 if bs.wait.bar != nil {
137135 ps.queueBars[bs.wait.bar] = bar
138136 } else {
139 heap.Push(&ps.bHeap, bar)
140 ps.heapUpdated = true
137 ps.hm.push(bar, true)
141138 }
142139 ps.idCount++
143140 result <- bar
151148 }
152149
153150 func (p *Progress) traverseBars(cb func(b *Bar) bool) {
154 sync := make(chan struct{})
151 iter := make(chan *Bar)
152 drop := make(chan struct{})
155153 select {
156154 case p.operateState <- func(s *pState) {
157 defer close(sync)
158 for i := 0; i < s.bHeap.Len(); i++ {
159 bar := s.bHeap[i]
160 if !cb(bar) {
155 s.hm.iter(iter, drop)
156 }:
157 for b := range iter {
158 if cb(b) {
159 close(drop)
161160 break
162161 }
163162 }
164 }:
165 <-sync
166163 case <-p.done:
167164 }
168165 }
175172 return
176173 }
177174 b.priority = priority
178 heap.Fix(&s.bHeap, b.index)
175 s.hm.fix(b.index)
179176 }:
180177 case <-p.done:
181178 }
182179 }
183180
184181 // BarCount returns bars count.
185 func (p *Progress) BarCount() int {
186 result := make(chan int)
187 select {
188 case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }:
189 return <-result
190 case <-p.done:
191 return 0
192 }
193 }
182 // func (p *Progress) BarCount() int {
183 // result := make(chan int)
184 // select {
185 // case p.operateState <- func(s *pState) { result <- s.bHeap.Len() }:
186 // return <-result
187 // case <-p.done:
188 // return 0
189 // }
190 // }
194191
195192 // Write is implementation of io.Writer.
196193 // Writing to `*mpb.Progress` will print lines above a running bar.
267264 }
268265
269266 func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
270 defer close(p.shutdown)
271
272 render := func() error {
273 return s.render(cw)
274 }
267
268 go s.hm.run()
275269
276270 refreshCh := p.newTicker(s)
277271
282276 case fn := <-p.interceptIo:
283277 fn(cw)
284278 case <-refreshCh:
285 err := render()
279 err := s.render(cw)
286280 if err != nil {
287 s.heapUpdated = false
288 render = func() error { return nil }
281 refreshCh = nil
289282 _, _ = fmt.Fprintln(s.debugOut, err.Error())
290283 p.cancel() // cancel all bars
291284 }
292285 case <-p.done:
293 for s.heapUpdated {
294 err := render()
295 if err != nil {
296 _, _ = fmt.Fprintln(s.debugOut, err.Error())
297 return
298 }
299 }
286 close(s.hm)
287 close(p.shutdown)
300288 return
301289 }
302290 }
303291 }
304292
305293 func (s *pState) render(cw *cwriter.Writer) error {
306 var wg sync.WaitGroup
307 if s.heapUpdated {
308 s.updateSyncMatrix()
309 s.heapUpdated = false
310 }
311 syncWidth(&wg, s.pMatrix)
312 syncWidth(&wg, s.aMatrix)
313
314294 width, height, err := cw.GetTermSize()
315295 if err != nil {
296 if !s.ignoreNotTTY {
297 return err
298 }
316299 width = s.reqWidth
317 height = s.bHeap.Len()
318 }
319 for i := 0; i < s.bHeap.Len(); i++ {
320 bar := s.bHeap[i]
321 go bar.render(width)
322 }
323
324 err = s.flush(&wg, cw, height)
300 height = 100
301 }
302
303 s.hm.sync()
304
305 iter := make(chan *Bar)
306 s.hm.iter(iter, nil)
307 for b := range iter {
308 go b.render(width)
309 }
310
311 wg := new(sync.WaitGroup)
312 err = s.flush(wg, cw, height)
325313 wg.Wait()
326314 return err
327315 }
328316
329317 func (s *pState) flush(wg *sync.WaitGroup, cw *cwriter.Writer, height int) error {
330318 var popCount int
331 pool := make([]*Bar, 0, s.bHeap.Len())
332319 s.rows = s.rows[:0]
333320
334 for s.bHeap.Len() > 0 {
335 b := heap.Pop(&s.bHeap).(*Bar)
321 iter := make(chan *Bar)
322 drop := make(chan struct{})
323 s.hm.popAll(iter, drop)
324
325 for b := range iter {
336326 frame := <-b.frameCh
337327 if frame.err != nil {
338 // b.frameCh is buffered it's ok to return here
339 return frame.err
328 close(drop)
329 return frame.err // b.frameCh is buffered it's ok to return here
340330 }
341331 var usedRows int
342332 for i := len(frame.rows) - 1; i >= 0; i-- {
344334 s.rows = append(s.rows, row)
345335 usedRows++
346336 } else {
347 wg.Add(1)
348 go func() {
349 _, _ = io.Copy(io.Discard, row)
350 wg.Done()
351 }()
337 _, _ = io.Copy(io.Discard, row)
352338 }
353339 }
354340 if frame.shutdown {
356342 if qb, ok := s.queueBars[b]; ok {
357343 delete(s.queueBars, b)
358344 qb.priority = b.priority
359 pool = append(pool, qb)
360 s.heapUpdated = true
345 wg.Add(1)
346 go func(b *Bar) {
347 s.hm.push(b, true)
348 wg.Done()
349 }(qb)
361350 continue
362351 }
363352 if s.popCompleted && !b.bs.noPop {
368357 default:
369358 if b.bs.dropOnComplete {
370359 popCount += usedRows
371 s.heapUpdated = true
372360 continue
373361 }
374362 }
375363 } else if b.bs.dropOnComplete {
376 s.heapUpdated = true
377364 continue
378365 }
379366 }
380 pool = append(pool, b)
381 }
382
383 if len(pool) != 0 {
384367 wg.Add(1)
385 go func() {
386 for _, b := range pool {
387 heap.Push(&s.bHeap, b)
388 }
368 go func(b *Bar) {
369 s.hm.push(b, false)
389370 wg.Done()
390 }()
371 }(b)
391372 }
392373
393374 for i := len(s.rows) - 1; i >= 0; i-- {
397378 }
398379 }
399380
400 err := cw.Flush(len(s.rows) - popCount)
401 return err
402 }
403
404 func (s *pState) updateSyncMatrix() {
405 s.pMatrix = make(map[int][]chan int)
406 s.aMatrix = make(map[int][]chan int)
407 for i := 0; i < s.bHeap.Len(); i++ {
408 bar := s.bHeap[i]
409 table := bar.wSyncTable()
410
411 for i, ch := range table[0] {
412 s.pMatrix[i] = append(s.pMatrix[i], ch)
413 }
414
415 for i, ch := range table[1] {
416 s.aMatrix[i] = append(s.aMatrix[i], ch)
417 }
418 }
381 return cw.Flush(len(s.rows) - popCount)
419382 }
420383
421384 func (s *pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
452415 return bs
453416 }
454417
455 func syncWidth(wg *sync.WaitGroup, matrix map[int][]chan int) {
418 func syncWidth(matrix map[int][]chan int) {
456419 for _, column := range matrix {
457 wg.Add(1)
458 go maxWidthDistributor(wg, column)
459 }
460 }
461
462 func maxWidthDistributor(wg *sync.WaitGroup, column []chan int) {
420 go maxWidthDistributor(column)
421 }
422 }
423
424 func maxWidthDistributor(column []chan int) {
463425 var maxWidth int
464426 for _, ch := range column {
465427 if w := <-ch; w > maxWidth {
469431 for _, ch := range column {
470432 ch <- maxWidth
471433 }
472 wg.Done()
473 }
434 }