Codebase list golang-github-vbauerster-mpb / 4a25556
refactoring: drop sync.Once Vladimir Bauer 3 years ago
1 changed file(s) with 56 addition(s) and 68 deletion(s). Raw diff Collapse all Expand all
2424 type Progress struct {
2525 ctx context.Context
2626 uwg *sync.WaitGroup
27 cwg *sync.WaitGroup
2827 bwg *sync.WaitGroup
2928 operateState chan func(*pState)
3029 interceptIo chan func(io.Writer)
3130 done chan struct{}
32 once sync.Once
31 shutdown chan struct{}
3332 cancel func()
3433 }
3534
7271 // method has been called.
7372 func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
7473 s := &pState{
75 rows: make([]io.Reader, 0, 64),
76 pool: make([]*Bar, 0, 64),
77 refreshRate: defaultRefreshRate,
78 popPriority: math.MinInt32,
79 manualRefresh: make(chan interface{}),
80 shutdownNotifier: make(chan struct{}),
81 queueBars: make(map[*Bar]*Bar),
82 output: os.Stdout,
83 debugOut: io.Discard,
74 rows: make([]io.Reader, 0, 64),
75 pool: make([]*Bar, 0, 64),
76 refreshRate: defaultRefreshRate,
77 popPriority: math.MinInt32,
78 manualRefresh: make(chan interface{}),
79 queueBars: make(map[*Bar]*Bar),
80 output: os.Stdout,
81 debugOut: io.Discard,
8482 }
8583
8684 for _, opt := range options {
9391 p := &Progress{
9492 ctx: ctx,
9593 uwg: s.uwg,
96 cwg: new(sync.WaitGroup),
9794 bwg: new(sync.WaitGroup),
9895 operateState: make(chan func(*pState)),
9996 interceptIo: make(chan func(io.Writer)),
10198 cancel: cancel,
10299 }
103100
104 p.cwg.Add(1)
101 if s.shutdownNotifier != nil {
102 p.shutdown = s.shutdownNotifier
103 s.shutdownNotifier = nil
104 } else {
105 p.shutdown = make(chan struct{})
106 }
107
105108 go p.serve(s, cwriter.New(s.output))
106109 return p
107110 }
224227 p.uwg.Wait()
225228 }
226229
227 // wait for bars to quit, if any
228230 p.bwg.Wait()
229 // shutdown
230 p.once.Do(p.shutdown)
231 // wait for container to quit
232 p.cwg.Wait()
231 p.Shutdown()
233232 }
234233
235234 // Shutdown cancels any running bar immediately and then shutdowns (*Progress)
237236 // are doing. Proper way to shutdown is to call (*Progress).Wait() instead.
238237 func (p *Progress) Shutdown() {
239238 p.cancel()
240 p.bwg.Wait()
241 p.once.Do(p.shutdown)
242 p.cwg.Wait()
243 }
244
245 func (p *Progress) shutdown() {
246 close(p.done)
239 <-p.shutdown
240 }
241
242 func (p *Progress) newTicker(s *pState) chan time.Time {
243 ch := make(chan time.Time)
244 go func() {
245 var autoRefresh <-chan time.Time
246 if !s.disableAutoRefresh && !s.outputDiscarded {
247 if s.renderDelay != nil {
248 <-s.renderDelay
249 }
250 ticker := time.NewTicker(s.refreshRate)
251 defer ticker.Stop()
252 autoRefresh = ticker.C
253 }
254 for {
255 select {
256 case t := <-autoRefresh:
257 ch <- t
258 case x := <-s.manualRefresh:
259 if t, ok := x.(time.Time); ok {
260 ch <- t
261 } else {
262 ch <- time.Now()
263 }
264 case <-p.ctx.Done():
265 close(p.done)
266 return
267 }
268 }
269 }()
270 return ch
247271 }
248272
249273 func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
250 defer p.cwg.Done()
274 defer close(p.shutdown)
251275
252276 render := func() error {
253277 if s.bHeap.Len() == 0 {
256280 return s.render(cw)
257281 }
258282
259 refreshCh := s.newTicker(p.done)
283 refreshCh := p.newTicker(s)
260284
261285 for {
262286 select {
267291 case <-refreshCh:
268292 err := render()
269293 if err != nil {
270 go func() {
271 p.bwg.Wait()
272 p.once.Do(p.shutdown)
273 }()
274 render = func() error {
275 s.heapUpdated = false
276 return nil
277 }
294 s.heapUpdated = false
295 render = func() error { return nil }
278296 _, _ = fmt.Fprintln(s.debugOut, err)
279297 p.cancel() // cancel all bars
280298 }
281 case <-s.shutdownNotifier:
299 case <-p.done:
282300 for s.heapUpdated {
283301 err := render()
284302 if err != nil {
322340 b := heap.Pop(&s.bHeap).(*Bar)
323341 frame := <-b.frameCh
324342 if frame.err != nil {
343 s.rows = s.rows[:0]
325344 return frame.err
326345 }
327346 var usedRows int
399418 return err
400419 }
401420
402 func (s *pState) newTicker(done <-chan struct{}) chan time.Time {
403 ch := make(chan time.Time)
404 go func() {
405 var autoRefresh <-chan time.Time
406 if !s.disableAutoRefresh && !s.outputDiscarded {
407 if s.renderDelay != nil {
408 <-s.renderDelay
409 }
410 ticker := time.NewTicker(s.refreshRate)
411 defer ticker.Stop()
412 autoRefresh = ticker.C
413 }
414 for {
415 select {
416 case t := <-autoRefresh:
417 ch <- t
418 case x := <-s.manualRefresh:
419 if t, ok := x.(time.Time); ok {
420 ch <- t
421 } else {
422 ch <- time.Now()
423 }
424 case <-done:
425 close(s.shutdownNotifier)
426 return
427 }
428 }
429 }()
430 return ch
431 }
432
433421 func (s *pState) updateSyncMatrix() {
434422 s.pMatrix = make(map[int][]chan int)
435423 s.aMatrix = make(map[int][]chan int)
483471 }
484472
485473 func syncWidth(wg *sync.WaitGroup, matrix map[int][]chan int) {
474 wg.Add(len(matrix))
486475 for _, column := range matrix {
487 wg.Add(1)
488476 go maxWidthDistributor(wg, column)
489477 }
490478 }