Package list golang-github-go-kit-kit / f3a0f4c
removing Emitter interface and exposing Emitter struct JP Robinson 5 years ago
2 changed file(s) with 19 addition(s) and 33 deletion(s). Raw diff Collapse all Expand all
2424
2525 // Emitter will keep track of all metrics and, once started,
2626 // will emit the metrics via the Flush method to the given address.
27 type Emitter interface {
28 NewCounter(name string) metrics.Counter
29 NewHistogram(name string, min int64, max int64, sigfigs int, quantiles ...int) (metrics.Histogram, error)
30 NewTimeHistogram(name string, unit time.Duration, min int64, max int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error)
31 NewGauge(name string) metrics.Gauge
32
33 Start(reportInvterval time.Duration) error
34 Flush()
35 Stop() error
36 }
37
38 type emitter struct {
27 type Emitter struct {
3928 prefix string
4029
4130 addr string
5746 // a TCP or a UDP connection with the given address and periodically post
5847 // metrics to the connection in the Graphite plaintext protocol.
5948 // If the provided `tcp` parameter is false, a UDP connection will be used.
60 func NewEmitter(addr string, tcp bool, metricsPrefix string, logger log.Logger) Emitter {
61 return &emitter{
49 func NewEmitter(addr string, tcp bool, metricsPrefix string, logger log.Logger) *Emitter {
50 return &Emitter{
6251 addr: addr,
6352 tcp: tcp,
6453 stop: make(chan bool),
6958
7059 // NewCounter returns a Counter whose value will be periodically emitted in
7160 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
72 func (e *emitter) NewCounter(name string) metrics.Counter {
61 func (e *Emitter) NewCounter(name string) metrics.Counter {
7362 // only one flush at a time
7463 c := &counter{name, 0}
7564 e.mtx.Lock()
8776 //
8877 // The values of this histogram will be periodically emitted in a Graphite-compatible
8978 // format once the Emitter is started. Fields are ignored.
90 func (e *emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
79 func (e *Emitter) NewHistogram(name string, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.Histogram, error) {
9180 gauges := map[int]metrics.Gauge{}
9281 for _, quantile := range quantiles {
9382 if quantile <= 0 || quantile >= 100 {
10594
10695 // NewTimeHistogram returns a TimeHistogram wrapper around the windowed
10796 // HDR histrogram provided by this package.
108 func (e *emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) {
97 func (e *Emitter) NewTimeHistogram(name string, unit time.Duration, minValue, maxValue int64, sigfigs int, quantiles ...int) (metrics.TimeHistogram, error) {
10998 h, err := e.NewHistogram(name, minValue, maxValue, sigfigs, quantiles...)
11099 if err != nil {
111100 return nil, err
115104
116105 // NewGauge returns a Gauge whose value will be periodically emitted in
117106 // a Graphite-compatible format once the Emitter is started. Fields are ignored.
118 func (e *emitter) NewGauge(name string) metrics.Gauge {
107 func (e *Emitter) NewGauge(name string) metrics.Gauge {
119108 e.mtx.Lock()
120109 defer e.mtx.Unlock()
121110 return e.gauge(name)
122111 }
123112
124 func (e *emitter) gauge(name string) metrics.Gauge {
113 func (e *Emitter) gauge(name string) metrics.Gauge {
125114 g := &gauge{name, 0}
126115 e.gauges = append(e.gauges, g)
127116 return g
128117 }
129118
130 func (e *emitter) dial() error {
119 func (e *Emitter) dial() error {
131120 if e.tcp {
132121 tAddr, err := net.ResolveTCPAddr("tcp", e.addr)
133122 if err != nil {
152141
153142 // Start will kick off a background goroutine to
154143 // call Flush once every interval.
155 func (e *emitter) Start(interval time.Duration) error {
144 func (e *Emitter) Start(interval time.Duration) error {
156145 var err error
157146 e.start.Do(func() {
158147 err = e.dial()
176165
177166 // Stop will flush the current metrics and close the
178167 // current Graphite connection, if it exists.
179 func (e *emitter) Stop() error {
168 func (e *Emitter) Stop() error {
180169 if e.conn == nil {
181170 return nil
182171 }
192181 return err
193182 }
194183
195 // Flush will attempt to create a connection with the given address
196 // and write the current metrics to it in the Graphite plaintext protocol.
197 //
198 // Users can call this method on process shutdown to ensure
199 // the current metrics are pushed to Graphite.
200 func (e *emitter) Flush() { e.flush(e.conn) }
201
202 func (e *emitter) flush(conn io.Writer) {
184 // Flush will write the current metrics to the Emitter's
185 // connection in the Graphite plaintext protocol.
186 func (e *Emitter) Flush() { e.flush(e.conn) }
187
188 func (e *Emitter) flush(conn io.Writer) {
203189 // only one flush at a time
204190 e.mtx.Lock()
205191 defer e.mtx.Unlock()
2626
2727 // flush the current metrics into a buffer to examine
2828 var b bytes.Buffer
29 e.(*emitter).flush(&b)
29 e.flush(&b)
3030 teststat.AssertGraphiteNormalHistogram(t, prefix, name, mean, stdev, quantiles, b.String())
3131 }
3232
3535 prefix = "prefix"
3636 name = "m"
3737 value = 123
38 e = NewEmitter("", true, prefix, nil).(*emitter)
38 e = NewEmitter("", true, prefix, nil)
3939 b bytes.Buffer
4040 )
4141 e.NewCounter(name).With(metrics.Field{Key: "ignored", Value: "field"}).Add(uint64(value))
6161 g.Set(float64(value))
6262 g.Add(float64(delta))
6363
64 e.(*emitter).flush(&b)
64 e.flush(&b)
6565 payload := b.String()
6666
6767 want := fmt.Sprintf("%s.%s %d", prefix, name, value+delta)