Codebase list golang-github-vbauerster-mpb / 4be1416
forceRefresh Vladimir Bauer 7 years ago
3 changed file(s) with 122 addition(s) and 158 deletion(s). Raw diff Collapse all Expand all
2121
2222 // Progress represents the container that renders Progress bars
2323 type Progress struct {
24 wg *sync.WaitGroup
2524 uwg *sync.WaitGroup
25 cwg *sync.WaitGroup
26 bwg *sync.WaitGroup
2627 operateState chan func(*pState)
2728 done chan struct{}
2829 }
3132 bHeap *priorityQueue
3233 shutdownPending []*Bar
3334 heapUpdated bool
34 zeroWait bool
3535 idCounter int
3636 width int
3737 format string
3939 cw *cwriter.Writer
4040 pMatrix map[int][]chan int
4141 aMatrix map[int][]chan int
42 forceRefreshCh chan time.Time
4243
4344 // following are provided/overrided by user
4445 ctx context.Context
5556 pq := make(priorityQueue, 0)
5657 heap.Init(&pq)
5758 s := &pState{
58 ctx: context.Background(),
59 bHeap: &pq,
60 width: pwidth,
61 cw: cwriter.New(os.Stdout),
62 rr: prr,
63 waitBars: make(map[*Bar]*Bar),
64 debugOut: ioutil.Discard,
59 ctx: context.Background(),
60 bHeap: &pq,
61 width: pwidth,
62 cw: cwriter.New(os.Stdout),
63 rr: prr,
64 waitBars: make(map[*Bar]*Bar),
65 debugOut: ioutil.Discard,
66 forceRefreshCh: make(chan time.Time),
6567 }
6668
6769 for _, opt := range options {
7274
7375 p := &Progress{
7476 uwg: s.uwg,
75 wg: new(sync.WaitGroup),
77 cwg: new(sync.WaitGroup),
78 bwg: new(sync.WaitGroup),
7679 operateState: make(chan func(*pState)),
7780 done: make(chan struct{}),
7881 }
82 p.cwg.Add(1)
7983 go p.serve(s)
8084 return p
8185 }
96100
97101 // Add creates a bar which renders itself by provided filler.
98102 func (p *Progress) Add(total int64, filler Filler, options ...BarOption) *Bar {
99 p.wg.Add(1)
103 p.bwg.Add(1)
100104 result := make(chan *Bar)
101105 select {
102106 case p.operateState <- func(s *pState) {
103 b := newBar(s.ctx, p.wg, filler, s.idCounter, s.width, total, options...)
107 b := newBar(s.ctx, p.bwg, filler, s.idCounter, s.width, total, options...)
104108 if b.runningBar != nil {
105109 s.waitBars[b.runningBar] = b
106110 } else {
112116 }:
113117 return <-result
114118 case <-p.done:
115 p.wg.Done()
119 p.bwg.Done()
116120 return nil
117121 }
118122 }
156160 }
157161 }
158162
159 // Wait first waits for user provided *sync.WaitGroup, if any, then
160 // waits far all bars to complete and finally shutdowns master goroutine.
163 // Wait waits far all bars to complete and finally shutdowns container.
161164 // After this method has been called, there is no way to reuse *Progress
162165 // instance.
163166 func (p *Progress) Wait() {
164167 if p.uwg != nil {
168 // wait for user wg
165169 p.uwg.Wait()
166170 }
167171
168 p.wg.Wait()
169
170 select {
171 case p.operateState <- func(s *pState) { s.zeroWait = true }:
172 <-p.done
173 case <-p.done:
174 }
175 }
176
177 func (s *pState) updateSyncMatrix() {
178 s.pMatrix = make(map[int][]chan int)
179 s.aMatrix = make(map[int][]chan int)
180 for i := 0; i < s.bHeap.Len(); i++ {
181 bar := (*s.bHeap)[i]
182 table := bar.wSyncTable()
183 pRow, aRow := table[0], table[1]
184
185 for i, ch := range pRow {
186 s.pMatrix[i] = append(s.pMatrix[i], ch)
187 }
188
189 for i, ch := range aRow {
190 s.aMatrix[i] = append(s.aMatrix[i], ch)
191 }
192 }
193 }
194
195 func (s *pState) render(tw int) {
172 // wait for bars to quit, if any
173 p.bwg.Wait()
174
175 close(p.done)
176
177 // wait for container to quit
178 p.cwg.Wait()
179 }
180
181 func (p *Progress) serve(s *pState) {
182 defer p.cwg.Done()
183
184 manualOrTickCh, cleanUp := s.manualOrTick()
185 defer cleanUp()
186
187 refreshCh := fanInRefreshSrc(p.done, s.forceRefreshCh, manualOrTickCh)
188
189 for {
190 select {
191 case op := <-p.operateState:
192 op(s)
193 case _, ok := <-refreshCh:
194 if !ok {
195 if s.shutdownNotifier != nil {
196 close(s.shutdownNotifier)
197 }
198 return
199 }
200 tw, err := s.cw.GetWidth()
201 if err != nil {
202 tw = s.width
203 }
204 s.render(p.done, tw)
205 }
206 }
207 }
208
209 func (s *pState) render(done <-chan struct{}, tw int) {
196210 if s.heapUpdated {
197211 s.updateSyncMatrix()
198212 s.heapUpdated = false
205219 go bar.render(s.debugOut, tw)
206220 }
207221
208 if err := s.flush(s.bHeap.Len()); err != nil {
222 if err := s.flush(done, s.bHeap.Len()); err != nil {
209223 fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err)
210224 }
211225 }
212226
213 func (s *pState) flush(lineCount int) error {
227 func (s *pState) flush(done <-chan struct{}, lineCount int) error {
214228 for s.bHeap.Len() > 0 {
215229 bar := heap.Pop(s.bHeap).(*Bar)
216230 frameReader := <-bar.frameReaderCh
217231 defer func() {
218232 if frameReader.toShutdown {
233 // force next refresh asap, without waiting for ticker
234 go func() {
235 select {
236 case s.forceRefreshCh <- time.Now():
237 case <-done:
238 return
239 }
240 }()
219241 // shutdown at next flush, in other words decrement underlying WaitGroup
220242 // only after the bar with completed state has been flushed. this
221243 // ensures no bar ends up with less than 100% rendered.
244266 return s.cw.Flush(lineCount)
245267 }
246268
269 func (s *pState) manualOrTick() (<-chan time.Time, func()) {
270 if s.manualRefreshCh != nil {
271 return s.manualRefreshCh, func() {}
272 }
273 ticker := time.NewTicker(s.rr)
274 return ticker.C, ticker.Stop
275 }
276
277 func (s *pState) updateSyncMatrix() {
278 s.pMatrix = make(map[int][]chan int)
279 s.aMatrix = make(map[int][]chan int)
280 for i := 0; i < s.bHeap.Len(); i++ {
281 bar := (*s.bHeap)[i]
282 table := bar.wSyncTable()
283 pRow, aRow := table[0], table[1]
284
285 for i, ch := range pRow {
286 s.pMatrix[i] = append(s.pMatrix[i], ch)
287 }
288
289 for i, ch := range aRow {
290 s.aMatrix[i] = append(s.aMatrix[i], ch)
291 }
292 }
293 }
294
247295 func syncWidth(matrix map[int][]chan int) {
248296 for _, column := range matrix {
249297 column := column
261309 }()
262310 }
263311 }
312
313 func fanInRefreshSrc(done <-chan struct{}, channels ...<-chan time.Time) <-chan time.Time {
314 var wg sync.WaitGroup
315 multiplexedStream := make(chan time.Time)
316
317 multiplex := func(c <-chan time.Time) {
318 defer wg.Done()
319 for {
320 select {
321 case v := <-c:
322 multiplexedStream <- v
323 case <-done:
324 return
325 }
326 }
327 }
328
329 wg.Add(len(channels))
330 for _, c := range channels {
331 go multiplex(c)
332 }
333
334 go func() {
335 wg.Wait()
336 close(multiplexedStream)
337 }()
338
339 return multiplexedStream
340 }
+0
-70
progress_posix.go less more
0 // +build !windows
1
2 package mpb
3
4 import (
5 "os"
6 "os/signal"
7 "syscall"
8 "time"
9 )
10
11 func (p *Progress) serve(s *pState) {
12
13 var ticker *time.Ticker
14 var refreshCh <-chan time.Time
15 var winch chan os.Signal
16 var resumeTimer *time.Timer
17 var resumeEvent <-chan time.Time
18 winchIdleDur := s.rr * 2
19
20 if s.manualRefreshCh == nil {
21 ticker = time.NewTicker(s.rr)
22 refreshCh = ticker.C
23 winch = make(chan os.Signal, 2)
24 signal.Notify(winch, syscall.SIGWINCH)
25 } else {
26 refreshCh = s.manualRefreshCh
27 }
28
29 for {
30 select {
31 case op := <-p.operateState:
32 op(s)
33 case <-refreshCh:
34 if s.zeroWait {
35 if s.manualRefreshCh == nil {
36 signal.Stop(winch)
37 ticker.Stop()
38 }
39 if s.shutdownNotifier != nil {
40 close(s.shutdownNotifier)
41 }
42 close(p.done)
43 return
44 }
45 tw, err := s.cw.GetWidth()
46 if err != nil {
47 tw = s.width
48 }
49 s.render(tw)
50 case <-winch:
51 tw, err := s.cw.GetWidth()
52 if err != nil {
53 tw = s.width
54 }
55 s.render(tw - tw/8)
56 if resumeTimer != nil && resumeTimer.Reset(winchIdleDur) {
57 break
58 }
59 ticker.Stop()
60 resumeTimer = time.NewTimer(winchIdleDur)
61 resumeEvent = resumeTimer.C
62 case <-resumeEvent:
63 ticker = time.NewTicker(s.rr)
64 refreshCh = ticker.C
65 resumeEvent = nil
66 resumeTimer = nil
67 }
68 }
69 }
+0
-43
progress_windows.go less more
0 // +build windows
1
2 package mpb
3
4 import (
5 "time"
6 )
7
8 func (p *Progress) serve(s *pState) {
9
10 var ticker *time.Ticker
11 var refreshCh <-chan time.Time
12
13 if s.manualRefreshCh == nil {
14 ticker = time.NewTicker(s.rr)
15 refreshCh = ticker.C
16 } else {
17 refreshCh = s.manualRefreshCh
18 }
19
20 for {
21 select {
22 case op := <-p.operateState:
23 op(s)
24 case <-refreshCh:
25 if s.zeroWait {
26 if s.manualRefreshCh == nil {
27 ticker.Stop()
28 }
29 if s.shutdownNotifier != nil {
30 close(s.shutdownNotifier)
31 }
32 close(p.done)
33 return
34 }
35 tw, err := s.cw.GetWidth()
36 if err != nil {
37 tw = s.width
38 }
39 s.render(tw)
40 }
41 }
42 }