Codebase list golang-github-vbauerster-mpb / 9e49535
Fix concurrent Progress.RemoveBar hang Vladimir Bauer 8 years ago
5 changed file(s) with 72 addition(s) and 65 deletion(s). Raw diff Collapse all Expand all
3232 index int
3333
3434 operateState chan func(*bState)
35 quit chan struct{}
35 done chan struct{}
36 shutdown chan struct{}
3637 once sync.Once
3738
38 // cacheState is used after quit is closed
39 // cacheState is used after done is closed
3940 cacheState *bState
4041 }
4142
9697 b := &Bar{
9798 priority: id,
9899 operateState: make(chan func(*bState)),
99 quit: make(chan struct{}),
100 done: make(chan struct{}),
101 shutdown: make(chan struct{}),
100102 }
101103
102104 go b.serve(s, wg, cancel)
109111 case b.operateState <- func(s *bState) {
110112 s.prependFuncs = nil
111113 }:
112 case <-b.quit:
114 case <-b.done:
113115 }
114116 }
115117
119121 case b.operateState <- func(s *bState) {
120122 s.appendFuncs = nil
121123 }:
122 case <-b.quit:
124 case <-b.done:
123125 }
124126 }
125127
165167 s.completed = true
166168 }
167169 }:
168 case <-b.quit:
170 case <-b.done:
169171 }
170172 }
171173
179181 case b.operateState <- func(s *bState) {
180182 s.refill = &refill{r, till}
181183 }:
182 case <-b.quit:
184 case <-b.done:
183185 }
184186 }
185187
189191 select {
190192 case b.operateState <- func(s *bState) { result <- len(s.appendFuncs) }:
191193 return <-result
192 case <-b.quit:
194 case <-b.done:
193195 return len(b.cacheState.appendFuncs)
194196 }
195197 }
200202 select {
201203 case b.operateState <- func(s *bState) { result <- len(s.prependFuncs) }:
202204 return <-result
203 case <-b.quit:
205 case <-b.done:
204206 return len(b.cacheState.prependFuncs)
205207 }
206208 }
211213 select {
212214 case b.operateState <- func(s *bState) { result <- s.id }:
213215 return <-result
214 case <-b.quit:
216 case <-b.done:
215217 return b.cacheState.id
216218 }
217219 }
222224 select {
223225 case b.operateState <- func(s *bState) { result <- s.current }:
224226 return <-result
225 case <-b.quit:
227 case <-b.done:
226228 return b.cacheState.current
227229 }
228230 }
233235 select {
234236 case b.operateState <- func(s *bState) { result <- s.total }:
235237 return <-result
236 case <-b.quit:
238 case <-b.done:
237239 return b.cacheState.total
238240 }
239241 }
246248 s.total = total
247249 s.dynamic = !final
248250 }:
249 case <-b.quit:
251 case <-b.done:
250252 }
251253 }
252254
254256 // Can be used as condition in for loop
255257 func (b *Bar) InProgress() bool {
256258 select {
257 case <-b.quit:
259 case <-b.done:
258260 return false
259261 default:
260262 return true
264266 // Complete stops bar's progress tracking, but not removes the bar.
265267 // If you need to remove, call Progress.RemoveBar(*Bar) instead.
266268 func (b *Bar) Complete() {
267 b.once.Do(b.shutdown)
268 <-b.quit
269 }
270
271 func (b *Bar) shutdown() {
272 b.quit <- struct{}{}
269 b.once.Do(func() {
270 close(b.shutdown)
271 })
273272 }
274273
275274 func (b *Bar) serve(s *bState, wg *sync.WaitGroup, cancel <-chan struct{}) {
275 defer func() {
276 b.cacheState = s
277 close(b.done)
278 wg.Done()
279 }()
276280 for {
277281 select {
278282 case op := <-b.operateState:
279283 op(s)
280284 case <-cancel:
281285 s.aborted = true
282 cancel = nil
283 go b.Complete()
284 case <-b.quit:
285 b.cacheState = s
286 close(b.quit)
287 wg.Done()
286 return
287 case <-b.shutdown:
288288 return
289289 }
290290 }
310310 s.draw(tw, prependWs, appendWs)
311311 ch <- &bufReader{io.MultiReader(s.bufP, s.bufB, s.bufA), s.completed}
312312 }:
313 case <-b.quit:
313 case <-b.done:
314314 s := b.cacheState
315315 var r io.Reader
316316 if s.panic != "" {
2626 ewg *sync.WaitGroup
2727
2828 operateState chan func(*pState)
29 quit chan struct{}
29 done chan struct{}
30 shutdown chan struct{}
3031 once sync.Once
32
33 cacheHeap *priorityQueue
3134 }
3235
3336 type (
8083 ewg: s.ewg,
8184 wg: new(sync.WaitGroup),
8285 operateState: make(chan func(*pState)),
83 quit: make(chan struct{}),
86 done: make(chan struct{}),
87 shutdown: make(chan struct{}),
8488 }
8589 go p.serve(s)
8690 return p
100104 result <- b
101105 }:
102106 return <-result
103 case <-p.quit:
104 return new(Bar)
107 case <-p.done:
108 // fail early
109 return nil
105110 }
106111 }
107112
119124 }
120125 }:
121126 return <-result
122 case <-p.quit:
127 case <-p.done:
123128 return false
124129 }
125130 }
131136 case p.operateState <- func(s *pState) {
132137 s.bHeap.update(b, priority)
133138 }:
134 case <-p.quit:
139 case <-p.done:
135140 }
136141 }
137142
143148 result <- s.bHeap.Len()
144149 }:
145150 return <-result
146 case <-p.quit:
147 return 0
151 case <-p.done:
152 return p.cacheHeap.Len()
148153 }
149154 }
150155
158163 }
159164 // first wait for all bars to quit
160165 p.wg.Wait()
161 p.once.Do(p.shutdown)
162 <-p.quit
163 }
164
165 func (p *Progress) shutdown() {
166 p.quit <- struct{}{}
166 p.once.Do(func() {
167 close(p.shutdown)
168 })
169 <-p.done
167170 }
168171
169172 func newWidthSync(timeout <-chan struct{}, numBars, numColumn int) *widthSync {
1616 winch := make(chan os.Signal, 1)
1717 signal.Notify(winch, syscall.SIGWINCH)
1818
19 defer func() {
20 s.ticker.Stop()
21 signal.Stop(winch)
22 p.cacheHeap = s.bHeap
23 close(p.done)
24 if s.shutdownNotifier != nil {
25 close(s.shutdownNotifier)
26 }
27 }()
28
1929 var numP, numA int
2030 var timer *time.Timer
2131 var resumeTicker <-chan time.Time
22 resumeDelay := 320 * time.Millisecond
32 resumeDelay := 300 * time.Millisecond
2333
2434 for {
2535 select {
5666 s.ticker = time.NewTicker(s.rr)
5767 resumeTicker = nil
5868 case <-s.cancel:
59 s.ticker.Stop()
60 s.cancel = nil
61 // don't return here, p.Stop() must be called eventually
62 case <-p.quit:
63 close(p.quit)
64 if s.cancel != nil {
65 s.ticker.Stop()
66 }
67 if s.shutdownNotifier != nil {
68 close(s.shutdownNotifier)
69 }
70 signal.Stop(winch)
69 return
70 case <-p.shutdown:
7171 return
7272 }
7373 }
1212 "github.com/vbauerster/mpb"
1313 "github.com/vbauerster/mpb/decor"
1414 )
15
16 func init() {
17 rand.Seed(time.Now().UnixNano())
18 }
1519
1620 func TestAddBar(t *testing.T) {
1721 p := mpb.New()
1010 )
1111
1212 func (p *Progress) serve(s *pState) {
13
14 defer func() {
15 s.ticker.Stop()
16 p.cacheHeap = s.bHeap
17 close(p.done)
18 if s.shutdownNotifier != nil {
19 close(s.shutdownNotifier)
20 }
21 }()
1322
1423 var numP, numA int
1524
3342 fmt.Fprintln(os.Stderr, err)
3443 }
3544 case <-s.cancel:
36 s.ticker.Stop()
37 s.cancel = nil
38 // don't return here, p.Stop() must be called eventually
39 case <-p.quit:
40 close(p.quit)
41 if s.cancel != nil {
42 s.ticker.Stop()
43 }
44 if s.shutdownNotifier != nil {
45 close(s.shutdownNotifier)
46 }
45 return
46 case <-p.shutdown:
4747 return
4848 }
4949 }