Codebase list golang-github-go-kit-kit / e9fad41
Make Send/Write Loop context aware (#837) Jose Luis Ordiales Coscia authored 4 years ago Peter Bourgon committed 4 years ago
9 changed file(s) with 96 addition(s) and 53 deletion(s). Raw diff Collapse all Expand all
6767
6868 ```go
6969 import (
70 "context"
7071 "net"
7172 "os"
7273 "runtime"
8081 statsd := statsd.New("foo_svc.", log.NewNopLogger())
8182 report := time.NewTicker(5 * time.Second)
8283 defer report.Stop()
83 go statsd.SendLoop(report.C, "tcp", "statsd.internal:8125")
84 go statsd.SendLoop(context.Background(), report.C, "tcp", "statsd.internal:8125")
8485 goroutines := statsd.NewGauge("goroutine_count")
8586 go exportGoroutines(goroutines)
8687 // ...
00 package cloudwatch
11
22 import (
3 "context"
34 "fmt"
45 "os"
6 "strconv"
57 "sync"
68 "time"
79
1315 "github.com/go-kit/kit/metrics"
1416 "github.com/go-kit/kit/metrics/generic"
1517 "github.com/go-kit/kit/metrics/internal/lv"
16 "strconv"
1718 )
1819
1920 const (
135136 }
136137
137138 // WriteLoop is a helper method that invokes Send every time the passed
138 // channel fires. This method blocks until the channel is closed, so clients
139 // channel fires. This method blocks until ctx is canceled, so clients
139140 // probably want to run it in its own goroutine. For typical usage, create a
140141 // time.Ticker and pass its C channel to this method.
141 func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
142 for range c {
143 if err := cw.Send(); err != nil {
144 cw.logger.Log("during", "Send", "err", err)
142 func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
143 for {
144 select {
145 case <-c:
146 if err := cw.Send(); err != nil {
147 cw.logger.Log("during", "Send", "err", err)
148 }
149 case <-ctx.Done():
150 return
145151 }
146152 }
147153 }
22 package cloudwatch2
33
44 import (
5 "context"
56 "math"
67 "sync"
78 "time"
106107 }
107108
108109 // WriteLoop is a helper method that invokes Send every time the passed
109 // channel fires. This method blocks until the channel is closed, so clients
110 // channel fires. This method blocks until ctx is canceled, so clients
110111 // probably want to run it in its own goroutine. For typical usage, create a
111112 // time.Ticker and pass its C channel to this method.
112 func (cw *CloudWatch) WriteLoop(c <-chan time.Time) {
113 for range c {
114 if err := cw.Send(); err != nil {
115 cw.logger.Log("during", "Send", "err", err)
113 func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
114 for {
115 select {
116 case <-c:
117 if err := cw.Send(); err != nil {
118 cw.logger.Log("during", "Send", "err", err)
119 }
120 case <-ctx.Done():
121 return
116122 }
117123 }
118124 }
1010 package dogstatsd
1111
1212 import (
13 "context"
1314 "fmt"
1415 "io"
1516 "strings"
108109 }
109110
110111 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
111 // time the passed channel fires. This method blocks until the channel is
112 // closed, so clients probably want to run it in its own goroutine. For typical
112 // time the passed channel fires. This method blocks until ctx is canceled,
113 // so clients probably want to run it in its own goroutine. For typical
113114 // usage, create a time.Ticker and pass its C channel to this method.
114 func (d *Dogstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
115 for range c {
116 if _, err := d.WriteTo(w); err != nil {
117 d.logger.Log("during", "WriteTo", "err", err)
115 func (d *Dogstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
116 for {
117 select {
118 case <-c:
119 if _, err := d.WriteTo(w); err != nil {
120 d.logger.Log("during", "WriteTo", "err", err)
121 }
122 case <-ctx.Done():
123 return
118124 }
119125 }
120126 }
121127
122128 // SendLoop is a helper method that wraps WriteLoop, passing a managed
123129 // connection to the network and address. Like WriteLoop, this method blocks
124 // until the channel is closed, so clients probably want to start it in its own
130 // until ctx is canceled, so clients probably want to start it in its own
125131 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
126132 // this method.
127 func (d *Dogstatsd) SendLoop(c <-chan time.Time, network, address string) {
128 d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
133 func (d *Dogstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
134 d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
129135 }
130136
131137 // WriteTo flushes the buffered content of the metrics to the writer, in
77 package graphite
88
99 import (
10 "context"
1011 "fmt"
1112 "io"
1213 "sync"
8283 }
8384
8485 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
85 // time the passed channel fires. This method blocks until the channel is
86 // closed, so clients probably want to run it in its own goroutine. For typical
86 // time the passed channel fires. This method blocks until ctx is canceled,
87 // so clients probably want to run it in its own goroutine. For typical
8788 // usage, create a time.Ticker and pass its C channel to this method.
88 func (g *Graphite) WriteLoop(c <-chan time.Time, w io.Writer) {
89 for range c {
90 if _, err := g.WriteTo(w); err != nil {
91 g.logger.Log("during", "WriteTo", "err", err)
89 func (g *Graphite) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
90 for {
91 select {
92 case <-c:
93 if _, err := g.WriteTo(w); err != nil {
94 g.logger.Log("during", "WriteTo", "err", err)
95 }
96 case <-ctx.Done():
97 return
9298 }
9399 }
94100 }
95101
96102 // SendLoop is a helper method that wraps WriteLoop, passing a managed
97103 // connection to the network and address. Like WriteLoop, this method blocks
98 // until the channel is closed, so clients probably want to start it in its own
104 // until ctx is canceled, so clients probably want to start it in its own
99105 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
100106 // this method.
101 func (g *Graphite) SendLoop(c <-chan time.Time, network, address string) {
102 g.WriteLoop(c, conn.NewDefaultManager(network, address, g.logger))
107 func (g *Graphite) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
108 g.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, g.logger))
103109 }
104110
105111 // WriteTo flushes the buffered content of the metrics to the writer, in
33 package influx
44
55 import (
6 "context"
67 "time"
78
89 influxdb "github.com/influxdata/influxdb1-client/v2"
8788 // time the passed channel fires. This method blocks until the channel is
8889 // closed, so clients probably want to run it in its own goroutine. For typical
8990 // usage, create a time.Ticker and pass its C channel to this method.
90 func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
91 for range c {
92 if err := in.WriteTo(w); err != nil {
93 in.logger.Log("during", "WriteTo", "err", err)
91 func (in *Influx) WriteLoop(ctx context.Context, c <-chan time.Time, w BatchPointsWriter) {
92 for {
93 select {
94 case <-c:
95 if err := in.WriteTo(w); err != nil {
96 in.logger.Log("during", "WriteTo", "err", err)
97 }
98 case <-ctx.Done():
99 return
94100 }
95101 }
96102 }
1010 package influxstatsd
1111
1212 import (
13 "context"
1314 "fmt"
1415 "io"
1516 "strings"
108109 }
109110
110111 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
111 // time the passed channel fires. This method blocks until the channel is
112 // closed, so clients probably want to run it in its own goroutine. For typical
112 // time the passed channel fires. This method blocks until ctx is canceled,
113 // so clients probably want to run it in its own goroutine. For typical
113114 // usage, create a time.Ticker and pass its C channel to this method.
114 func (d *Influxstatsd) WriteLoop(c <-chan time.Time, w io.Writer) {
115 for range c {
116 if _, err := d.WriteTo(w); err != nil {
117 d.logger.Log("during", "WriteTo", "err", err)
115 func (d *Influxstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
116 for {
117 select {
118 case <-c:
119 if _, err := d.WriteTo(w); err != nil {
120 d.logger.Log("during", "WriteTo", "err", err)
121 }
122 case <-ctx.Done():
123 return
118124 }
119125 }
120126 }
121127
122128 // SendLoop is a helper method that wraps WriteLoop, passing a managed
123129 // connection to the network and address. Like WriteLoop, this method blocks
124 // until the channel is closed, so clients probably want to start it in its own
130 // until ctx is canceled, so clients probably want to start it in its own
125131 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
126132 // this method.
127 func (d *Influxstatsd) SendLoop(c <-chan time.Time, network, address string) {
128 d.WriteLoop(c, conn.NewDefaultManager(network, address, d.logger))
133 func (d *Influxstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
134 d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
129135 }
130136
131137 // WriteTo flushes the buffered content of the metrics to the writer, in
1313 // case "statsd":
1414 // s := statsd.New(...)
1515 // t := time.NewTicker(5*time.Second)
16 // go s.SendLoop(t.C, "tcp", "statsd.local:8125")
16 // go s.SendLoop(ctx, t.C, "tcp", "statsd.local:8125")
1717 // latency = s.NewHistogram(...)
1818 // requests = s.NewCounter(...)
1919 // default:
88 package statsd
99
1010 import (
11 "context"
1112 "fmt"
1213 "io"
1314 "time"
8889 }
8990
9091 // WriteLoop is a helper method that invokes WriteTo to the passed writer every
91 // time the passed channel fires. This method blocks until the channel is
92 // closed, so clients probably want to run it in its own goroutine. For typical
92 // time the passed channel fires. This method blocks until ctx is canceled,
93 // so clients probably want to run it in its own goroutine. For typical
9394 // usage, create a time.Ticker and pass its C channel to this method.
94 func (s *Statsd) WriteLoop(c <-chan time.Time, w io.Writer) {
95 for range c {
96 if _, err := s.WriteTo(w); err != nil {
97 s.logger.Log("during", "WriteTo", "err", err)
95 func (s *Statsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
96 for {
97 select {
98 case <-c:
99 if _, err := s.WriteTo(w); err != nil {
100 s.logger.Log("during", "WriteTo", "err", err)
101 }
102 case <-ctx.Done():
103 return
98104 }
99105 }
100106 }
101107
102108 // SendLoop is a helper method that wraps WriteLoop, passing a managed
103109 // connection to the network and address. Like WriteLoop, this method blocks
104 // until the channel is closed, so clients probably want to start it in its own
110 // until ctx is canceled, so clients probably want to start it in its own
105111 // goroutine. For typical usage, create a time.Ticker and pass its C channel to
106112 // this method.
107 func (s *Statsd) SendLoop(c <-chan time.Time, network, address string) {
108 s.WriteLoop(c, conn.NewDefaultManager(network, address, s.logger))
113 func (s *Statsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
114 s.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, s.logger))
109115 }
110116
111117 // WriteTo flushes the buffered content of the metrics to the writer, in