Codebase list golang-github-vbauerster-mpb / b346b6e
make dropS, dropD channels on pState Those channels closed once on unrecoverable errors in order to prevent goroutine leaks. Vladimir Bauer 3 years ago
3 changed file(s) with 28 addition(s) and 17 deletion(s). Raw diff Collapse all Expand all
181181 }
182182
183183 for _, columnCase := range testCases {
184 mpb.SyncWidth(toSyncMatrix(columnCase))
184 mpb.SyncWidth(toSyncMatrix(columnCase), nil)
185185 var results []chan string
186186 for _, step := range columnCase {
187187 step := step
6464 sync = false
6565 l = bHeap.Len()
6666 }
67 syncWidth(pMatrix)
68 syncWidth(aMatrix)
67 drop := req.data.(<-chan struct{})
68 syncWidth(pMatrix, drop)
69 syncWidth(aMatrix, drop)
6970 case h_iter:
7071 data := req.data.(iterData)
7172 for _, b := range bHeap {
103104 }
104105 }
105106
106 func (m heapManager) sync() {
107 m <- heapRequest{cmd: h_sync}
107 func (m heapManager) sync(drop <-chan struct{}) {
108 m <- heapRequest{cmd: h_sync, data: drop}
108109 }
109110
110111 func (m heapManager) push(b *Bar, sync bool) {
134135 m <- heapRequest{cmd: h_end, data: ch}
135136 }
136137
137 func syncWidth(matrix map[int][]chan int) {
138 func syncWidth(matrix map[int][]chan int, drop <-chan struct{}) {
138139 for _, column := range matrix {
139 go maxWidthDistributor(column)
140 go maxWidthDistributor(column, drop)
140141 }
141142 }
142143
143 func maxWidthDistributor(column []chan int) {
144 func maxWidthDistributor(column []chan int, drop <-chan struct{}) {
144145 var maxWidth int
145146 for _, ch := range column {
146 if w := <-ch; w > maxWidth {
147 maxWidth = w
147 select {
148 case w := <-ch:
149 if w > maxWidth {
150 maxWidth = w
151 }
152 case <-drop:
153 return
148154 }
149155 }
150156 for _, ch := range column {
3333
3434 // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
3535 type pState struct {
36 ctx context.Context
37 hm heapManager
38 rows []io.Reader
36 ctx context.Context
37 hm heapManager
38 dropS, dropD chan struct{}
39 rows []io.Reader
3940
4041 // following are provided/overrided by user
4142 refreshRate time.Duration
6869 s := &pState{
6970 ctx: ctx,
7071 hm: make(heapManager),
72 dropS: make(chan struct{}),
73 dropD: make(chan struct{}),
7174 rows: make([]io.Reader, 32),
7275 refreshRate: defaultRefreshRate,
7376 popPriority: math.MinInt32,
299302 }
300303
301304 func (s *pState) render(cw *cwriter.Writer) (err error) {
305 s.hm.sync(s.dropS)
306
302307 var width, height int
303308 if cw.IsTerminal() {
304309 width, height, err = cw.GetTermSize()
305310 if err != nil {
311 close(s.dropS)
306312 return err
307313 }
308314 } else {
314320 height = 100
315321 }
316322
317 s.hm.sync()
318323 iter := make(chan *Bar)
319324 s.hm.iter(iter, nil)
320325 for b := range iter {
330335
331336 var popCount int
332337
333 iter, drop := make(chan *Bar), make(chan struct{})
334 s.hm.drain(iter, drop)
338 iter := make(chan *Bar)
339 s.hm.drain(iter, s.dropD)
335340 s.rows = s.rows[:0]
336341
337342 for b := range iter {
338343 frame := <-b.frameCh
339344 if frame.err != nil {
340 close(drop)
345 close(s.dropD)
341346 b.cancel()
342347 return frame.err // b.frameCh is buffered it's ok to return here
343348 }