Codebase list golang-github-influxdata-line-protocol / 5e9704c
New upstream version 0.0~git20181118.934b9e6 Alexandre Viau 5 years ago
8 changed file(s) with 977 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 version: 2
1 jobs:
2 build:
3 docker:
4 # specify the version
5 - image: circleci/golang:1.10.2
6
7 working_directory: /go/src/github.com/influxdata/line-protocol
8 steps:
9 - checkout
10
11 - run: go get -v -t -d ./...
12 - run: go get honnef.co/go/tools/cmd/megacheck
13 - run: go vet -v ./...
14 - run: go test -v ./...
15 - run: megacheck ./...
0 # Test binary, build with `go test -c`
1 *.test
2
3 # Output of the go coverage tool, specifically when used with LiteIDE
4 *.out
0 The MIT License (MIT)
1
2 Copyright (c) 2013-2018 InfluxData Inc.
3
4 Permission is hereby granted, free of charge, to any person obtaining a copy of
5 this software and associated documentation files (the "Software"), to deal in
6 the Software without restriction, including without limitation the rights to
7 use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8 the Software, and to permit persons to whom the Software is furnished to do so,
9 subject to the following conditions:
10
11 The above copyright notice and this permission notice shall be included in all
12 copies or substantial portions of the Software.
13
14 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16 FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17 COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18 IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19 CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
0 # line-protocol
0 package protocol
1
2 import (
3 "fmt"
4 "io"
5 "math"
6 "sort"
7 "strconv"
8 )
9
10 // Encoder marshals Metrics into influxdb line protocol.
11 // It is not safe for concurrent use, make a new one!
12 // The default behavior when encountering a field error is to ignore the field and move on.
13 // If you wish it to error out on field errors, use Encoder.FailOnFieldErr(true)
14 type Encoder struct {
15 w io.Writer
16 fieldSortOrder FieldSortOrder
17 fieldTypeSupport FieldTypeSupport
18 failOnFieldError bool
19 maxLineBytes int
20 fieldList []*Field
21 header []byte
22 footer []byte
23 pair []byte
24 }
25
26 // SetMaxLineBytes sets a maximum length for a line, Encode will error if the generated line is longer
27 func (e *Encoder) SetMaxLineBytes(i int) {
28 e.maxLineBytes = i
29 }
30
31 // SetFieldSortOrder sets a sort order for the data.
32 // The options are:
33 // NoSortFields (doesn't sort the fields)
34 // SortFields (sorts the keys in alphabetical order)
35 func (e *Encoder) SetFieldSortOrder(s FieldSortOrder) {
36 e.fieldSortOrder = s
37 }
38
39 // SetFieldTypeSupport sets flags for if the encoder supports certain optional field types such as uint64
40 func (e *Encoder) SetFieldTypeSupport(s FieldTypeSupport) {
41 e.fieldTypeSupport = s
42 }
43
44 // FailOnFieldErr whether or not to fail on a field error or just move on.
45 // The default behavior to move on
46 func (e *Encoder) FailOnFieldErr(s bool) {
47 e.failOnFieldError = s
48 }
49
50 // NewEncoder gives us an encoder that marshals to a writer in influxdb line protocol
51 // as defined by:
52 // https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/
53 func NewEncoder(w io.Writer) *Encoder {
54 return &Encoder{
55 w: w,
56 header: make([]byte, 0, 128),
57 footer: make([]byte, 0, 128),
58 pair: make([]byte, 0, 128),
59 fieldList: make([]*Field, 0, 16),
60 }
61 }
62
63 // This is here to significantly reduce allocations, wish that we had constant/immutable keyword that applied to
64 // more complex objects
65 var comma = []byte(",")
66
67 // Encode marshals a Metric to the io.Writer in the Encoder
68 func (e *Encoder) Encode(m Metric) (int, error) {
69 err := e.buildHeader(m)
70 if err != nil {
71 return 0, err
72 }
73
74 e.buildFooter(m)
75
76 // here we make a copy of the fields so we can do an in-place sort
77 e.fieldList = append(e.fieldList[:0], m.FieldList()...)
78
79 if e.fieldSortOrder == SortFields {
80 sort.Slice(e.fieldList, func(i, j int) bool {
81 return e.fieldList[i].Key < e.fieldList[j].Key
82 })
83 }
84 i := 0
85 totalWritten := 0
86 pairsLen := 0
87 firstField := true
88 for _, field := range e.fieldList {
89 err = e.buildFieldPair(field.Key, field.Value)
90 if err != nil {
91 if e.failOnFieldError {
92 return 0, err
93 }
94 continue
95 }
96
97 bytesNeeded := len(e.header) + pairsLen + len(e.pair) + len(e.footer)
98
99 // Additional length needed for field separator `,`
100 if !firstField {
101 bytesNeeded++
102 }
103
104 if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes {
105 // Need at least one field per line
106 if firstField {
107 return 0, ErrNeedMoreSpace
108 }
109
110 i, err = e.w.Write(e.footer)
111 if err != nil {
112 return 0, err
113 }
114 totalWritten += i
115
116 bytesNeeded = len(e.header) + len(e.pair) + len(e.footer)
117
118 if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes {
119 return 0, ErrNeedMoreSpace
120 }
121
122 i, err = e.w.Write(e.header)
123 if err != nil {
124 return 0, err
125 }
126 totalWritten += i
127
128 i, err = e.w.Write(e.pair)
129 if err != nil {
130 return 0, err
131 }
132 totalWritten += i
133
134 pairsLen += len(e.pair)
135 firstField = false
136 continue
137 }
138
139 if firstField {
140 i, err = e.w.Write(e.header)
141 if err != nil {
142 return 0, err
143 }
144 totalWritten += i
145
146 } else {
147 i, err = e.w.Write(comma)
148 if err != nil {
149 return 0, err
150 }
151 totalWritten += i
152
153 }
154
155 e.w.Write(e.pair)
156
157 pairsLen += len(e.pair)
158 firstField = false
159 }
160
161 if firstField {
162 return 0, ErrNoFields
163 }
164 i, err = e.w.Write(e.footer)
165 if err != nil {
166 return 0, err
167 }
168 totalWritten += i
169 return totalWritten, nil
170
171 }
172
173 func (e *Encoder) buildHeader(m Metric) error {
174 e.header = e.header[:0]
175 name := nameEscape(m.Name())
176 if name == "" {
177 return ErrInvalidName
178 }
179 e.header = append(e.header, name...)
180
181 for _, tag := range m.TagList() {
182 key := escape(tag.Key)
183 value := escape(tag.Value)
184
185 // Some keys and values are not encodeable as line protocol, such as
186 // those with a trailing '\' or empty strings.
187 if key == "" || value == "" {
188 continue
189 }
190
191 e.header = append(e.header, ',')
192 e.header = append(e.header, key...)
193 e.header = append(e.header, '=')
194 e.header = append(e.header, value...)
195 }
196
197 e.header = append(e.header, ' ')
198 return nil
199 }
200
201 func (e *Encoder) buildFieldPair(key string, value interface{}) error {
202 e.pair = e.pair[:0]
203 key = escape(key)
204 // Some keys are not encodeable as line protocol, such as those with a
205 // trailing '\' or empty strings.
206 if key == "" {
207 return &FieldError{"invalid field key"}
208 }
209 e.pair = append(e.pair, key...)
210 e.pair = append(e.pair, '=')
211 switch v := value.(type) {
212 case uint64:
213 if e.fieldTypeSupport&UintSupport != 0 {
214 e.pair = append(strconv.AppendUint(e.pair, v, 10), 'u')
215 } else if v <= uint64(math.MaxInt64) {
216 e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i')
217 } else {
218 e.pair = append(strconv.AppendInt(e.pair, math.MaxInt64, 10), 'i')
219 }
220 case int64:
221 e.pair = append(strconv.AppendInt(e.pair, v, 10), 'i')
222 case int:
223 e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i')
224 case float64:
225 if math.IsNaN(v) {
226 return &FieldError{"is NaN"}
227 }
228
229 if math.IsInf(v, 0) {
230 return &FieldError{"is Inf"}
231 }
232
233 e.pair = strconv.AppendFloat(e.pair, v, 'f', -1, 64)
234 case float32:
235 v32 := float64(v)
236 if math.IsNaN(v32) {
237 return &FieldError{"is NaN"}
238 }
239
240 if math.IsInf(v32, 0) {
241 return &FieldError{"is Inf"}
242 }
243
244 e.pair = strconv.AppendFloat(e.pair, v32, 'f', -1, 64)
245
246 case string:
247 e.pair = append(e.pair, '"')
248 e.pair = append(e.pair, stringFieldEscape(v)...)
249 e.pair = append(e.pair, '"')
250 case bool:
251 e.pair = strconv.AppendBool(e.pair, v)
252 default:
253 return &FieldError{fmt.Sprintf("invalid value type: %T", v)}
254 }
255 return nil
256 }
257
258 func (e *Encoder) buildFooter(m Metric) {
259 e.footer = e.footer[:0]
260 e.footer = append(e.footer, ' ')
261 e.footer = strconv.AppendInt(e.footer, m.Time().UnixNano(), 10)
262 e.footer = append(e.footer, '\n')
263 }
0 package protocol
1
2 import (
3 "bytes"
4 "math"
5 "sort"
6 "testing"
7 "time"
8
9 "github.com/stretchr/testify/require"
10 )
11
12 type MockMetric struct {
13 name string
14 tags []*Tag
15 fields []*Field
16 t time.Time
17 }
18
19 func (m *MockMetric) Name() string {
20 return m.name
21 }
22 func (m *MockMetric) TagList() []*Tag {
23 return m.tags
24 }
25
26 func (m *MockMetric) FieldList() []*Field {
27 return m.fields
28 }
29
30 func (m *MockMetric) Time() time.Time {
31 return m.t
32 }
33
34 func (m *MockMetric) AddTag(k, v string) {
35 for i, tag := range m.tags {
36 if k == tag.Key {
37 m.tags[i].Value = v
38 return
39 }
40 }
41 m.tags = append(m.tags, &Tag{Key: k, Value: v})
42 }
43
44 func (m *MockMetric) AddField(k string, v interface{}) {
45 for i, field := range m.fields {
46 if k == field.Key {
47 m.fields[i].Value = v
48 return
49 }
50 }
51 m.fields = append(m.fields, &Field{Key: k, Value: convertField(v)})
52 }
53
54 func convertField(v interface{}) interface{} {
55 switch v := v.(type) {
56 case float64:
57 return v
58 case int64:
59 return v
60 case string:
61 return v
62 case bool:
63 return v
64 case int:
65 return int64(v)
66 case uint:
67 return uint64(v)
68 case uint64:
69 return uint64(v)
70 case []byte:
71 return string(v)
72 case int32:
73 return int64(v)
74 case int16:
75 return int64(v)
76 case int8:
77 return int64(v)
78 case uint32:
79 return uint64(v)
80 case uint16:
81 return uint64(v)
82 case uint8:
83 return uint64(v)
84 case float32:
85 return float64(v)
86 default:
87 return nil
88 }
89 }
90
91 func NewMockMetric(name string,
92 tags map[string]string,
93 fields map[string]interface{},
94 tm time.Time,
95 ) Metric {
96 // var vtype telegraf.ValueType
97 // if len(tp) > 0 {
98 // vtype = tp[0]
99 // } else {
100 // vtype = telegraf.Untyped
101 // }
102 m := &MockMetric{
103 name: name,
104 tags: nil,
105 fields: nil,
106 t: tm,
107 }
108
109 if len(tags) > 0 {
110 m.tags = make([]*Tag, 0, len(tags))
111 for k, v := range tags {
112 m.tags = append(m.tags,
113 &Tag{Key: k, Value: v})
114 }
115 sort.Slice(m.tags, func(i, j int) bool { return m.tags[i].Key < m.tags[j].Key })
116 }
117
118 m.fields = make([]*Field, 0, len(fields))
119 for k, v := range fields {
120 v := convertField(v)
121 if v == nil {
122 continue
123 }
124 m.AddField(k, v)
125 }
126
127 return m
128 }
129
130 var tests = []struct {
131 name string
132 maxBytes int
133 typeSupport FieldTypeSupport
134 input Metric
135 output []byte
136 failOnFieldErr bool
137 err error
138 }{
139 {
140 name: "minimal",
141 input: NewMockMetric(
142 "cpu",
143 map[string]string{},
144 map[string]interface{}{
145 "value": 42.0,
146 },
147 time.Unix(0, 0),
148 ),
149 output: []byte("cpu value=42 0\n"),
150 },
151 {
152 name: "multiple tags",
153 input: NewMockMetric(
154 "cpu",
155 map[string]string{
156 "host": "localhost",
157 "cpu": "CPU0",
158 },
159 map[string]interface{}{
160 "value": 42.0,
161 },
162 time.Unix(0, 0),
163 ),
164 output: []byte("cpu,cpu=CPU0,host=localhost value=42 0\n"),
165 },
166 {
167 name: "multiple fields",
168 input: NewMockMetric(
169 "cpu",
170 map[string]string{},
171 map[string]interface{}{
172 "x": 42.0,
173 "y": 42.0,
174 },
175 time.Unix(0, 0),
176 ),
177 output: []byte("cpu x=42,y=42 0\n"),
178 },
179 {
180 name: "float NaN",
181 input: NewMockMetric(
182 "cpu",
183 map[string]string{},
184 map[string]interface{}{
185 "x": math.NaN(),
186 "y": 42,
187 },
188 time.Unix(0, 0),
189 ),
190 output: []byte("cpu y=42i 0\n"),
191 },
192 {
193 name: "float NaN only",
194 input: NewMockMetric(
195 "cpu",
196 map[string]string{},
197 map[string]interface{}{
198 "value": math.NaN(),
199 },
200 time.Unix(0, 0),
201 ),
202 err: ErrNoFields,
203 },
204 {
205 name: "float Inf",
206 input: NewMockMetric(
207 "cpu",
208 map[string]string{},
209 map[string]interface{}{
210 "value": math.Inf(1),
211 "y": 42,
212 },
213 time.Unix(0, 0),
214 ),
215 output: []byte("cpu y=42i 0\n"),
216 },
217 {
218 name: "integer field",
219 input: NewMockMetric(
220 "cpu",
221 map[string]string{},
222 map[string]interface{}{
223 "value": 42,
224 },
225 time.Unix(0, 0),
226 ),
227 output: []byte("cpu value=42i 0\n"),
228 },
229 {
230 name: "integer field 64-bit",
231 input: NewMockMetric(
232 "cpu",
233 map[string]string{},
234 map[string]interface{}{
235 "value": int64(123456789012345),
236 },
237 time.Unix(0, 0),
238 ),
239 output: []byte("cpu value=123456789012345i 0\n"),
240 },
241 {
242 name: "uint field",
243 input: NewMockMetric(
244 "cpu",
245 map[string]string{},
246 map[string]interface{}{
247 "value": uint64(42),
248 },
249 time.Unix(0, 0),
250 ),
251 output: []byte("cpu value=42u 0\n"),
252 typeSupport: UintSupport,
253 },
254 {
255 name: "uint field max value",
256 input: NewMockMetric(
257 "cpu",
258 map[string]string{},
259 map[string]interface{}{
260 "value": uint64(18446744073709551615),
261 },
262 time.Unix(0, 0),
263 ),
264 output: []byte("cpu value=18446744073709551615u 0\n"),
265 typeSupport: UintSupport,
266 },
267 {
268 name: "uint field no uint support",
269 input: NewMockMetric(
270 "cpu",
271 map[string]string{},
272 map[string]interface{}{
273 "value": uint64(42),
274 },
275 time.Unix(0, 0),
276 ),
277 output: []byte("cpu value=42i 0\n"),
278 },
279 {
280 name: "uint field no uint support overflow",
281 input: NewMockMetric(
282 "cpu",
283 map[string]string{},
284 map[string]interface{}{
285 "value": uint64(18446744073709551615),
286 },
287 time.Unix(0, 0),
288 ),
289 output: []byte("cpu value=9223372036854775807i 0\n"),
290 },
291 {
292 name: "bool field",
293 input: NewMockMetric(
294 "cpu",
295 map[string]string{},
296 map[string]interface{}{
297 "value": true,
298 },
299 time.Unix(0, 0),
300 ),
301 output: []byte("cpu value=true 0\n"),
302 },
303 {
304 name: "string field",
305 input: NewMockMetric(
306 "cpu",
307 map[string]string{},
308 map[string]interface{}{
309 "value": "howdy",
310 },
311 time.Unix(0, 0),
312 ),
313 output: []byte("cpu value=\"howdy\" 0\n"),
314 },
315 {
316 name: "timestamp",
317 input: NewMockMetric(
318 "cpu",
319 map[string]string{},
320 map[string]interface{}{
321 "value": 42.0,
322 },
323 time.Unix(1519194109, 42),
324 ),
325 output: []byte("cpu value=42 1519194109000000042\n"),
326 },
327 {
328 name: "split fields exact",
329 maxBytes: 33,
330 input: NewMockMetric(
331 "cpu",
332 map[string]string{},
333 map[string]interface{}{
334 "abc": 123,
335 "def": 456,
336 },
337 time.Unix(1519194109, 42),
338 ),
339 output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"),
340 },
341 {
342 name: "split fields extra",
343 maxBytes: 34,
344 input: NewMockMetric(
345 "cpu",
346 map[string]string{},
347 map[string]interface{}{
348 "abc": 123,
349 "def": 456,
350 },
351 time.Unix(1519194109, 42),
352 ),
353 output: []byte("cpu abc=123i 1519194109000000042\ncpu def=456i 1519194109000000042\n"),
354 },
355 {
356 name: "name newline",
357 input: NewMockMetric(
358 "c\npu",
359 map[string]string{},
360 map[string]interface{}{
361 "value": 42,
362 },
363 time.Unix(0, 0),
364 ),
365 output: []byte("c\\npu value=42i 0\n"),
366 },
367 {
368 name: "tag newline",
369 input: NewMockMetric(
370 "cpu",
371 map[string]string{
372 "host": "x\ny",
373 },
374 map[string]interface{}{
375 "value": 42,
376 },
377 time.Unix(0, 0),
378 ),
379 output: []byte("cpu,host=x\\ny value=42i 0\n"),
380 },
381 {
382 name: "string newline",
383 input: NewMockMetric(
384 "cpu",
385 map[string]string{},
386 map[string]interface{}{
387 "value": "x\ny",
388 },
389 time.Unix(0, 0),
390 ),
391 output: []byte("cpu value=\"x\\ny\" 0\n"),
392 },
393 {
394 name: "need more space",
395 maxBytes: 32,
396 input: NewMockMetric(
397 "cpu",
398 map[string]string{},
399 map[string]interface{}{
400 "abc": 123,
401 "def": 456,
402 },
403 time.Unix(1519194109, 42),
404 ),
405 output: nil,
406 err: ErrNeedMoreSpace,
407 },
408 {
409 name: "no fields",
410 input: NewMockMetric(
411 "cpu",
412 map[string]string{},
413 map[string]interface{}{},
414 time.Unix(0, 0),
415 ),
416 err: ErrNoFields,
417 },
418 {
419 name: "procstat",
420 input: NewMockMetric(
421 "procstat",
422 map[string]string{
423 "exe": "bash",
424 "process_name": "bash",
425 },
426 map[string]interface{}{
427 "cpu_time": 0,
428 "cpu_time_guest": float64(0),
429 "cpu_time_guest_nice": float64(0),
430 "cpu_time_idle": float64(0),
431 "cpu_time_iowait": float64(0),
432 "cpu_time_irq": float64(0),
433 "cpu_time_nice": float64(0),
434 "cpu_time_soft_irq": float64(0),
435 "cpu_time_steal": float64(0),
436 "cpu_time_stolen": float64(0),
437 "cpu_time_system": float64(0),
438 "cpu_time_user": float64(0.02),
439 "cpu_usage": float64(0),
440 "involuntary_context_switches": 2,
441 "memory_data": 1576960,
442 "memory_locked": 0,
443 "memory_rss": 5103616,
444 "memory_stack": 139264,
445 "memory_swap": 0,
446 "memory_vms": 21659648,
447 "nice_priority": 20,
448 "num_fds": 4,
449 "num_threads": 1,
450 "pid": 29417,
451 "read_bytes": 0,
452 "read_count": 259,
453 "realtime_priority": 0,
454 "rlimit_cpu_time_hard": 2147483647,
455 "rlimit_cpu_time_soft": 2147483647,
456 "rlimit_file_locks_hard": 2147483647,
457 "rlimit_file_locks_soft": 2147483647,
458 "rlimit_memory_data_hard": 2147483647,
459 "rlimit_memory_data_soft": 2147483647,
460 "rlimit_memory_locked_hard": 65536,
461 "rlimit_memory_locked_soft": 65536,
462 "rlimit_memory_rss_hard": 2147483647,
463 "rlimit_memory_rss_soft": 2147483647,
464 "rlimit_memory_stack_hard": 2147483647,
465 "rlimit_memory_stack_soft": 8388608,
466 "rlimit_memory_vms_hard": 2147483647,
467 "rlimit_memory_vms_soft": 2147483647,
468 "rlimit_nice_priority_hard": 0,
469 "rlimit_nice_priority_soft": 0,
470 "rlimit_num_fds_hard": 4096,
471 "rlimit_num_fds_soft": 1024,
472 "rlimit_realtime_priority_hard": 0,
473 "rlimit_realtime_priority_soft": 0,
474 "rlimit_signals_pending_hard": 78994,
475 "rlimit_signals_pending_soft": 78994,
476 "signals_pending": 0,
477 "voluntary_context_switches": 42,
478 "write_bytes": 106496,
479 "write_count": 35,
480 },
481 time.Unix(0, 1517620624000000000),
482 ),
483 output: []byte("procstat,exe=bash,process_name=bash cpu_time=0i,cpu_time_guest=0,cpu_time_guest_nice=0,cpu_time_idle=0,cpu_time_iowait=0,cpu_time_irq=0,cpu_time_nice=0,cpu_time_soft_irq=0,cpu_time_steal=0,cpu_time_stolen=0,cpu_time_system=0,cpu_time_user=0.02,cpu_usage=0,involuntary_context_switches=2i,memory_data=1576960i,memory_locked=0i,memory_rss=5103616i,memory_stack=139264i,memory_swap=0i,memory_vms=21659648i,nice_priority=20i,num_fds=4i,num_threads=1i,pid=29417i,read_bytes=0i,read_count=259i,realtime_priority=0i,rlimit_cpu_time_hard=2147483647i,rlimit_cpu_time_soft=2147483647i,rlimit_file_locks_hard=2147483647i,rlimit_file_locks_soft=2147483647i,rlimit_memory_data_hard=2147483647i,rlimit_memory_data_soft=2147483647i,rlimit_memory_locked_hard=65536i,rlimit_memory_locked_soft=65536i,rlimit_memory_rss_hard=2147483647i,rlimit_memory_rss_soft=2147483647i,rlimit_memory_stack_hard=2147483647i,rlimit_memory_stack_soft=8388608i,rlimit_memory_vms_hard=2147483647i,rlimit_memory_vms_soft=2147483647i,rlimit_nice_priority_hard=0i,rlimit_nice_priority_soft=0i,rlimit_num_fds_hard=4096i,rlimit_num_fds_soft=1024i,rlimit_realtime_priority_hard=0i,rlimit_realtime_priority_soft=0i,rlimit_signals_pending_hard=78994i,rlimit_signals_pending_soft=78994i,signals_pending=0i,voluntary_context_switches=42i,write_bytes=106496i,write_count=35i 1517620624000000000\n"),
484 },
485 {
486 name: "error out on field error",
487 input: NewMockMetric(
488 "cpu",
489 map[string]string{},
490 map[string]interface{}{
491 "x": math.NaN(),
492 "y": 42,
493 },
494 time.Unix(0, 0),
495 ),
496 failOnFieldErr: true,
497 err: &FieldError{s: "is NaN"},
498 },
499 }
500
501 func TestEncoder(t *testing.T) {
502 for _, tt := range tests {
503 t.Run(tt.name, func(t *testing.T) {
504 buf := &bytes.Buffer{}
505 serializer := NewEncoder(buf)
506 serializer.SetMaxLineBytes(tt.maxBytes)
507 serializer.SetFieldSortOrder(SortFields)
508 serializer.SetFieldTypeSupport(tt.typeSupport)
509 serializer.FailOnFieldErr(tt.failOnFieldErr)
510 _, err := serializer.Encode(tt.input)
511 require.Equal(t, tt.err, err)
512 require.Equal(t, string(tt.output), buf.String())
513 })
514 }
515 }
516
517 func BenchmarkSerializer(b *testing.B) {
518 for _, tt := range tests {
519 b.Run(tt.name, func(b *testing.B) {
520 buf := &bytes.Buffer{}
521 serializer := NewEncoder(buf)
522 serializer.SetMaxLineBytes(tt.maxBytes)
523 serializer.SetFieldTypeSupport(tt.typeSupport)
524 for n := 0; n < b.N; n++ {
525 output, err := serializer.Encode(tt.input)
526 _ = err
527 _ = output
528 }
529 })
530 }
531 }
0 package protocol
1
2 import (
3 "strings"
4 )
5
6 const (
7 escapes = "\t\n\f\r ,="
8 nameEscapes = "\t\n\f\r ,"
9 stringFieldEscapes = "\t\n\f\r\\\""
10 )
11
12 var (
13 escaper = strings.NewReplacer(
14 "\t", `\t`,
15 "\n", `\n`,
16 "\f", `\f`,
17 "\r", `\r`,
18 `,`, `\,`,
19 ` `, `\ `,
20 `=`, `\=`,
21 )
22
23 nameEscaper = strings.NewReplacer(
24 "\t", `\t`,
25 "\n", `\n`,
26 "\f", `\f`,
27 "\r", `\r`,
28 `,`, `\,`,
29 ` `, `\ `,
30 )
31
32 stringFieldEscaper = strings.NewReplacer(
33 "\t", `\t`,
34 "\n", `\n`,
35 "\f", `\f`,
36 "\r", `\r`,
37 `"`, `\"`,
38 `\`, `\\`,
39 )
40 )
41
42 // The various escape functions allocate, I'd like to fix that.
43 // TODO: make escape not allocate
44
45 // Escape a tagkey, tagvalue, or fieldkey
46 func escape(s string) string {
47 if strings.ContainsAny(s, escapes) {
48 return escaper.Replace(s)
49 }
50 return s
51 }
52
53 // Escape a measurement name
54 func nameEscape(s string) string {
55 if strings.ContainsAny(s, nameEscapes) {
56 return nameEscaper.Replace(s)
57 }
58 return s
59 }
60
61 // Escape a string field
62 func stringFieldEscape(s string) string {
63 if strings.ContainsAny(s, stringFieldEscapes) {
64 return stringFieldEscaper.Replace(s)
65 }
66 return s
67 }
0 package protocol
1
2 import "time"
3
4 // Tag holds the keys and values for a bunch of Tag k/v pairs
5 type Tag struct {
6 Key string
7 Value string
8 }
9
10 // Field holds the keys and values for a bunch of Metric Field k/v pairs
11 type Field struct {
12 Key string
13 Value interface{}
14 }
15
16 // Metric is the interface for marshaling, if you implement this interface you can be marshalled into the line protocol. Woot!
17 type Metric interface {
18 Time() time.Time
19 Name() string
20 TagList() []*Tag
21 FieldList() []*Field
22 }
23
24 // FieldSortOrder is a type for controlling if Fields are sorted
25 type FieldSortOrder int
26
27 const (
28 // NoSortFields tells the Decoder to not sort the fields
29 NoSortFields FieldSortOrder = iota
30
31 // SortFields tells the Decoder to sort the fields
32 SortFields
33 )
34
35 // FieldTypeSupport is a type for the parser to understand its type support
36 type FieldTypeSupport int
37
38 const (
39 // UintSupport means the parser understands uint64s and can store them without having to convert to int64
40 UintSupport FieldTypeSupport = 1 << iota
41 )
42
43 // MetricError is an error causing a metric to be unserializable.
44 type MetricError struct {
45 s string
46 }
47
48 func (e MetricError) Error() string {
49 return e.s
50 }
51
52 // FieldError is an error causing a field to be unserializable.
53 type FieldError struct {
54 s string
55 }
56
57 func (e FieldError) Error() string {
58 return e.s
59 }
60
61 var (
62 // ErrNeedMoreSpace tells us that the Decoder's io.Reader is full
63 ErrNeedMoreSpace = &MetricError{"need more space"}
64
65 // ErrInvalidName tells us that the chosen name is invalid
66 ErrInvalidName = &MetricError{"invalid name"}
67
68 // ErrNoFields tells us that there were no serializable fields in the line/metric
69 ErrNoFields = &MetricError{"no serializable fields"}
70 )