Codebase list golang-github-vbauerster-mpb / cb0c8d9
refactoring refresh src chan Vladimir Bauer 6 years ago
3 changed file(s) with 36 addition(s) and 72 deletion(s). Raw diff Collapse all Expand all
383383 func (b *Bar) refreshNowTillShutdown() {
384384 for {
385385 select {
386 case b.container.forceRefresh <- time.Now():
386 case b.container.refreshCh <- time.Now():
387387 case <-b.done:
388388 return
389389 }
4242 // Refresh will occur upon receive value from provided ch.
4343 func WithManualRefresh(ch <-chan time.Time) ContainerOption {
4444 return func(s *pState) {
45 s.manualRefresh = ch
45 s.refreshSrc = ch
4646 }
4747 }
4848
7070 func WithOutput(w io.Writer) ContainerOption {
7171 return func(s *pState) {
7272 if w == nil {
73 s.manualRefresh = make(chan time.Time)
73 s.refreshSrc = make(chan time.Time)
7474 s.output = ioutil.Discard
7575 return
7676 }
2929 bwg *sync.WaitGroup
3030 operateState chan func(*pState)
3131 done chan struct{}
32 forceRefresh chan time.Time
32 refreshCh chan time.Time
3333 once sync.Once
3434 dlogger *log.Logger
3535 }
4848 popCompleted bool
4949 rr time.Duration
5050 uwg *sync.WaitGroup
51 manualRefresh <-chan time.Time
51 refreshSrc <-chan time.Time
5252 renderDelay <-chan struct{}
5353 shutdownNotifier chan struct{}
5454 parkedBars map[*Bar]*Bar
8787 cwg: new(sync.WaitGroup),
8888 bwg: new(sync.WaitGroup),
8989 operateState: make(chan func(*pState)),
90 forceRefresh: make(chan time.Time),
9190 done: make(chan struct{}),
9291 dlogger: log.New(s.debugOut, "[mpb] ", log.Lshortfile),
9392 }
205204 func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
206205 defer p.cwg.Done()
207206
208 manualOrTickCh, cleanUp := s.manualOrTick()
209 defer cleanUp()
210
211 refreshCh := fanInRefreshSrc(p.done, s.renderDelay, p.forceRefresh, manualOrTickCh)
207 p.refreshCh = s.newTicker(p.done)
212208
213209 for {
214210 select {
215211 case op := <-p.operateState:
216212 op(s)
217 case _, ok := <-refreshCh:
218 if !ok {
219 if s.shutdownNotifier != nil {
220 close(s.shutdownNotifier)
221 }
222 return
223 }
213 case <-p.refreshCh:
224214 if err := s.render(cw); err != nil {
225 p.dlogger.Println(err)
226 }
215 go p.dlogger.Println(err)
216 }
217 case <-s.shutdownNotifier:
218 return
227219 }
228220 }
229221 }
303295 return cw.Flush(lineCount)
304296 }
305297
306 func (s *pState) manualOrTick() (<-chan time.Time, func()) {
307 if s.manualRefresh != nil {
308 return s.manualRefresh, func() {}
309 }
310 ticker := time.NewTicker(s.rr)
311 return ticker.C, ticker.Stop
298 func (s *pState) newTicker(done <-chan struct{}) chan time.Time {
299 ch := make(chan time.Time)
300 if s.shutdownNotifier == nil {
301 s.shutdownNotifier = make(chan struct{})
302 }
303 go func() {
304 if s.renderDelay != nil {
305 <-s.renderDelay
306 }
307 if s.refreshSrc == nil {
308 ticker := time.NewTicker(s.rr)
309 defer ticker.Stop()
310 s.refreshSrc = ticker.C
311 }
312 for {
313 select {
314 case tick := <-s.refreshSrc:
315 ch <- tick
316 case <-done:
317 close(s.shutdownNotifier)
318 return
319 }
320 }
321 }()
322 return ch
312323 }
313324
314325 func (s *pState) updateSyncMatrix() {
373384 }
374385 }
375386
376 func fanInRefreshSrc(done, delay <-chan struct{}, channels ...<-chan time.Time) <-chan time.Time {
377 var wg sync.WaitGroup
378 multiplexedStream := make(chan time.Time)
379 start := make(chan struct{})
380
381 multiplex := func(c <-chan time.Time) {
382 defer wg.Done()
383 <-start
384 // source channels are never closed (time.Ticker never closes associated
385 // channel), so we cannot simply range over a c, instead we use select
386 // inside infinite loop
387 for {
388 select {
389 case v := <-c:
390 select {
391 case multiplexedStream <- v:
392 case <-done:
393 return
394 }
395 case <-done:
396 return
397 }
398 }
399 }
400
401 if delay != nil {
402 go func() {
403 <-delay
404 close(start)
405 }()
406 } else {
407 close(start)
408 }
409
410 wg.Add(len(channels))
411 for _, c := range channels {
412 go multiplex(c)
413 }
414
415 go func() {
416 wg.Wait()
417 close(multiplexedStream)
418 }()
419
420 return multiplexedStream
421 }
422
423387 func extractBaseFiller(f Filler) Filler {
424388 if f, ok := f.(Wrapper); ok {
425389 return extractBaseFiller(f.Base())