diff --git a/metrics/README.md b/metrics/README.md index fd3347e..5aa791a 100644 --- a/metrics/README.md +++ b/metrics/README.md @@ -68,6 +68,7 @@ ```go import ( + "context" "net" "os" "runtime" @@ -81,7 +82,7 @@ statsd := statsd.New("foo_svc.", log.NewNopLogger()) report := time.NewTicker(5 * time.Second) defer report.Stop() - go statsd.SendLoop(report.C, "tcp", "statsd.internal:8125") + go statsd.SendLoop(context.Background(), report.C, "tcp", "statsd.internal:8125") goroutines := statsd.NewGauge("goroutine_count") go exportGoroutines(goroutines) // ... diff --git a/metrics/cloudwatch/cloudwatch.go b/metrics/cloudwatch/cloudwatch.go index 4322d4c..4bda287 100644 --- a/metrics/cloudwatch/cloudwatch.go +++ b/metrics/cloudwatch/cloudwatch.go @@ -1,8 +1,10 @@ package cloudwatch import ( + "context" "fmt" "os" + "strconv" "sync" "time" @@ -14,7 +16,6 @@ "github.com/go-kit/kit/metrics" "github.com/go-kit/kit/metrics/generic" "github.com/go-kit/kit/metrics/internal/lv" - "strconv" ) const ( @@ -136,13 +137,18 @@ } // WriteLoop is a helper method that invokes Send every time the passed -// channel fires. This method blocks until the channel is closed, so clients +// channel fires. This method blocks until ctx is canceled, so clients // probably want to run it in its own goroutine. For typical usage, create a // time.Ticker and pass its C channel to this method. -func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { - for range c { - if err := cw.Send(); err != nil { - cw.logger.Log("during", "Send", "err", err) +func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) { + for { + select { + case <-c: + if err := cw.Send(); err != nil { + cw.logger.Log("during", "Send", "err", err) + } + case <-ctx.Done(): + return } } } diff --git a/metrics/cloudwatch2/cloudwatch2.go b/metrics/cloudwatch2/cloudwatch2.go index 5204764..03bf68d 100644 --- a/metrics/cloudwatch2/cloudwatch2.go +++ b/metrics/cloudwatch2/cloudwatch2.go @@ -3,6 +3,7 @@ package cloudwatch2 import ( + "context" "math" "sync" "time" @@ -107,13 +108,18 @@ } // WriteLoop is a helper method that invokes Send every time the passed -// channel fires. This method blocks until the channel is closed, so clients +// channel fires. This method blocks until ctx is canceled, so clients // probably want to run it in its own goroutine. For typical usage, create a // time.Ticker and pass its C channel to this method. -func (cw *CloudWatch) WriteLoop(c <-chan time.Time) { - for range c { - if err := cw.Send(); err != nil { - cw.logger.Log("during", "Send", "err", err) +func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) { + for { + select { + case <-c: + if err := cw.Send(); err != nil { + cw.logger.Log("during", "Send", "err", err) + } + case <-ctx.Done(): + return } } } diff --git a/metrics/dogstatsd/dogstatsd.go b/metrics/dogstatsd/dogstatsd.go index ccdcd57..bb2f003 100644 --- a/metrics/dogstatsd/dogstatsd.go +++ b/metrics/dogstatsd/dogstatsd.go @@ -11,6 +11,7 @@ package dogstatsd import ( + "context" "fmt" "io" "strings" @@ -109,24 +110,29 @@ } // WriteLoop is a helper method that invokes WriteTo to the passed writer every -// time the passed channel fires. This method blocks until the channel is -// closed, so clients probably want to run it in its own goroutine. For typical +// time the passed channel fires. This method blocks until ctx is canceled, +// so clients probably want to run it in its own goroutine. For typical // usage, create a time.Ticker and pass its C channel to this method. -func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) { - for range c { - if _, err := d.WriteTo(w); err != nil { - d.logger.Log("during", "WriteTo", "err", err) +func (d *Dogstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) { + for { + select { + case <-c: + if _, err := d.WriteTo(w); err != nil { + d.logger.Log("during", "WriteTo", "err", err) + } + case <-ctx.Done(): + return } } } // SendLoop is a helper method that wraps WriteLoop, passing a managed // connection to the network and address. Like WriteLoop, this method blocks -// until the channel is closed, so clients probably want to start it in its own +// until ctx is canceled, so clients probably want to start it in its own // goroutine. For typical usage, create a time.Ticker and pass its C channel to // this method. -func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) { - d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger)) +func (d *Dogstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) { + d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger)) } // WriteTo flushes the buffered content of the metrics to the writer, in diff --git a/metrics/graphite/graphite.go b/metrics/graphite/graphite.go index 9814e1f..ddb05c2 100644 --- a/metrics/graphite/graphite.go +++ b/metrics/graphite/graphite.go @@ -8,6 +8,7 @@ package graphite import ( + "context" "fmt" "io" "sync" @@ -83,24 +84,29 @@ } // WriteLoop is a helper method that invokes WriteTo to the passed writer every -// time the passed channel fires. This method blocks until the channel is -// closed, so clients probably want to run it in its own goroutine. For typical +// time the passed channel fires. This method blocks until ctx is canceled, +// so clients probably want to run it in its own goroutine. For typical // usage, create a time.Ticker and pass its C channel to this method. -func (g *Graphite) WriteLoop(c <-chan time.Time, w io.Writer) { - for range c { - if _, err := g.WriteTo(w); err != nil { - g.logger.Log("during", "WriteTo", "err", err) +func (g *Graphite) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) { + for { + select { + case <-c: + if _, err := g.WriteTo(w); err != nil { + g.logger.Log("during", "WriteTo", "err", err) + } + case <-ctx.Done(): + return } } } // SendLoop is a helper method that wraps WriteLoop, passing a managed // connection to the network and address. Like WriteLoop, this method blocks -// until the channel is closed, so clients probably want to start it in its own +// until ctx is canceled, so clients probably want to start it in its own // goroutine. For typical usage, create a time.Ticker and pass its C channel to // this method. -func (g *Graphite) SendLoop(c <-chan time.Time, network, address string) { - g.WriteLoop(c, conn.NewDefaultManager(network, address, g.logger)) +func (g *Graphite) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) { + g.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, g.logger)) } // WriteTo flushes the buffered content of the metrics to the writer, in diff --git a/metrics/influx/influx.go b/metrics/influx/influx.go index f902170..a73ffd7 100644 --- a/metrics/influx/influx.go +++ b/metrics/influx/influx.go @@ -4,6 +4,7 @@ package influx import ( + "context" "time" influxdb "github.com/influxdata/influxdb1-client/v2" @@ -88,10 +89,15 @@ // time the passed channel fires. This method blocks until the channel is // closed, so clients probably want to run it in its own goroutine. For typical // usage, create a time.Ticker and pass its C channel to this method. -func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) { - for range c { - if err := in.WriteTo(w); err != nil { - in.logger.Log("during", "WriteTo", "err", err) +func (in *Influx) WriteLoop(ctx context.Context, c <-chan time.Time, w BatchPointsWriter) { + for { + select { + case <-c: + if err := in.WriteTo(w); err != nil { + in.logger.Log("during", "WriteTo", "err", err) + } + case <-ctx.Done(): + return } } } diff --git a/metrics/influxstatsd/influxstatsd.go b/metrics/influxstatsd/influxstatsd.go index a036345..2efe068 100644 --- a/metrics/influxstatsd/influxstatsd.go +++ b/metrics/influxstatsd/influxstatsd.go @@ -11,6 +11,7 @@ package influxstatsd import ( + "context" "fmt" "io" "strings" @@ -109,24 +110,29 @@ } // WriteLoop is a helper method that invokes WriteTo to the passed writer every -// time the passed channel fires. This method blocks until the channel is -// closed, so clients probably want to run it in its own goroutine. For typical +// time the passed channel fires. This method blocks until ctx is canceled, +// so clients probably want to run it in its own goroutine. For typical // usage, create a time.Ticker and pass its C channel to this method. -func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) { - for range c { - if _, err := d.WriteTo(w); err != nil { - d.logger.Log("during", "WriteTo", "err", err) +func (d *Influxstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) { + for { + select { + case <-c: + if _, err := d.WriteTo(w); err != nil { + d.logger.Log("during", "WriteTo", "err", err) + } + case <-ctx.Done(): + return } } } // SendLoop is a helper method that wraps WriteLoop, passing a managed // connection to the network and address. Like WriteLoop, this method blocks -// until the channel is closed, so clients probably want to start it in its own +// until ctx is canceled, so clients probably want to start it in its own // goroutine. For typical usage, create a time.Ticker and pass its C channel to // this method. -func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) { - d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger)) +func (d *Influxstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) { + d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger)) } // WriteTo flushes the buffered content of the metrics to the writer, in diff --git a/metrics/provider/provider.go b/metrics/provider/provider.go index d35d66a..ea1c99a 100644 --- a/metrics/provider/provider.go +++ b/metrics/provider/provider.go @@ -14,7 +14,7 @@ // case "statsd": // s := statsd.New(...) // t := time.NewTicker(5*time.Second) -// go s.SendLoop(t.C, "tcp", "statsd.local:8125") +// go s.SendLoop(ctx, t.C, "tcp", "statsd.local:8125") // latency = s.NewHistogram(...) // requests = s.NewCounter(...) // default: diff --git a/metrics/statsd/statsd.go b/metrics/statsd/statsd.go index 8dfbf6f..d2cfbc7 100644 --- a/metrics/statsd/statsd.go +++ b/metrics/statsd/statsd.go @@ -9,6 +9,7 @@ package statsd import ( + "context" "fmt" "io" "time" @@ -89,24 +90,29 @@ } // WriteLoop is a helper method that invokes WriteTo to the passed writer every -// time the passed channel fires. This method blocks until the channel is -// closed, so clients probably want to run it in its own goroutine. For typical +// time the passed channel fires. This method blocks until ctx is canceled, +// so clients probably want to run it in its own goroutine. For typical // usage, create a time.Ticker and pass its C channel to this method. -func (s *Statsd) WriteLoop(c <-chan time.Time, w io.Writer) { - for range c { - if _, err := s.WriteTo(w); err != nil { - s.logger.Log("during", "WriteTo", "err", err) +func (s *Statsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) { + for { + select { + case <-c: + if _, err := s.WriteTo(w); err != nil { + s.logger.Log("during", "WriteTo", "err", err) + } + case <-ctx.Done(): + return } } } // SendLoop is a helper method that wraps WriteLoop, passing a managed // connection to the network and address. Like WriteLoop, this method blocks -// until the channel is closed, so clients probably want to start it in its own +// until ctx is canceled, so clients probably want to start it in its own // goroutine. For typical usage, create a time.Ticker and pass its C channel to // this method. -func (s *Statsd) SendLoop(c <-chan time.Time, network, address string) { - s.WriteLoop(c, conn.NewDefaultManager(network, address, s.logger)) +func (s *Statsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) { + s.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, s.logger)) } // WriteTo flushes the buffered content of the metrics to the writer, in