diff --git a/bar.go b/bar.go index ab56127..dcee8c2 100644 --- a/bar.go +++ b/bar.go @@ -6,10 +6,9 @@ "fmt" "io" "io/ioutil" - "runtime" + "log" "strings" "sync" - "sync/atomic" "time" "unicode/utf8" @@ -36,13 +35,13 @@ priority int index int - runningBar *Bar - cacheState *bState - operateState chan func(*bState) - bFrameCh chan *bFrame - syncTableCh chan [][]chan int - completed chan bool - forceRefreshCh chan time.Time + runningBar *Bar + cacheState *bState + operateState chan func(*bState) + bFrameCh chan *bFrame + syncTableCh chan [][]chan int + completed chan bool + forceRefresh chan<- time.Time // done is closed by Bar's goroutine, after cacheState is written done chan struct{} @@ -50,9 +49,11 @@ shutdown chan struct{} arbitraryCurrent struct { - lock uint32 + sync.Mutex current int64 } + + dlogger *log.Logger } type ( @@ -91,55 +92,33 @@ func newBar( ctx context.Context, wg *sync.WaitGroup, - forceRefreshCh chan time.Time, - filler Filler, - id, width int, - total int64, - options ...BarOption, + forceRefresh chan<- time.Time, + bs *bState, + dlogger *log.Logger, ) *Bar { - if total <= 0 { - total = time.Now().Unix() - } - - s := &bState{ - filler: filler, - id: id, - priority: id, - width: width, - total: total, - } - - for _, opt := range options { - if opt != nil { - opt(s) - } - } - - s.bufP = bytes.NewBuffer(make([]byte, 0, width)) - s.bufB = bytes.NewBuffer(make([]byte, 0, width)) - s.bufA = bytes.NewBuffer(make([]byte, 0, width)) - if s.extender != nil { - s.bufE = bytes.NewBuffer(make([]byte, 0, width)) - } - - b := &Bar{ - priority: s.priority, - runningBar: s.runningBar, - operateState: make(chan func(*bState)), - bFrameCh: make(chan *bFrame, 1), - syncTableCh: make(chan [][]chan int), - completed: make(chan bool), - done: make(chan struct{}), - shutdown: make(chan struct{}), - forceRefreshCh: forceRefreshCh, - } - - if b.runningBar != nil { - b.priority = b.runningBar.priority - } - - go b.serve(ctx, wg, s) - return b + + bs.bufP = bytes.NewBuffer(make([]byte, 0, bs.width)) + bs.bufB = bytes.NewBuffer(make([]byte, 0, bs.width)) + bs.bufA = bytes.NewBuffer(make([]byte, 0, bs.width)) + if bs.extender != nil { + bs.bufE = bytes.NewBuffer(make([]byte, 0, bs.width)) + } + + bar := &Bar{ + priority: bs.priority, + runningBar: bs.runningBar, + operateState: make(chan func(*bState)), + bFrameCh: make(chan *bFrame, 1), + syncTableCh: make(chan [][]chan int), + completed: make(chan bool), + done: make(chan struct{}), + shutdown: make(chan struct{}), + forceRefresh: forceRefresh, + dlogger: dlogger, + } + + go bar.serve(ctx, wg, bs) + return bar } // RemoveAllPrependers removes all prepend functions. @@ -212,7 +191,7 @@ if final && !s.toComplete { s.current = s.total s.toComplete = true - go b.forceRefresh() + go b.refreshNowTillShutdown() } }: return true @@ -226,13 +205,11 @@ if current <= 0 { return } - for !atomic.CompareAndSwapUint32(&b.arbitraryCurrent.lock, 0, 1) { - runtime.Gosched() - } + b.arbitraryCurrent.Lock() last := b.arbitraryCurrent.current b.IncrBy(int(current-last), wdd...) b.arbitraryCurrent.current = current - atomic.StoreUint32(&b.arbitraryCurrent.lock, 0) + b.arbitraryCurrent.Unlock() } // Increment is a shorthand for b.IncrBy(1). @@ -250,7 +227,7 @@ if s.current >= s.total && !s.toComplete { s.current = s.total s.toComplete = true - go b.forceRefresh() + go b.refreshNowTillShutdown() } for _, ar := range s.amountReceivers { ar.NextAmount(n, wdd...) @@ -308,7 +285,7 @@ // recovering if user defined decorator panics for example if p := recover(); p != nil { s.panicMsg = fmt.Sprintf("panic: %v", p) - fmt.Fprintf(debugOut, "%s %s bar id %02d %v\n", "[mpb]", time.Now(), s.id, s.panicMsg) + b.dlogger.Println(s.panicMsg) b.bFrameCh <- &bFrame{ rd: strings.NewReader(fmt.Sprintf(fmt.Sprintf("%%.%ds\n", tw), s.panicMsg)), toShutdown: true, @@ -411,10 +388,10 @@ return table } -func (b *Bar) forceRefresh() { +func (b *Bar) refreshNowTillShutdown() { for { select { - case b.forceRefreshCh <- time.Now(): + case b.forceRefresh <- time.Now(): case <-b.shutdown: return } diff --git a/options.go b/options.go index bc67da9..5ebf22b 100644 --- a/options.go +++ b/options.go @@ -48,13 +48,9 @@ } } -// WithContext provided context will be used for cancellation purposes. +// WithContext deprecated and has no effect, please use NewWithContext instead. func WithContext(ctx context.Context) ContainerOption { return func(s *pState) { - if ctx == nil { - return - } - s.ctx = ctx } } diff --git a/progress.go b/progress.go index 63a69c1..0c09098 100644 --- a/progress.go +++ b/progress.go @@ -6,6 +6,7 @@ "fmt" "io" "io/ioutil" + "log" "os" "sync" "time" @@ -22,11 +23,14 @@ // Progress represents the container that renders Progress bars type Progress struct { + ctx context.Context uwg *sync.WaitGroup cwg *sync.WaitGroup bwg *sync.WaitGroup operateState chan func(*pState) done chan struct{} + forceRefresh chan time.Time + dlogger *log.Logger } type pState struct { @@ -38,33 +42,36 @@ rr time.Duration pMatrix map[int][]chan int aMatrix map[int][]chan int - forceRefreshCh chan time.Time output io.Writer // following are provided/overrided by user - ctx context.Context uwg *sync.WaitGroup manualRefreshCh <-chan time.Time shutdownNotifier chan struct{} - waitBars map[*Bar]*Bar + parkedBars map[*Bar]*Bar debugOut io.Writer } -// New creates new Progress instance, which orchestrates bars rendering -// process. Accepts mpb.ContainerOption funcs for customization. +// New creates new Progress container instance. It's not possible to +// reuse instance after *Progress.Wait() method has been called. func New(options ...ContainerOption) *Progress { + return NewWithContext(context.Background(), options...) +} + +// NewWithContext creates new Progress container instance with provided +// context. It's not possible to reuse instance after *Progress.Wait() +// method has been called. +func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress { pq := make(priorityQueue, 0) heap.Init(&pq) s := &pState{ - ctx: context.Background(), - bHeap: &pq, - width: pwidth, - rr: prr, - waitBars: make(map[*Bar]*Bar), - forceRefreshCh: make(chan time.Time), - debugOut: ioutil.Discard, - output: os.Stdout, + bHeap: &pq, + width: pwidth, + rr: prr, + parkedBars: make(map[*Bar]*Bar), + output: os.Stdout, + debugOut: ioutil.Discard, } for _, opt := range options { @@ -74,11 +81,14 @@ } p := &Progress{ + ctx: ctx, uwg: s.uwg, cwg: new(sync.WaitGroup), bwg: new(sync.WaitGroup), operateState: make(chan func(*pState)), + forceRefresh: make(chan time.Time), done: make(chan struct{}), + dlogger: log.New(s.debugOut, "[mpb] ", log.Lshortfile), } p.cwg.Add(1) go p.serve(s, cwriter.New(s.output)) @@ -101,19 +111,42 @@ // Add creates a bar which renders itself by provided filler. func (p *Progress) Add(total int64, filler Filler, options ...BarOption) *Bar { + if total <= 0 { + total = time.Now().Unix() + } + if filler == nil { + filler = newDefaultBarFiller() + } p.bwg.Add(1) result := make(chan *Bar) select { - case p.operateState <- func(s *pState) { - b := newBar(s.ctx, p.bwg, s.forceRefreshCh, filler, s.idCounter, s.width, total, options...) - if b.runningBar != nil { - s.waitBars[b.runningBar] = b + case p.operateState <- func(ps *pState) { + logPrefix := fmt.Sprintf("%sbar#%02d ", p.dlogger.Prefix(), ps.idCounter) + dlogger := log.New(ps.debugOut, logPrefix, log.Lshortfile) + bs := &bState{ + total: total, + filler: filler, + priority: ps.idCounter, + id: ps.idCounter, + width: ps.width, + } + for _, opt := range options { + if opt != nil { + opt(bs) + } + } + bar := newBar(p.ctx, p.bwg, p.forceRefresh, bs, dlogger) + if bar.runningBar != nil { + if bar.priority == ps.idCounter { + bar.priority = bar.runningBar.priority + } + ps.parkedBars[bar.runningBar] = bar } else { - heap.Push(s.bHeap, b) - s.heapUpdated = true - } - s.idCounter++ - result <- b + heap.Push(ps.bHeap, bar) + ps.heapUpdated = true + } + ps.idCounter++ + result <- bar }: return <-result case <-p.done: @@ -185,7 +218,7 @@ manualOrTickCh, cleanUp := s.manualOrTick() defer cleanUp() - refreshCh := fanInRefreshSrc(p.done, s.forceRefreshCh, manualOrTickCh) + refreshCh := fanInRefreshSrc(p.done, p.forceRefresh, manualOrTickCh) for { select { @@ -199,7 +232,7 @@ return } if err := s.render(cw); err != nil { - fmt.Fprintf(s.debugOut, "[mpb] %s %v\n", time.Now(), err) + p.dlogger.Println(err) } } } @@ -236,10 +269,10 @@ // only after the bar with completed state has been flushed. this // ensures no bar ends up with less than 100% rendered. s.shutdownPending = append(s.shutdownPending, bar) - if replacementBar, ok := s.waitBars[bar]; ok { + if replacementBar, ok := s.parkedBars[bar]; ok { heap.Push(s.bHeap, replacementBar) s.heapUpdated = true - delete(s.waitBars, bar) + delete(s.parkedBars, bar) } if frame.removeOnComplete { s.heapUpdated = true diff --git a/progress_test.go b/progress_test.go index d02efbc..7fe0f92 100644 --- a/progress_test.go +++ b/progress_test.go @@ -76,9 +76,8 @@ func TestWithContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) shutdown := make(chan struct{}) - p := mpb.New( + p := mpb.NewWithContext(ctx, mpb.WithOutput(ioutil.Discard), - mpb.WithContext(ctx), mpb.WithRefreshRate(50*time.Millisecond), mpb.WithShutdownNotifier(shutdown), )