Codebase list golang-github-vbauerster-mpb / ee30446
MovingAverage interface Vladimir Bauer 8 years ago
5 changed file(s) with 135 addition(s) and 58 deletion(s). Raw diff Collapse all Expand all
2323
2424 const (
2525 _ = iota
26 unitKiB
27 unitKB
26 UnitKiB
27 UnitKB
2828 )
2929
3030 type CounterKiB int64
154154 //
155155 // "%.1f / %.1f" = "1.0MiB / 12.0MiB" or "% .1f / % .1f" = "1.0 MiB / 12.0 MiB"
156156 func CountersKibiByte(pairFormat string, wcc ...WC) Decorator {
157 return counters(unitKiB, pairFormat, wcc...)
157 return counters(UnitKiB, pairFormat, wcc...)
158158 }
159159
160160 // CountersKiloByte returns human friendly byte counters decorator, where counters unit is multiple by 1000.
167167 //
168168 // "%.1f / %.1f" = "1.0MB / 12.0MB" or "% .1f / % .1f" = "1.0 MB / 12.0 MB"
169169 func CountersKiloByte(pairFormat string, wcc ...WC) Decorator {
170 return counters(unitKB, pairFormat, wcc...)
170 return counters(UnitKB, pairFormat, wcc...)
171171 }
172172
173173 func counters(unit int, pairFormat string, wcc ...WC) Decorator {
179179 return DecoratorFunc(func(s *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
180180 var str string
181181 switch unit {
182 case unitKiB:
182 case UnitKiB:
183183 str = fmt.Sprintf(pairFormat, CounterKiB(s.Current), CounterKiB(s.Total))
184 case unitKB:
184 case UnitKB:
185185 str = fmt.Sprintf(pairFormat, CounterKB(s.Current), CounterKB(s.Total))
186186 default:
187187 str = fmt.Sprintf(pairFormat, s.Current, s.Total)
1515 // `age` is the previous N samples to average over.
1616 // If zero value provided, it defaults to 30.
1717 //
18 // `sbCh` is a start block receive channel. User suppose to send time.Now()
18 // `sb` is a start block receive channel. User suppose to send time.Now()
1919 // to this channel on each iteration of a start block, right before actual job.
2020 // The channel will be auto closed on bar shutdown event, so there is no need
2121 // to close from user side.
2222 //
2323 // `wcc` optional WC config
24 func ETA(style int, age float64, sbCh chan time.Time, wcc ...WC) Decorator {
25 if sbCh == nil {
24 func ETA(style int, age float64, sb chan time.Time, wcc ...WC) Decorator {
25 return MovingAverageETA(style, ewma.NewMovingAverage(age), sb, wcc...)
26 }
27
28 // MovingAverageETA returns ETA decorator, which relies on MovingAverage implementation to calculate average.
29 // Default ETA decorator relies on ewma implementation. However you're free to provide your own implementation
30 // or use alternative one, which is provided by decor package:
31 //
32 // decor.MovingAverageETA(decor.ET_STYLE_GO, decor.NewMedianMovingAverage(), sb)
33 func MovingAverageETA(style int, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator {
34 if sb == nil {
2635 panic("start block channel must not be nil")
2736 }
2837 var wc WC
3039 wc = widthConf
3140 }
3241 wc.BuildFormat()
33 if age == .0 {
34 age = ewma.AVG_METRIC_AGE
35 }
36 d := &ewmaETA{
42 d := &movingAverageETA{
3743 style: style,
3844 wc: wc,
39 mAverage: ewma.NewMovingAverage(age),
40 sbReceiver: sbCh,
45 average: average,
46 sbReceiver: sb,
4147 sbStreamer: make(chan time.Time),
4248 }
4349 go d.serve()
4450 return d
4551 }
4652
47 type ewmaETA struct {
53 type movingAverageETA struct {
4854 style int
4955 wc WC
50 mAverage ewma.MovingAverage
56 average ewma.MovingAverage
5157 sbReceiver chan time.Time
5258 sbStreamer chan time.Time
5359 onComplete *struct {
5662 }
5763 }
5864
59 func (s *ewmaETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
65 func (s *movingAverageETA) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
6066 if st.Completed && s.onComplete != nil {
6167 return s.onComplete.wc.FormatMsg(s.onComplete.msg, widthAccumulator, widthDistributor)
6268 }
6369
64 v := internal.Round(s.mAverage.Value())
70 v := internal.Round(s.average.Value())
6571 if math.IsInf(v, 0) || math.IsNaN(v) {
6672 v = .0
6773 }
8591 return s.wc.FormatMsg(str, widthAccumulator, widthDistributor)
8692 }
8793
88 func (s *ewmaETA) NextAmount(n int) {
94 func (s *movingAverageETA) NextAmount(n int) {
8995 sb := <-s.sbStreamer
9096 lastBlockTime := time.Since(sb)
9197 lastItemEstimate := float64(lastBlockTime) / float64(n)
92 s.mAverage.Add(lastItemEstimate)
98 s.average.Add(lastItemEstimate)
9399 }
94100
95 func (s *ewmaETA) OnCompleteMessage(msg string, wcc ...WC) {
101 func (s *movingAverageETA) OnCompleteMessage(msg string, wcc ...WC) {
96102 var wc WC
97103 for _, widthConf := range wcc {
98104 wc = widthConf
104110 }{msg, wc}
105111 }
106112
107 func (s *ewmaETA) Shutdown() {
113 func (s *movingAverageETA) Shutdown() {
108114 close(s.sbReceiver)
109115 }
110116
111 func (s *ewmaETA) serve() {
117 func (s *movingAverageETA) serve() {
112118 for now := range s.sbReceiver {
113119 s.sbStreamer <- now
114120 }
0 package decor
1
2 import (
3 "sort"
4
5 "github.com/VividCortex/ewma"
6 )
7
8 // MovingAverage is the interface that computes a moving average over a time-
9 // series stream of numbers. The average may be over a window or exponentially
10 // decaying.
11 type MovingAverage interface {
12 Add(float64)
13 Value() float64
14 Set(float64)
15 }
16
17 type median struct {
18 window [3]float64
19 dst []float64
20 }
21
22 type sortable []float64
23
24 func (s sortable) Len() int { return len(s) }
25 func (s sortable) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
26 func (s sortable) Less(i, j int) bool { return s[i] < s[j] }
27
28 func (s *median) Add(v float64) {
29 s.window[0], s.window[1] = s.window[1], s.window[2]
30 s.window[2] = v
31 }
32
33 func (s *median) Value() float64 {
34 copy(s.dst, s.window[:])
35 sort.Sort(sortable(s.dst))
36 return s.dst[1]
37 }
38
39 func (s *median) Set(value float64) {
40 for i, _ := range s.window {
41 s.window[i] = value
42 }
43 }
44
45 // NewMedian is fixed last 3 samples median MovingAverage.
46 func NewMedian() MovingAverage {
47 return &median{
48 dst: make([]float64, 3),
49 }
50 }
51
52 type medianEwma struct {
53 MovingAverage
54 median MovingAverage
55 }
56
57 func (s medianEwma) Add(v float64) {
58 s.median.Add(v)
59 s.MovingAverage.Add(s.median.Value())
60 }
61
62 // NewMedianEwma is ewma based MovingAverage, which gets its values from median MovingAverage.
63 func NewMedianEwma(age ...float64) MovingAverage {
64 return medianEwma{
65 MovingAverage: ewma.NewMovingAverage(age...),
66 median: NewMedian(),
67 }
68 }
124124 // `age` is the previous N samples to average over.
125125 // If zero value provided, it defaults to 30.
126126 //
127 // `sbCh` is a start block receive channel. User suppose to send time.Now()
127 // `sb` is a start block receive channel. User suppose to send time.Now()
128128 // to this channel on each iteration of a start block, right before actual job.
129129 // The channel will be auto closed on bar shutdown event, so there is no need
130130 // to close from user side.
134134 // unitFormat example:
135135 //
136136 // "%.1f" = "1.0" or "% .1f" = "1.0"
137 func SpeedNoUnit(unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator {
138 return speed(0, unitFormat, age, sbCh, wcc...)
137 func SpeedNoUnit(unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator {
138 return MovingAverageSpeed(0, unitFormat, ewma.NewMovingAverage(age), sb, wcc...)
139139 }
140140
141141 // SpeedKibiByte returns human friendly I/O operation speed decorator,
145145 // `age` is the previous N samples to average over.
146146 // If zero value provided, it defaults to 30.
147147 //
148 // `sbCh` is a start block receive channel. User suppose to send time.Now()
148 // `sb` is a start block receive channel. User suppose to send time.Now()
149149 // to this channel on each iteration of a start block, right before actual job.
150150 // The channel will be auto closed on bar shutdown event, so there is no need
151151 // to close from user side.
155155 // unitFormat example:
156156 //
157157 // "%.1f" = "1.0MiB/s" or "% .1f" = "1.0 MiB/s"
158 func SpeedKibiByte(unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator {
159 return speed(unitKiB, unitFormat, age, sbCh, wcc...)
158 func SpeedKibiByte(unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator {
159 return MovingAverageSpeed(UnitKiB, unitFormat, ewma.NewMovingAverage(age), sb, wcc...)
160160 }
161161
162162 // SpeedKiloByte returns human friendly I/O operation speed decorator,
166166 // `age` is the previous N samples to average over.
167167 // If zero value provided, it defaults to 30.
168168 //
169 // `sbCh` is a start block receive channel. User suppose to send time.Now()
169 // `sb` is a start block receive channel. User suppose to send time.Now()
170170 // to this channel on each iteration of a start block, right before actual job.
171171 // The channel will be auto closed on bar shutdown event, so there is no need
172172 // to close from user side.
176176 // unitFormat example:
177177 //
178178 // "%.1f" = "1.0MB/s" or "% .1f" = "1.0 MB/s"
179 func SpeedKiloByte(unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator {
180 return speed(unitKB, unitFormat, age, sbCh, wcc...)
181 }
182
183 func speed(unit int, unitFormat string, age float64, sbCh chan time.Time, wcc ...WC) Decorator {
184 if sbCh == nil {
179 func SpeedKiloByte(unitFormat string, age float64, sb chan time.Time, wcc ...WC) Decorator {
180 return MovingAverageSpeed(UnitKB, unitFormat, ewma.NewMovingAverage(age), sb, wcc...)
181 }
182
183 // MovingAverageSpeed returns Speed decorator, which relies on MovingAverage implementation to calculate average.
184 // Default Speed decorator relies on ewma implementation. However you're free to provide your own implementation
185 // or use alternative one, which is provided by decor package:
186 //
187 // decor.MovingAverageSpeed(decor.UnitKiB, "% .2f", decor.NewMedianMovingAverage(), sb)
188 func MovingAverageSpeed(unit int, unitFormat string, average MovingAverage, sb chan time.Time, wcc ...WC) Decorator {
189 if sb == nil {
185190 panic("start block channel must not be nil")
186191 }
187192 var wc WC
189194 wc = widthConf
190195 }
191196 wc.BuildFormat()
192 if age == .0 {
193 age = ewma.AVG_METRIC_AGE
194 }
195 d := &ewmaSpeed{
197 d := &movingAverageSpeed{
196198 unit: unit,
197199 unitFormat: unitFormat,
198200 wc: wc,
199 mAverage: ewma.NewMovingAverage(age),
200 sbReceiver: sbCh,
201 average: average,
202 sbReceiver: sb,
201203 sbStreamer: make(chan time.Time),
202204 }
203205 go d.serve()
204206 return d
205207 }
206208
207 type ewmaSpeed struct {
209 type movingAverageSpeed struct {
208210 unit int
209211 unitFormat string
210212 wc WC
211 mAverage ewma.MovingAverage
213 average ewma.MovingAverage
212214 sbReceiver chan time.Time
213215 sbStreamer chan time.Time
214216 onComplete *struct {
217219 }
218220 }
219221
220 func (s *ewmaSpeed) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
222 func (s *movingAverageSpeed) Decor(st *Statistics, widthAccumulator chan<- int, widthDistributor <-chan int) string {
221223 if st.Completed && s.onComplete != nil {
222224 return s.onComplete.wc.FormatMsg(s.onComplete.msg, widthAccumulator, widthDistributor)
223225 }
224226 var str string
225 speed := s.mAverage.Value()
227 speed := s.average.Value()
226228 switch s.unit {
227 case unitKiB:
229 case UnitKiB:
228230 str = fmt.Sprintf(s.unitFormat, SpeedKiB(speed))
229 case unitKB:
231 case UnitKB:
230232 str = fmt.Sprintf(s.unitFormat, SpeedKB(speed))
231233 default:
232234 str = fmt.Sprintf(s.unitFormat, speed)
234236 return s.wc.FormatMsg(str, widthAccumulator, widthDistributor)
235237 }
236238
237 func (s *ewmaSpeed) NextAmount(n int) {
239 func (s *movingAverageSpeed) NextAmount(n int) {
238240 sb := <-s.sbStreamer
239241 speed := float64(n) / time.Since(sb).Seconds()
240 s.mAverage.Add(speed)
241 }
242
243 func (s *ewmaSpeed) OnCompleteMessage(msg string, wcc ...WC) {
242 s.average.Add(speed)
243 }
244
245 func (s *movingAverageSpeed) OnCompleteMessage(msg string, wcc ...WC) {
244246 var wc WC
245247 for _, widthConf := range wcc {
246248 wc = widthConf
252254 }{msg, wc}
253255 }
254256
255 func (s *ewmaSpeed) Shutdown() {
257 func (s *movingAverageSpeed) Shutdown() {
256258 close(s.sbReceiver)
257259 }
258260
259 func (s *ewmaSpeed) serve() {
261 func (s *movingAverageSpeed) serve() {
260262 for now := range s.sbReceiver {
261263 s.sbStreamer <- now
262264 }
4949 decor.CountersKibiByte("% 6.1f / % 6.1f"),
5050 ),
5151 mpb.AppendDecorators(
52 decor.ETA(decor.ET_STYLE_MMSS, 600, sbEta),
52 decor.MovingAverageETA(decor.ET_STYLE_MMSS, decor.NewMedianEwma(300), sbEta),
5353 decor.Name(" ] "),
54 decor.SpeedKibiByte("% .2f", 600, sbSpeed),
54 decor.MovingAverageSpeed(decor.UnitKiB, "% .2f", decor.NewMedianEwma(300), sbSpeed),
5555 ),
5656 )
5757