adding error checking on flush with exponential back off
JP Robinson
7 years ago
174 | 174 | return err |
175 | 175 | } |
176 | 176 | |
177 | var ( | |
178 | RetryMax = 10 | |
179 | RetryWait = 2 * time.Millisecond | |
180 | RetryMultiplier = 2 | |
181 | ) | |
182 | ||
177 | 183 | // Flush will write the current metrics to the Emitter's |
178 | 184 | // connection in the Graphite plaintext protocol. |
179 | func (e *Emitter) Flush() { e.flush(e.conn) } | |
180 | ||
181 | func (e *Emitter) flush(conn io.Writer) { | |
185 | func (e *Emitter) Flush() error { | |
182 | 186 | // only one flush at a time |
183 | 187 | e.mtx.Lock() |
184 | 188 | defer e.mtx.Unlock() |
185 | 189 | |
190 | // set the system up to perform a retry loop | |
191 | var err error | |
192 | wait := RetryWait | |
193 | for i := 1; i <= RetryMax; i++ { | |
194 | err = e.flush(e.conn) | |
195 | if err == nil { | |
196 | return nil | |
197 | } | |
198 | e.logger.Log( | |
199 | "err", err, | |
200 | "msg", fmt.Sprintf("unable to flush metrics on attempt %d, waiting %s", i, wait), | |
201 | ) | |
202 | time.Sleep(wait) | |
203 | wait = wait * time.Duration(RetryMultiplier) | |
204 | } | |
205 | // log if we were unable to emit metrics | |
206 | if err != nil { | |
207 | e.logger.Log( | |
208 | "err", err, | |
209 | "msg", fmt.Sprintf("unable to flush metrics after %d attempts. giving up.", RetryMax), | |
210 | ) | |
211 | } | |
212 | return err | |
213 | } | |
214 | ||
215 | func (e *Emitter) flush(conn io.Writer) error { | |
216 | ||
186 | 217 | // buffer the writer and make sure to flush it |
187 | 218 | w := bufio.NewWriter(conn) |
188 | defer w.Flush() | |
189 | 219 | |
190 | 220 | // emit counter stats |
191 | 221 | for _, c := range e.counters { |
207 | 237 | for _, g := range e.gauges { |
208 | 238 | fmt.Fprintf(w, "%s.%s %.2f %d\n", e.prefix, g.Name(), g.Get(), time.Now().Unix()) |
209 | 239 | } |
240 | ||
241 | // check for error | |
242 | return w.Flush() | |
210 | 243 | } |
211 | 244 | |
212 | 245 | type counter struct { |