Codebase list golang-github-vbauerster-mpb / 4417ccd
Refactoring: render and flush Vladimir Bauer 8 years ago
4 changed file(s) with 102 addition(s) and 112 deletion(s). Raw diff Collapse all Expand all
3131 priority int
3232 index int
3333
34 // pointer to running bar, which this bar should replace
35 runningBar *Bar
36
37 // completed is set from master Progress goroutine only
38 completed bool
39
40 removeOnComplete bool
41
42 operateState chan func(*bState)
34 runningBar *Bar
35 cacheState *bState
36 operateState chan func(*bState)
37 frameReaderCh chan io.Reader
38
4339 // done is closed by Bar's goroutine, after cacheState is written
4440 done chan struct{}
4541 // shutdown is closed from master Progress goroutine only
4642 shutdown chan struct{}
47
48 cacheState *bState
4943 }
5044
5145 type (
6256 trimRightSpace bool
6357 toComplete bool
6458 dynamic bool
59 removeOnComplete bool
6560 barClearOnComplete bool
6661 completeFlushed bool
6762 startTime time.Time
7671 panicMsg string
7772
7873 // following options are assigned to the *Bar
79 priority int
80 removeOnComplete bool
81 runningBar *Bar
74 priority int
75 runningBar *Bar
8276 }
8377 refill struct {
8478 char rune
8579 till int64
8680 }
87 bFrame struct {
88 bar *Bar
89 reader io.Reader
90 toComplete bool
81 frameReader struct {
82 io.Reader
83 toShutdown bool
84 removeOnComplete bool
9185 }
9286 )
9387
116110 s.bufA = bytes.NewBuffer(make([]byte, 0, s.width))
117111
118112 b := &Bar{
119 priority: s.priority,
120 removeOnComplete: s.removeOnComplete,
121 runningBar: s.runningBar,
122 operateState: make(chan func(*bState)),
123 done: make(chan struct{}),
124 shutdown: make(chan struct{}),
113 priority: s.priority,
114 runningBar: s.runningBar,
115 operateState: make(chan func(*bState)),
116 frameReaderCh: make(chan io.Reader, 1),
117 done: make(chan struct{}),
118 shutdown: make(chan struct{}),
125119 }
126120
127121 if b.runningBar != nil {
172166
173167 // NumOfAppenders returns current number of append decorators
174168 func (b *Bar) NumOfAppenders() int {
175 result := make(chan int, 1)
169 result := make(chan int)
176170 select {
177171 case b.operateState <- func(s *bState) { result <- len(s.aDecorators) }:
178172 return <-result
183177
184178 // NumOfPrependers returns current number of prepend decorators
185179 func (b *Bar) NumOfPrependers() int {
186 result := make(chan int, 1)
180 result := make(chan int)
187181 select {
188182 case b.operateState <- func(s *bState) { result <- len(s.pDecorators) }:
189183 return <-result
194188
195189 // ID returs id of the bar
196190 func (b *Bar) ID() int {
197 result := make(chan int, 1)
191 result := make(chan int)
198192 select {
199193 case b.operateState <- func(s *bState) { result <- s.id }:
200194 return <-result
205199
206200 // Current returns bar's current number, in other words sum of all increments.
207201 func (b *Bar) Current() int64 {
208 result := make(chan int64, 1)
202 result := make(chan int64)
209203 select {
210204 case b.operateState <- func(s *bState) { result <- s.current }:
211205 return <-result
216210
217211 // Total returns bar's total number.
218212 func (b *Bar) Total() int64 {
219 result := make(chan int64, 1)
213 result := make(chan int64)
220214 select {
221215 case b.operateState <- func(s *bState) { result <- s.total }:
222216 return <-result
246240 func (b *Bar) StartBlock() {
247241 now := time.Now()
248242 select {
249 case b.operateState <- func(s *bState) { s.blockStartTime = now }:
243 case b.operateState <- func(s *bState) {
244 if s.current == 0 {
245 s.startTime = now
246 }
247 s.blockStartTime = now
248 }:
250249 case <-b.done:
251250 }
252251 }
275274
276275 // Completed reports whether the bar is in completed state
277276 func (b *Bar) Completed() bool {
278 result := make(chan bool, 1)
277 result := make(chan bool)
279278 select {
280279 case b.operateState <- func(s *bState) { result <- s.toComplete }:
281280 return <-result
303302 }
304303 }
305304
306 func (b *Bar) render(debugOut io.Writer, tw int, pSyncer, aSyncer *widthSyncer) <-chan *bFrame {
307 ch := make(chan *bFrame, 1)
308
309 go func() {
310 select {
311 case b.operateState <- func(s *bState) {
312 var r io.Reader
313 defer func() {
314 // recovering if external decorators panic
315 if p := recover(); p != nil {
316 s.panicMsg = fmt.Sprintf("panic: %v", p)
317 s.pDecorators = nil
318 s.aDecorators = nil
319 s.toComplete = true
320 // truncate panic msg to one tw line, if necessary
321 r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg))
322 fmt.Fprintf(debugOut, "%s %s bar id %02d %v\n", "[mpb]", time.Now(), s.id, s.panicMsg)
323 }
324 ch <- &bFrame{b, r, s.toComplete && !(s.barClearOnComplete && !s.completeFlushed)}
325 s.completeFlushed = s.toComplete
326 }()
305 func (b *Bar) render(debugOut io.Writer, tw int, pSyncer, aSyncer *widthSyncer) {
306 var r io.Reader
307 select {
308 case b.operateState <- func(s *bState) {
309 defer func() {
310 // recovering if external decorators panic
311 if p := recover(); p != nil {
312 s.panicMsg = fmt.Sprintf("panic: %v", p)
313 s.pDecorators = nil
314 s.aDecorators = nil
315 s.toComplete = true
316 // truncate panic msg to one tw line, if necessary
317 r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg))
318 fmt.Fprintf(debugOut, "%s %s bar id %02d %v\n", "[mpb]", time.Now(), s.id, s.panicMsg)
319 }
320 b.frameReaderCh <- &frameReader{
321 Reader: r,
322 toShutdown: s.toComplete && !s.completeFlushed,
323 removeOnComplete: s.removeOnComplete,
324 }
325 s.completeFlushed = s.toComplete
326 }()
327 r = s.draw(tw, pSyncer, aSyncer)
328 }:
329 case <-b.done:
330 s := b.cacheState
331 if s.panicMsg != "" {
332 r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg))
333 } else {
327334 r = s.draw(tw, pSyncer, aSyncer)
328 }:
329 case <-b.done:
330 s := b.cacheState
331 var r io.Reader
332 if s.panicMsg != "" {
333 r = strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg))
334 } else {
335 r = s.draw(tw, pSyncer, aSyncer)
336 }
337 ch <- &bFrame{b, r, true}
338 }
339 }()
340
341 return ch
335 }
336 b.frameReaderCh <- &frameReader{
337 Reader: r,
338 }
339 }
342340 }
343341
344342 func (s *bState) draw(termWidth int, pSyncer, aSyncer *widthSyncer) io.Reader {
11
22 import (
33 "container/heap"
4 "fmt"
45 "io"
56 "io/ioutil"
67 "os"
203204 return ws
204205 }
205206
206 func (s *pState) writeAndFlush(tw, numP, numA int) (err error) {
207 func (s *pState) render(tw, numP, numA int) {
207208 timeout := make(chan struct{})
208209 pSyncer := newWidthSyncer(timeout, s.bHeap.Len(), numP)
209210 aSyncer := newWidthSyncer(timeout, s.bHeap.Len(), numA)
211212 close(timeout)
212213 })
213214
214 for _, ch := range s.renderByPriority(tw, pSyncer, aSyncer) {
215 bf := <-ch
216 _, err = s.cw.ReadFrom(bf.reader)
217 if !bf.bar.completed && bf.toComplete {
218 // shutdown at next flush, in other words decrement underlying WaitGroup
219 // only after the bar with completed state has been flushed.
220 // this ensures no bar ends up with less than 100% rendered.
221 defer func() {
222 s.shutdownPending = append(s.shutdownPending, bf.bar)
223 }()
224 if bf.bar.removeOnComplete {
225 s.heapUpdated = heap.Remove(s.bHeap, bf.bar.index) != nil
215 for i := 0; i < s.bHeap.Len(); i++ {
216 bar := (*s.bHeap)[i]
217 go bar.render(s.debugOut, tw, pSyncer, aSyncer)
218 }
219
220 if err := s.flush(); err != nil {
221 fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err)
222 }
223 }
224
225 func (s *pState) flush() (err error) {
226 for s.bHeap.Len() > 0 {
227 bar := heap.Pop(s.bHeap).(*Bar)
228 reader := <-bar.frameReaderCh
229 _, err = s.cw.ReadFrom(reader)
230 frame := reader.(*frameReader)
231 defer func() {
232 if frame.toShutdown {
233 // shutdown at next flush, in other words decrement underlying WaitGroup
234 // only after the bar with completed state has been flushed.
235 // this ensures no bar ends up with less than 100% rendered.
236 s.shutdownPending = append(s.shutdownPending, bar)
237 if replacementBar, ok := s.waitBars[bar]; ok {
238 heap.Push(s.bHeap, replacementBar)
239 s.heapUpdated = true
240 delete(s.waitBars, bar)
241 }
242 if frame.removeOnComplete {
243 s.heapUpdated = true
244 return
245 }
226246 }
227 if replacementBar, ok := s.waitBars[bf.bar]; ok {
228 heap.Push(s.bHeap, replacementBar)
229 s.heapUpdated = true
230 delete(s.waitBars, bf.bar)
231 }
232 bf.bar.completed = true
233 }
247 heap.Push(s.bHeap, bar)
248 }()
234249 }
235250
236251 for _, interceptor := range s.interceptors {
246261 s.shutdownPending = s.shutdownPending[:i]
247262 }
248263 return
249 }
250
251 func (s *pState) renderByPriority(tw int, pSyncer, aSyncer *widthSyncer) []<-chan *bFrame {
252 bff := make([]<-chan *bFrame, 0, s.bHeap.Len())
253 for s.bHeap.Len() > 0 {
254 b := heap.Pop(s.bHeap).(*Bar)
255 defer heap.Push(s.bHeap, b)
256 bff = append(bff, b.render(s.debugOut, tw, pSyncer, aSyncer))
257 }
258 return bff
259264 }
260265
261266 func calcMax(slice []int) int {
22 package mpb
33
44 import (
5 "fmt"
65 "os"
76 "os/signal"
87 "syscall"
4039 s.heapUpdated = false
4140 }
4241 tw, _, _ := cwriter.TermSize()
43 err := s.writeAndFlush(tw, numP, numA)
44 if err != nil {
45 fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err)
46 }
42 s.render(tw, numP, numA)
4743 case <-winch:
4844 if s.heapUpdated {
4945 numP = s.bHeap.maxNumP()
5147 s.heapUpdated = false
5248 }
5349 tw, _, _ := cwriter.TermSize()
54 err := s.writeAndFlush(tw-tw/8, numP, numA)
55 if err != nil {
56 fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err)
57 }
50 s.render(tw-tw/8, numP, numA)
5851 if timer != nil && timer.Reset(resumeDelay) {
5952 break
6053 }
22 package mpb
33
44 import (
5 "fmt"
6 "time"
7
85 "github.com/vbauerster/mpb/cwriter"
96 )
107
2926 s.heapUpdated = false
3027 }
3128 tw, _, _ := cwriter.TermSize()
32 err := s.writeAndFlush(tw, numP, numA)
33 if err != nil {
34 fmt.Fprintf(s.debugOut, "%s %s %v\n", "[mpb]", time.Now(), err)
35 }
29 s.render(tw, numP, numA)
3630 }
3731 }
3832 }