Codebase list golang-github-vbauerster-mpb / 19f0fe8
Decouple eta from core Vladimir Bauer 8 years ago
4 changed file(s) with 147 addition(s) and 117 deletion(s). Raw diff Collapse all Expand all
88 "time"
99 "unicode/utf8"
1010
11 "github.com/VividCortex/ewma"
1211 "github.com/vbauerster/mpb/decor"
1312 )
1413
3534 cacheState *bState
3635 operateState chan func(*bState)
3736 frameReaderCh chan io.Reader
38 startBlockCh <-chan time.Time
3937
4038 // done is closed by Bar's goroutine, after cacheState is written
4139 done chan struct{}
6462 timeElapsed time.Duration
6563 aDecorators []decor.Decorator
6664 pDecorators []decor.Decorator
65 amountReceivers []decor.AmountReceiver
66 shutdownListeners []decor.ShutdownListener
6767 refill *refill
6868 bufP, bufB, bufA *bytes.Buffer
6969 panicMsg string
7070
71 ewmAverage ewma.MovingAverage
72
7371 // following options are assigned to the *Bar
74 priority int
75 runningBar *Bar
76 startBlockCh chan time.Time
72 priority int
73 runningBar *Bar
7774 }
7875 refill struct {
7976 char rune
112109 b := &Bar{
113110 priority: s.priority,
114111 runningBar: s.runningBar,
115 startBlockCh: s.startBlockCh,
116112 operateState: make(chan func(*bState)),
117113 frameReaderCh: make(chan io.Reader, 1),
118114 done: make(chan struct{}),
119115 shutdown: make(chan struct{}),
120116 }
121
122 s.startBlockCh = nil
123117
124118 if b.runningBar != nil {
125119 b.priority = b.runningBar.priority
155149 proxyReader.startBlockCh = startBlock[0]
156150 }
157151 return proxyReader
158 }
159
160 // Increment is a shorthand for b.IncrBy(1)
161 func (b *Bar) Increment() {
162 b.IncrBy(1)
163152 }
164153
165154 // ResumeFill fills bar with different r rune,
240229 }
241230 }
242231
232 // Increment is a shorthand for b.IncrBy(1)
233 func (b *Bar) Increment() {
234 b.IncrBy(1)
235 }
236
243237 // IncrBy increments progress bar by amount of n
244238 func (b *Bar) IncrBy(n int) {
245 now := time.Now()
246239 select {
247240 case b.operateState <- func(s *bState) {
248241 s.current += int64(n)
249 s.timeElapsed = now.Sub(s.startTime)
250 if s.ewmAverage != nil {
251 lastBlockTime := now.Sub(s.blockStartTime)
252 lastItemEstimate := float64(lastBlockTime) / float64(n)
253 s.ewmAverage.Add(lastItemEstimate)
254 }
255242 if s.dynamic {
256243 curp := decor.CalcPercentage(s.total, s.current, 100)
257244 if 100-curp <= s.totalAutoIncrTrigger {
261248 s.current = s.total
262249 s.toComplete = true
263250 }
251 for _, ar := range s.amountReceivers {
252 ar.NextAmount(n)
253 }
254 s.timeElapsed = time.Since(s.startTime)
264255 }:
265256 case <-b.done:
266257 }
280271 select {
281272 case op := <-b.operateState:
282273 op(s)
283 case now := <-b.startBlockCh:
284 s.blockStartTime = now
285274 case <-cancel:
286275 s.toComplete = true
287276 cancel = nil
288277 case <-b.shutdown:
289278 b.cacheState = s
290279 close(b.done)
280 for _, sl := range s.shutdownListeners {
281 sl.Shutdown()
282 }
291283 return
292284 }
293285 }
1111 func AppendDecorators(appenders ...decor.Decorator) BarOption {
1212 return func(s *bState) {
1313 for _, decorator := range appenders {
14 if t, ok := decorator.(*decor.EwmaETA); ok {
15 s.ewmAverage = t
16 s.startBlockCh = t.StartBlockCh
14 if ar, ok := decorator.(decor.AmountReceiver); ok {
15 s.amountReceivers = append(s.amountReceivers, ar)
16 }
17 if sl, ok := decorator.(decor.ShutdownListener); ok {
18 s.shutdownListeners = append(s.shutdownListeners, sl)
1719 }
1820 s.aDecorators = append(s.aDecorators, decorator)
1921 }
2426 func PrependDecorators(prependers ...decor.Decorator) BarOption {
2527 return func(s *bState) {
2628 for _, decorator := range prependers {
27 if t, ok := decorator.(*decor.EwmaETA); ok {
28 s.ewmAverage = t
29 s.startBlockCh = t.StartBlockCh
29 if ar, ok := decorator.(decor.AmountReceiver); ok {
30 s.amountReceivers = append(s.amountReceivers, ar)
31 }
32 if sl, ok := decorator.(decor.ShutdownListener); ok {
33 s.shutdownListeners = append(s.shutdownListeners, sl)
3034 }
3135 s.pDecorators = append(s.pDecorators, decorator)
3236 }
44 "math"
55 "time"
66 "unicode/utf8"
7
8 "github.com/VividCortex/ewma"
97 )
108
119 const (
5957 Decor(*Statistics, chan<- int, <-chan int) string
6058 }
6159
62 // CompleteMessenger is an interface with one method:
63 //
64 // OnComplete(message string, wc ...WC)
60 // OnCompleteMessenger is an interface with one method:
61 //
62 // OnCompleteMessage(message string, wc ...WC)
6563 //
6664 // Decorators implementing this interface suppose to return provided string on complete event.
67 type CompleteMessenger interface {
68 OnComplete(string, ...WC)
65 type OnCompleteMessenger interface {
66 OnCompleteMessage(string, ...WC)
67 }
68
69 type AmountReceiver interface {
70 NextAmount(int)
71 }
72
73 type ShutdownListener interface {
74 Shutdown()
6975 }
7076
7177 // DecoratorFunc is an adapter for Decorator interface
128134 //
129135 // `wc` optional WC config
130136 func OnComplete(decorator Decorator, message string, wc ...WC) Decorator {
131 if cm, ok := decorator.(CompleteMessenger); ok {
132 cm.OnComplete(message, wc...)
137 if cm, ok := decorator.(OnCompleteMessenger); ok {
138 cm.OnCompleteMessage(message, wc...)
133139 return decorator
134140 }
135141 msgDecorator := Name(message, wc...)
217223 }
218224 return wc0.formatMsg(str, widthAccumulator, widthDistributor)
219225 })
220 }
221
222 // ETA returns exponential-weighted-moving-average ETA decorator.
223 //
224 // `style` one of [ET_STYLE_GO|ET_STYLE_HHMMSS|ET_STYLE_HHMM|ET_STYLE_MMSS]
225 //
226 // `age` is related to the decay factor alpha by the formula given for the DECAY constant.
227 // It signifies the average age of the samples as time goes to infinity. Basically age is
228 // the previous N samples to average over. If zero value provided, it defaults to 30.
229 //
230 // `startBlock` is channel, user suppose to send time.Now() on each iteration of block start.
231 //
232 // `wc` optional WC config
233 func ETA(style int, age float64, startBlock chan time.Time, wc ...WC) Decorator {
234 var wc0 WC
235 if len(wc) > 0 {
236 wc0 = wc[0]
237 }
238 if age == .0 {
239 age = ewma.AVG_METRIC_AGE
240 }
241 return &EwmaETA{
242 MovingAverage: ewma.NewMovingAverage(age),
243 StartBlockCh: startBlock,
244 style: style,
245 wc: wc0,
246 }
247 }
248
249 // EwmaETA is a struct, which implements ewma based ETA decorator.
250 // Normally should not be used directly, use helper func instead:
251 //
252 // decor.ETA(int, float64, chan time.Time, ...decor.WC)
253 type EwmaETA struct {
254 ewma.MovingAverage
255 StartBlockCh chan time.Time
256 style int
257 wc WC
258 onComplete *struct {
259 msg string
260 wc WC
261 }
262 }
263
264 func (s *EwmaETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
265 if st.Completed && s.onComplete != nil {
266 return s.onComplete.wc.formatMsg(s.onComplete.msg, widthAccumulator, widthDistributor)
267 }
268
269 var str string
270 timeRemaining := time.Duration(st.Total-st.Current) * time.Duration(round(s.Value()))
271 hours := int64((timeRemaining / time.Hour) % 60)
272 minutes := int64((timeRemaining / time.Minute) % 60)
273 seconds := int64((timeRemaining / time.Second) % 60)
274
275 switch s.style {
276 case ET_STYLE_GO:
277 str = fmt.Sprint(time.Duration(timeRemaining.Seconds()) * time.Second)
278 case ET_STYLE_HHMMSS:
279 str = fmt.Sprintf("%02d:%02d:%02d", hours, minutes, seconds)
280 case ET_STYLE_HHMM:
281 str = fmt.Sprintf("%02d:%02d", hours, minutes)
282 case ET_STYLE_MMSS:
283 str = fmt.Sprintf("%02d:%02d", minutes, seconds)
284 }
285
286 return s.wc.formatMsg(str, widthAccumulator, widthDistributor)
287 }
288
289 func (s *EwmaETA) OnComplete(msg string, wc ...WC) {
290 var wc0 WC
291 if len(wc) > 0 {
292 wc0 = wc[0]
293 }
294 s.onComplete = &struct {
295 msg string
296 wc WC
297 }{msg, wc0}
298226 }
299227
300228 // Elapsed returns elapsed time decorator.
0 package decor
1
2 import (
3 "fmt"
4 "time"
5
6 "github.com/VividCortex/ewma"
7 )
8
9 // ETA returns exponential-weighted-moving-average ETA decorator.
10 //
11 // `style` one of [ET_STYLE_GO|ET_STYLE_HHMMSS|ET_STYLE_HHMM|ET_STYLE_MMSS]
12 //
13 // `age` is related to the decay factor alpha by the formula given for the DECAY constant.
14 // It signifies the average age of the samples as time goes to infinity. Basically age is
15 // the previous N samples to average over. If zero value provided, it defaults to 30.
16 //
17 // `startBlock` is a time.Time receive channel. User suppose to send time.Now()
18 // to this channel on each iteration of block start, right before actual job.
19 // The channel will be closed automatically on bar shutdown event, so there is
20 // no need to close from user side.
21 //
22 // `wc` optional WC config
23 func ETA(style int, age float64, startBlock chan time.Time, wc ...WC) Decorator {
24 var wc0 WC
25 if len(wc) > 0 {
26 wc0 = wc[0]
27 }
28 if age == .0 {
29 age = ewma.AVG_METRIC_AGE
30 }
31 eta := &ewmaETA{
32 mAverage: ewma.NewMovingAverage(age),
33 startBlockReceiver: startBlock,
34 startBlockStreamer: make(chan time.Time),
35 style: style,
36 wc: wc0,
37 }
38 go eta.serve()
39 return eta
40 }
41
42 type ewmaETA struct {
43 mAverage ewma.MovingAverage
44 startBlockReceiver chan time.Time
45 startBlockStreamer chan time.Time
46 style int
47 wc WC
48 onComplete *struct {
49 msg string
50 wc WC
51 }
52 }
53
54 func (s *ewmaETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
55 if st.Completed && s.onComplete != nil {
56 return s.onComplete.wc.formatMsg(s.onComplete.msg, widthAccumulator, widthDistributor)
57 }
58
59 var str string
60 timeRemaining := time.Duration(st.Total-st.Current) * time.Duration(round(s.mAverage.Value()))
61 hours := int64((timeRemaining / time.Hour) % 60)
62 minutes := int64((timeRemaining / time.Minute) % 60)
63 seconds := int64((timeRemaining / time.Second) % 60)
64
65 switch s.style {
66 case ET_STYLE_GO:
67 str = fmt.Sprint(time.Duration(timeRemaining.Seconds()) * time.Second)
68 case ET_STYLE_HHMMSS:
69 str = fmt.Sprintf("%02d:%02d:%02d", hours, minutes, seconds)
70 case ET_STYLE_HHMM:
71 str = fmt.Sprintf("%02d:%02d", hours, minutes)
72 case ET_STYLE_MMSS:
73 str = fmt.Sprintf("%02d:%02d", minutes, seconds)
74 }
75
76 return s.wc.formatMsg(str, widthAccumulator, widthDistributor)
77 }
78
79 func (s *ewmaETA) NextAmount(n int) {
80 sb := <-s.startBlockStreamer
81 lastBlockTime := time.Since(sb)
82 lastItemEstimate := float64(lastBlockTime) / float64(n)
83 s.mAverage.Add(lastItemEstimate)
84 }
85
86 func (s *ewmaETA) OnCompleteMessage(msg string, wc ...WC) {
87 var wc0 WC
88 if len(wc) > 0 {
89 wc0 = wc[0]
90 }
91 s.onComplete = &struct {
92 msg string
93 wc WC
94 }{msg, wc0}
95 }
96
97 func (s *ewmaETA) Shutdown() {
98 close(s.startBlockReceiver)
99 }
100
101 func (s *ewmaETA) serve() {
102 for now := range s.startBlockReceiver {
103 s.startBlockStreamer <- now
104 }
105 }