Codebase list golang-github-vbauerster-mpb / e36c394
iterate subscribed decorators concurrently Any decorator implemening either of these interfaces, is considered to be subscribed: - EwmaDecorator - AverageDecorator - ShutdownListener Vladimir Bauer 4 years ago
1 changed file(s) with 62 addition(s) and 30 deletion(s). Raw diff Collapse all Expand all
77 "log"
88 "runtime/debug"
99 "strings"
10 "sync"
1011 "time"
1112
1213 "github.com/acarl005/stripansi"
4950 total int64
5051 current int64
5152 refill int64
52 lastN int64
53 iterated bool
53 lastIncrement int64
5454 trimSpace bool
5555 completed bool
5656 completeFlushed bool
188188 func (b *Bar) SetCurrent(current int64) {
189189 select {
190190 case b.operateState <- func(s *bState) {
191 s.iterated = true
192 s.lastN = current - s.current
191 s.lastIncrement = current - s.current
193192 s.current = current
194193 if s.triggerComplete && s.current >= s.total {
195194 s.current = s.total
213212
214213 // IncrInt64 increments progress by amount of n.
215214 func (b *Bar) IncrInt64(n int64) {
216 select {
217 case b.operateState <- func(s *bState) {
218 s.iterated = true
219 s.lastN = n
215 if n <= 0 {
216 return
217 }
218 select {
219 case b.operateState <- func(s *bState) {
220 s.lastIncrement = n
220221 s.current += n
221222 if s.triggerComplete && s.current >= s.total {
222223 s.current = s.total
235236 func (b *Bar) DecoratorEwmaUpdate(dur time.Duration) {
236237 select {
237238 case b.operateState <- func(s *bState) {
238 ewmaIterationUpdate(false, s, dur)
239 }:
240 case <-b.done:
241 ewmaIterationUpdate(true, b.cacheState, dur)
239 if s.lastIncrement > 0 {
240 s.decoratorEwmaUpdate(dur)
241 s.lastIncrement = 0
242 } else {
243 panic("increment required before ewma iteration update")
244 }
245 }:
246 case <-b.done:
247 if b.cacheState.lastIncrement > 0 {
248 b.cacheState.decoratorEwmaUpdate(dur)
249 b.cacheState.lastIncrement = 0
250 }
242251 }
243252 }
244253
248257 func (b *Bar) DecoratorAverageAdjust(start time.Time) {
249258 select {
250259 case b.operateState <- func(s *bState) {
251 for _, d := range s.averageDecorators {
252 d.AverageAdjust(start)
253 }
260 s.decoratorAverageAdjust(start)
254261 }:
255262 case <-b.done:
256263 }
320327 case op := <-b.operateState:
321328 op(s)
322329 case <-ctx.Done():
323 // Notifying decorators about shutdown event
324 for _, sl := range s.shutdownListeners {
325 sl.Shutdown()
326 }
330 s.decoratorShutdownNotify()
327331 b.cacheState = s
328332 close(b.done)
329333 return
480484 return table
481485 }
482486
487 func (s bState) decoratorEwmaUpdate(dur time.Duration) {
488 wg := new(sync.WaitGroup)
489 wg.Add(len(s.ewmaDecorators))
490 for _, d := range s.ewmaDecorators {
491 d := d
492 go func() {
493 d.EwmaUpdate(s.lastIncrement, dur)
494 wg.Done()
495 }()
496 }
497 wg.Wait()
498 }
499
500 func (s bState) decoratorAverageAdjust(start time.Time) {
501 wg := new(sync.WaitGroup)
502 wg.Add(len(s.averageDecorators))
503 for _, d := range s.averageDecorators {
504 d := d
505 go func() {
506 d.AverageAdjust(start)
507 wg.Done()
508 }()
509 }
510 wg.Wait()
511 }
512
513 func (s bState) decoratorShutdownNotify() {
514 wg := new(sync.WaitGroup)
515 wg.Add(len(s.shutdownListeners))
516 for _, d := range s.shutdownListeners {
517 d := d
518 go func() {
519 d.Shutdown()
520 wg.Done()
521 }()
522 }
523 wg.Wait()
524 }
525
483526 func newStatistics(tw int, s *bState) decor.Statistics {
484527 return decor.Statistics{
485528 ID: s.id,
496539 return extractBaseDecorator(d.Base())
497540 }
498541 return d
499 }
500
501 func ewmaIterationUpdate(done bool, s *bState, dur time.Duration) {
502 if !done && !s.iterated {
503 panic("increment required before ewma iteration update")
504 } else {
505 s.iterated = false
506 }
507 for _, d := range s.ewmaDecorators {
508 d.EwmaUpdate(s.lastN, dur)
509 }
510542 }
511543
512544 func makePanicExtender(p interface{}) extenderFunc {