Codebase list golang-github-vbauerster-mpb / de760b5
refactoring: parkedBars, NewWithContext Vladimir Bauer 7 years ago
4 changed file(s) with 105 addition(s) and 100 deletion(s). Raw diff Collapse all Expand all
55 "fmt"
66 "io"
77 "io/ioutil"
8 "runtime"
8 "log"
99 "strings"
1010 "sync"
11 "sync/atomic"
1211 "time"
1312 "unicode/utf8"
1413
3534 priority int
3635 index int
3736
38 runningBar *Bar
39 cacheState *bState
40 operateState chan func(*bState)
41 bFrameCh chan *bFrame
42 syncTableCh chan [][]chan int
43 completed chan bool
44 forceRefreshCh chan time.Time
37 runningBar *Bar
38 cacheState *bState
39 operateState chan func(*bState)
40 bFrameCh chan *bFrame
41 syncTableCh chan [][]chan int
42 completed chan bool
43 forceRefresh chan<- time.Time
4544
4645 // done is closed by Bar's goroutine, after cacheState is written
4746 done chan struct{}
4948 shutdown chan struct{}
5049
5150 arbitraryCurrent struct {
52 lock uint32
51 sync.Mutex
5352 current int64
5453 }
54
55 dlogger *log.Logger
5556 }
5657
5758 type (
9091 func newBar(
9192 ctx context.Context,
9293 wg *sync.WaitGroup,
93 forceRefreshCh chan time.Time,
94 filler Filler,
95 id, width int,
96 total int64,
97 options ...BarOption,
94 forceRefresh chan<- time.Time,
95 bs *bState,
96 dlogger *log.Logger,
9897 ) *Bar {
99 if total <= 0 {
100 total = time.Now().Unix()
101 }
102
103 s := &bState{
104 filler: filler,
105 id: id,
106 priority: id,
107 width: width,
108 total: total,
109 }
110
111 for _, opt := range options {
112 if opt != nil {
113 opt(s)
114 }
115 }
116
117 s.bufP = bytes.NewBuffer(make([]byte, 0, width))
118 s.bufB = bytes.NewBuffer(make([]byte, 0, width))
119 s.bufA = bytes.NewBuffer(make([]byte, 0, width))
120 if s.extender != nil {
121 s.bufE = bytes.NewBuffer(make([]byte, 0, width))
122 }
123
124 b := &Bar{
125 priority: s.priority,
126 runningBar: s.runningBar,
127 operateState: make(chan func(*bState)),
128 bFrameCh: make(chan *bFrame, 1),
129 syncTableCh: make(chan [][]chan int),
130 completed: make(chan bool),
131 done: make(chan struct{}),
132 shutdown: make(chan struct{}),
133 forceRefreshCh: forceRefreshCh,
134 }
135
136 if b.runningBar != nil {
137 b.priority = b.runningBar.priority
138 }
139
140 go b.serve(ctx, wg, s)
141 return b
98
99 bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width))
100 bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width))
101 bs.bufA = bytes.NewBuffer(make([]byte, 0, bs.width))
102 if bs.extender != nil {
103 bs.bufE = bytes.NewBuffer(make([]byte, 0, bs.width))
104 }
105
106 bar := &Bar{
107 priority: bs.priority,
108 runningBar: bs.runningBar,
109 operateState: make(chan func(*bState)),
110 bFrameCh: make(chan *bFrame, 1),
111 syncTableCh: make(chan [][]chan int),
112 completed: make(chan bool),
113 done: make(chan struct{}),
114 shutdown: make(chan struct{}),
115 forceRefresh: forceRefresh,
116 dlogger: dlogger,
117 }
118
119 go bar.serve(ctx, wg, bs)
120 return bar
142121 }
143122
144123 // RemoveAllPrependers removes all prepend functions.
211190 if final && !s.toComplete {
212191 s.current = s.total
213192 s.toComplete = true
214 go b.forceRefresh()
193 go b.refreshNowTillShutdown()
215194 }
216195 }:
217196 return true
225204 if current <= 0 {
226205 return
227206 }
228 for !atomic.CompareAndSwapUint32(&b.arbitraryCurrent.lock, 0, 1) {
229 runtime.Gosched()
230 }
207 b.arbitraryCurrent.Lock()
231208 last := b.arbitraryCurrent.current
232209 b.IncrBy(int(current-last), wdd...)
233210 b.arbitraryCurrent.current = current
234 atomic.StoreUint32(&b.arbitraryCurrent.lock, 0)
211 b.arbitraryCurrent.Unlock()
235212 }
236213
237214 // Increment is a shorthand for b.IncrBy(1).
249226 if s.current >= s.total && !s.toComplete {
250227 s.current = s.total
251228 s.toComplete = true
252 go b.forceRefresh()
229 go b.refreshNowTillShutdown()
253230 }
254231 for _, ar := range s.amountReceivers {
255232 ar.NextAmount(n, wdd...)
307284 // recovering if user defined decorator panics for example
308285 if p := recover(); p != nil {
309286 s.panicMsg = fmt.Sprintf("panic: %v", p)
310 fmt.Fprintf(debugOut, "%s %s bar id %02d %v\n", "[mpb]", time.Now(), s.id, s.panicMsg)
287 b.dlogger.Println(s.panicMsg)
311288 b.bFrameCh <- &bFrame{
312289 rd: strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg)),
313290 toShutdown: true,
410387 return table
411388 }
412389
413 func (b *Bar) forceRefresh() {
390 func (b *Bar) refreshNowTillShutdown() {
414391 for {
415392 select {
416 case b.forceRefreshCh <- time.Now():
393 case b.forceRefresh <- time.Now():
417394 case <-b.shutdown:
418395 return
419396 }
4747 }
4848 }
4949
50 // WithContext provided context will be used for cancellation purposes.
50 // WithContext deprecated and has no effect, please use NewWithContext instead.
5151 func WithContext(ctx context.Context) ContainerOption {
5252 return func(s *pState) {
53 if ctx == nil {
54 return
55 }
56 s.ctx = ctx
5753 }
5854 }
5955
55 "fmt"
66 "io"
77 "io/ioutil"
8 "log"
89 "os"
910 "sync"
1011 "time"
2122
2223 // Progress represents the container that renders Progress bars
2324 type Progress struct {
25 ctx context.Context
2426 uwg *sync.WaitGroup
2527 cwg *sync.WaitGroup
2628 bwg *sync.WaitGroup
2729 operateState chan func(*pState)
2830 done chan struct{}
31 forceRefresh chan time.Time
32 dlogger *log.Logger
2933 }
3034
3135 type pState struct {
3741 rr time.Duration
3842 pMatrix map[int][]chan int
3943 aMatrix map[int][]chan int
40 forceRefreshCh chan time.Time
4144 output io.Writer
4245
4346 // following are provided/overrided by user
44 ctx context.Context
4547 uwg *sync.WaitGroup
4648 manualRefreshCh <-chan time.Time
4749 shutdownNotifier chan struct{}
48 waitBars map[*Bar]*Bar
50 parkedBars map[*Bar]*Bar
4951 debugOut io.Writer
5052 }
5153
52 // New creates new Progress instance, which orchestrates bars rendering
53 // process. Accepts mpb.ContainerOption funcs for customization.
54 // New creates new Progress container instance. It's not possible to
55 // reuse instance after *Progress.Wait() method has been called.
5456 func New(options ...ContainerOption) *Progress {
57 return NewWithContext(context.Background(), options...)
58 }
59
60 // NewWithContext creates new Progress container instance with provided
61 // context. It's not possible to reuse instance after *Progress.Wait()
62 // method has been called.
63 func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
5564 pq := make(priorityQueue, 0)
5665 heap.Init(&pq)
5766
5867 s := &pState{
59 ctx: context.Background(),
60 bHeap: &pq,
61 width: pwidth,
62 rr: prr,
63 waitBars: make(map[*Bar]*Bar),
64 forceRefreshCh: make(chan time.Time),
65 debugOut: ioutil.Discard,
66 output: os.Stdout,
68 bHeap: &pq,
69 width: pwidth,
70 rr: prr,
71 parkedBars: make(map[*Bar]*Bar),
72 output: os.Stdout,
73 debugOut: ioutil.Discard,
6774 }
6875
6976 for _, opt := range options {
7380 }
7481
7582 p := &Progress{
83 ctx: ctx,
7684 uwg: s.uwg,
7785 cwg: new(sync.WaitGroup),
7886 bwg: new(sync.WaitGroup),
7987 operateState: make(chan func(*pState)),
88 forceRefresh: make(chan time.Time),
8089 done: make(chan struct{}),
90 dlogger: log.New(s.debugOut, "[mpb] ", log.Lshortfile),
8191 }
8292 p.cwg.Add(1)
8393 go p.serve(s, cwriter.New(s.output))
100110
101111 // Add creates a bar which renders itself by provided filler.
102112 func (p *Progress) Add(total int64, filler Filler, options ...BarOption) *Bar {
113 if total <= 0 {
114 total = time.Now().Unix()
115 }
116 if filler == nil {
117 filler = newDefaultBarFiller()
118 }
103119 p.bwg.Add(1)
104120 result := make(chan *Bar)
105121 select {
106 case p.operateState <- func(s *pState) {
107 b := newBar(s.ctx, p.bwg, s.forceRefreshCh, filler, s.idCounter, s.width, total, options...)
108 if b.runningBar != nil {
109 s.waitBars[b.runningBar] = b
122 case p.operateState <- func(ps *pState) {
123 logPrefix := fmt.Sprintf("%sbar#%02d ", p.dlogger.Prefix(), ps.idCounter)
124 dlogger := log.New(ps.debugOut, logPrefix, log.Lshortfile)
125 bs := &bState{
126 total: total,
127 filler: filler,
128 priority: ps.idCounter,
129 id: ps.idCounter,
130 width: ps.width,
131 }
132 for _, opt := range options {
133 if opt != nil {
134 opt(bs)
135 }
136 }
137 bar := newBar(p.ctx, p.bwg, p.forceRefresh, bs, dlogger)
138 if bar.runningBar != nil {
139 if bar.priority == ps.idCounter {
140 bar.priority = bar.runningBar.priority
141 }
142 ps.parkedBars[bar.runningBar] = bar
110143 } else {
111 heap.Push(s.bHeap, b)
112 s.heapUpdated = true
113 }
114 s.idCounter++
115 result <- b
144 heap.Push(ps.bHeap, bar)
145 ps.heapUpdated = true
146 }
147 ps.idCounter++
148 result <- bar
116149 }:
117150 return <-result
118151 case <-p.done:
184217 manualOrTickCh, cleanUp := s.manualOrTick()
185218 defer cleanUp()
186219
187 refreshCh := fanInRefreshSrc(p.done, s.forceRefreshCh, manualOrTickCh)
220 refreshCh := fanInRefreshSrc(p.done, p.forceRefresh, manualOrTickCh)
188221
189222 for {
190223 select {
198231 return
199232 }
200233 if err := s.render(cw); err != nil {
201 fmt.Fprintf(s.debugOut, "[mpb] %s %v\n", time.Now(), err)
234 p.dlogger.Println(err)
202235 }
203236 }
204237 }
235268 // only after the bar with completed state has been flushed. this
236269 // ensures no bar ends up with less than 100% rendered.
237270 s.shutdownPending = append(s.shutdownPending, bar)
238 if replacementBar, ok := s.waitBars[bar]; ok {
271 if replacementBar, ok := s.parkedBars[bar]; ok {
239272 heap.Push(s.bHeap, replacementBar)
240273 s.heapUpdated = true
241 delete(s.waitBars, bar)
274 delete(s.parkedBars, bar)
242275 }
243276 if frame.removeOnComplete {
244277 s.heapUpdated = true
7575 func TestWithContext(t *testing.T) {
7676 ctx, cancel := context.WithCancel(context.Background())
7777 shutdown := make(chan struct{})
78 p := mpb.New(
78 p := mpb.NewWithContext(ctx,
7979 mpb.WithOutput(ioutil.Discard),
80 mpb.WithContext(ctx),
8180 mpb.WithRefreshRate(50*time.Millisecond),
8281 mpb.WithShutdownNotifier(shutdown),
8382 )