diff --git a/_examples/progressAsWriter/go.mod b/_examples/progressAsWriter/go.mod new file mode 100644 index 0000000..8521861 --- /dev/null +++ b/_examples/progressAsWriter/go.mod @@ -0,0 +1,13 @@ +module github.com/vbauerster/mpb/_examples/progressAsWriter + +go 1.17 + +require github.com/vbauerster/mpb/v8 v8.0.0 + +require ( + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect + github.com/mattn/go-runewidth v0.0.13 // indirect + github.com/rivo/uniseg v0.2.0 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect +) diff --git a/_examples/progressAsWriter/main.go b/_examples/progressAsWriter/main.go new file mode 100644 index 0000000..261a958 --- /dev/null +++ b/_examples/progressAsWriter/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "fmt" + "log" + "math/rand" + "sync" + "time" + + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" +) + +func main() { + total, numBars := 100, 2 + var wg sync.WaitGroup + wg.Add(numBars) + done := make(chan struct{}) + p := mpb.New( + mpb.WithWidth(64), + mpb.WithWaitGroup(&wg), + mpb.WithShutdownNotifier(done), + ) + + log.SetOutput(p) + + for i := 0; i < numBars; i++ { + name := fmt.Sprintf("Bar#%d:", i) + bar := p.AddBar(int64(total), + mpb.PrependDecorators( + decor.Name(name), + decor.Percentage(decor.WCSyncSpace), + ), + mpb.AppendDecorators( + decor.OnComplete( + decor.EwmaETA(decor.ET_STYLE_GO, 60, decor.WCSyncWidth), "done", + ), + ), + ) + // simulating some work + go func() { + defer wg.Done() + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + max := 100 * time.Millisecond + for i := 0; i < total; i++ { + // start variable is solely for EWMA calculation + // EWMA's unit of measure is an iteration's duration + start := time.Now() + time.Sleep(time.Duration(rng.Intn(10)+1) * max / 10) + bar.Increment() + // we need to call DecoratorEwmaUpdate to fulfill ewma decorator's contract + bar.DecoratorEwmaUpdate(time.Since(start)) + } + log.Println(name, "done") + }() + } + + var qwg sync.WaitGroup + qwg.Add(1) + go func() { + quit: + for { + select { + case <-done: + // after done, underlying io.Writer returns mpb.DoneError + // so following isn't printed + log.Println("all done") + break quit + default: + log.Println("waiting for done") + time.Sleep(100 * time.Millisecond) + } + } + qwg.Done() + }() + + p.Wait() + qwg.Wait() +} diff --git a/progress.go b/progress.go index cda4458..9aacbf2 100644 --- a/progress.go +++ b/progress.go @@ -18,6 +18,9 @@ prr = 150 * time.Millisecond // default RefreshRate ) +// DoneError represents an error when `*mpb.Progress` is done but its functionality is requested. +var DoneError = fmt.Errorf("%T instance can't be reused after it's done!", (*Progress)(nil)) + // Progress represents a container that renders one or more progress bars. type Progress struct { ctx context.Context @@ -25,6 +28,7 @@ cwg *sync.WaitGroup bwg *sync.WaitGroup operateState chan func(*pState) + interceptIo chan func(io.Writer) done chan struct{} refreshCh chan time.Time once sync.Once @@ -83,6 +87,7 @@ cwg: new(sync.WaitGroup), bwg: new(sync.WaitGroup), operateState: make(chan func(*pState)), + interceptIo: make(chan func(io.Writer)), done: make(chan struct{}), } @@ -132,7 +137,7 @@ return bar case <-p.done: p.bwg.Done() - panic(fmt.Sprintf("%T instance can't be reused after it's done!", p)) + panic(DoneError) } } @@ -178,6 +183,29 @@ } } +// Write is implementation of io.Writer. +// Writing to `*mpb.Progress` will print lines above a running bar. +// Writes aren't flushed immediatly, but at next refresh cycle. +// If Write is called after `*mpb.Progress` is done, `mpb.DoneError` +// is returned. +func (p *Progress) Write(b []byte) (int, error) { + type result struct { + n int + err error + } + ch := make(chan *result) + select { + case p.interceptIo <- func(w io.Writer) { + n, err := w.Write(b) + ch <- &result{n, err} + }: + res := <-ch + return res.n, res.err + case <-p.done: + return 0, DoneError + } +} + // Wait waits for all bars to complete and finally shutdowns container. // After this method has been called, there is no way to reuse *Progress // instance. @@ -221,6 +249,8 @@ select { case op := <-p.operateState: op(s) + case fn := <-p.interceptIo: + fn(cw) case <-p.refreshCh: render(s.debugOut) case <-s.shutdownNotifier: