Codebase list golang-github-vbauerster-mpb / 05fe2e1
drawer bounded pipeline Vladimir Bauer 9 years ago
2 changed file(s) with 86 addition(s) and 19 deletion(s). Raw diff Collapse all Expand all
185185 return b
186186 }
187187
188 func (b *Bar) Bytes(width int) []byte {
188 func (b *Bar) bytes(width int) []byte {
189189 if width <= 0 {
190190 width = b.width
191191 }
251251 close(b.done)
252252 }
253253
254 func (b *Bar) flushDone() {
255 if !b.isDone() {
256 b.flushedCh <- struct{}{}
257 }
258 }
259
260 func (b *Bar) remove() {
261 if !b.isDone() {
262 b.removeReqCh <- struct{}{}
263 }
264 }
265
254266 func (b *Bar) draw(s state, termWidth int) []byte {
255267 buf := make([]byte, 0, termWidth)
256268
44 "errors"
55 "io"
66 "os"
7 "sort"
87 "sync"
98 "time"
109
175174
176175 // server monitors underlying channels and renders any progress bars
177176 func (p *Progress) server(cw *cwriter.Writer, t *time.Ticker) {
177 const numDigesters = 4
178178 bars := make([]*Bar, 0, 4)
179179 for {
180180 select {
196196 if b == op.bar {
197197 bars = append(bars[:i], bars[i+1:]...)
198198 ok = true
199 b.removeReqCh <- struct{}{}
199 b.remove()
200200 break
201201 }
202202 }
206206 respCh <- len(bars)
207207 case <-t.C:
208208 width, _ := cwriter.TerminalWidth()
209 switch p.sort {
210 case SortTop:
211 sort.Sort(sort.Reverse(SortableBarSlice(bars)))
212 case SortBottom:
213 sort.Sort(SortableBarSlice(bars))
214 }
215 // TODO: pipelines?
216 for _, b := range bars {
217 buf := b.Bytes(width)
218 buf = append(buf, '\n')
219 cw.Write(buf)
209 ibars := iBarsGen(p.ctx.Done(), bars, width)
210 c := make(chan *indexedBarBuffer)
211 var wg sync.WaitGroup
212 wg.Add(numDigesters)
213 for i := 0; i < numDigesters; i++ {
214 go func() {
215 drawer(p.ctx.Done(), ibars, c)
216 wg.Done()
217 }()
218 }
219 go func() {
220 wg.Wait()
221 close(c)
222 }()
223
224 m := make(map[int][]byte, len(bars))
225 for r := range c {
226 m[r.index] = r.buff
227 }
228 for i := 0; i < len(bars); i++ {
229 m[i] = append(m[i], '\n')
230 cw.Write(m[i])
220231 }
221232 cw.Flush()
222 for _, b := range bars {
223 go func(b *Bar) {
224 b.flushedCh <- struct{}{}
225 }(b)
226 }
233 go flushed(p.ctx.Done(), bars)
234
227235 case d := <-p.rrChangeReqCh:
228236 t.Stop()
229237 t = time.NewTicker(d)
235243 }
236244 }
237245
246 type indexedBarBuffer struct {
247 index int
248 buff []byte
249 }
250
251 type indexedBar struct {
252 index int
253 width int
254 bar *Bar
255 }
256
257 func drawer(done <-chan struct{}, ibars <-chan *indexedBar, c chan<- *indexedBarBuffer) {
258 for b := range ibars {
259 select {
260 case c <- &indexedBarBuffer{b.index, b.bar.bytes(b.width)}:
261 case <-done:
262 return
263 }
264 }
265 }
266
267 func iBarsGen(done <-chan struct{}, bars []*Bar, width int) <-chan *indexedBar {
268 ibars := make(chan *indexedBar)
269 go func() {
270 defer close(ibars)
271 for i, b := range bars {
272 select {
273 case ibars <- &indexedBar{i, width, b}:
274 case <-done:
275 return
276 }
277 }
278 }()
279 return ibars
280 }
281
282 func flushed(done <-chan struct{}, bars []*Bar) {
283 for _, b := range bars {
284 select {
285 case <-done:
286 return
287 default:
288 b.flushDone()
289 }
290 }
291 }
292
238293 func (p *Progress) isDone() bool {
239294 select {
240295 case <-p.done: