Codebase list golang-github-dcso-fluxline / 25ae683
initial commit Sascha Steinbiss 6 years ago
5 changed file(s) with 409 addition(s) and 0 deletion(s). Raw diff Collapse all Expand all
0 Copyright (c) 2017, 2018, DCSO Deutsche Cyber-Sicherheitsorganisation GmbH
1 All rights reserved.
2
3 Redistribution and use in source and binary forms, with or without
4 modification, are permitted provided that the following conditions are met:
5
6 * Redistributions of source code must retain the above copyright notice, this
7 list of conditions and the following disclaimer.
8
9 * Redistributions in binary form must reproduce the above copyright notice,
10 this list of conditions and the following disclaimer in the documentation
11 and/or other materials provided with the distribution.
12
13 * Neither the name of the DCSO Deutsche Cyber-Sicherheitsorganisation GmbH
14 nor the names of its contributors may be used to endorse or promote products
15 derived from this software without specific prior written permission.
16
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
0 ## simple makefile to log workflow
1 .PHONY: all test clean build install
2
3 GOFLAGS ?= $(GOFLAGS:)
4
5 all: install test
6
7
8 build:
9 @go build $(GOFLAGS) ./...
10
11 install:
12 @go get $(GOFLAGS) ./...
13
14 test: install
15 @go vet $(GOFLAGS) ./...
16 @go test -cover $(GOFLAGS) ./...
17
18 bench: install
19 @go test -run=NONE -bench=. $(GOFLAGS) ./...
20
21 clean:
22 @go clean $(GOFLAGS) -i ./...
23
24 ## EOF
0 # fluxline
1
2 Encoder for Golang to prepare sets of metrics in [InfluxDB's Line Protocol](https://docs.influxdata.com/influxdb/v1.4/write_protocols/line_protocol_reference) format. As input, we use structs annotated with the `influx` tag, similar to how `encoding/json` works.
3
4 Supports the following Go builtin types as fields:
5
6 - `string`
7 - `int32`, `int64`, `int16`, `int8`, `int`, `uint32`, `uint64`, `uint16`, `uint8`, `uint`
8 - `float64`, `float32`
9 - `bool`
10 - `time.Time`
11
12 ## Remarks
13
14 Not thread safe. If the struct is modified elsewhere concurrently, one would need to protect the read access required for encoding.
15
16 ## Get the code
17
18 ```bash
19 go get github.com/DCSO/fluxline
20 ```
21
22 ## Usage example
23
24 ```golang
25 package main
26
27 import (
28 "bytes"
29 "fmt"
30 "log"
31
32 "github.com/DCSO/fluxline"
33 )
34
35 const measurement = "example"
36
37 type MyCounts struct {
38 Success uint64 `influx:"success"`
39 Error uint64 `influx:"error"`
40 }
41
42 func main() {
43 var counts MyCounts
44 var b bytes.Buffer
45
46 // ...
47 counts.Success++
48 // ...
49 counts.Success++
50 // ...
51 counts.Error++
52 // ...
53
54 tags := make(map[string]string)
55 tags["foo"] = "bar"
56
57 encoder := fluxline.NewEncoder(&b)
58 err := encoder.Encode(measurement, counts, tags)
59 if err != nil {
60 log.Fatal(err)
61 }
62 fmt.Print(b.String())
63 }
64 ```
0 package fluxline
1
2 // DCSO fluxline
3 // Copyright (c) 2017, 2018, DCSO GmbH
4
5 import (
6 "fmt"
7 "io"
8 "reflect"
9 "sort"
10 "strings"
11 "time"
12
13 "github.com/ShowMax/go-fqdn"
14 )
15
16 // Encoder represents a component that encapsulates a target environment for
17 // measurement submissions, as given by hostname and receiving writer.
18 type Encoder struct {
19 host string
20 Writer io.Writer
21 }
22
23 func escapeSpecialChars(in string) string {
24 str := strings.Replace(in, ",", `\,`, -1)
25 str = strings.Replace(str, "=", `\=`, -1)
26 str = strings.Replace(str, " ", `\ `, -1)
27 return str
28 }
29
30 func toInfluxRepr(tag string, val interface{}) (string, error) {
31 switch v := val.(type) {
32 case string:
33 if len(v) > 64000 {
34 return "", fmt.Errorf("%s: string too long (%d characters, max. 64K)", tag, len(v))
35 }
36 return fmt.Sprintf("%q", v), nil
37 case int32, int64, int16, int8, int:
38 return fmt.Sprintf("%di", v), nil
39 case uint32, uint64, uint16, uint8, uint:
40 return fmt.Sprintf("%di", v), nil
41 case float64, float32:
42 return fmt.Sprintf("%g", v), nil
43 case bool:
44 return fmt.Sprintf("%t", v), nil
45 case time.Time:
46 return fmt.Sprintf("%d", uint64(v.UnixNano())), nil
47 default:
48 return "", fmt.Errorf("%s: unsupported type for Influx Line Protocol", tag)
49 }
50 }
51
52 func recordFields(val interface{},
53 fieldSet map[string]string) (map[string]string, error) {
54 t := reflect.TypeOf(val)
55 v := reflect.ValueOf(val)
56
57 for i := 0; i < t.NumField(); i++ {
58 field := t.Field(i)
59 tag := field.Tag.Get("influx")
60 if tag == "" {
61 continue
62 }
63 repr, err := toInfluxRepr(tag, v.Field(i).Interface())
64 if err != nil {
65 return nil, err
66 }
67 fieldSet[tag] = repr
68 }
69 return fieldSet, nil
70 }
71
72 func (a *Encoder) formatLineProtocol(prefix string,
73 tags map[string]string, fieldSet map[string]string) string {
74 out := ""
75 tagstr := ""
76
77 // sort by key to obtain stable output order
78 keys := make([]string, 0, len(tags))
79 for key := range tags {
80 keys = append(keys, key)
81 }
82 sort.Strings(keys)
83
84 // serialize tags
85 for _, k := range keys {
86 tagstr += ","
87 tagstr += fmt.Sprintf("%s=%s", escapeSpecialChars(k), escapeSpecialChars(tags[k]))
88 }
89
90 // sort by key to obtain stable output order
91 keys = make([]string, 0, len(fieldSet))
92 for key := range fieldSet {
93 keys = append(keys, key)
94 }
95 sort.Strings(keys)
96
97 // serialize fields
98 first := true
99 for _, k := range keys {
100 if !first {
101 out += ","
102 } else {
103 first = false
104 }
105 out += fmt.Sprintf("%s=%s", escapeSpecialChars(k), fieldSet[k])
106 }
107 if out == "" {
108 return ""
109 }
110
111 // construct line protocol string
112 return fmt.Sprintf("%s,host=%s%s %s %d\n", prefix, a.host,
113 tagstr, out, uint64(time.Now().UnixNano()))
114 }
115
116 // Encode writes the line protocol representation for a given measurement
117 // name, data struct and tag map to the io.Writer specified on encoder creation.
118 func (a *Encoder) Encode(prefix string, val interface{},
119 tags map[string]string) error {
120 fieldSet := make(map[string]string)
121 fieldSet, err := recordFields(val, fieldSet)
122 if err != nil {
123 return err
124 }
125 _, err = a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, fieldSet)))
126 return err
127 }
128
129 // EncodeMap writes the line protocol representation for a given measurement
130 // name, field value map and tag map to the io.Writer specified on encoder
131 // creation.
132 func (a *Encoder) EncodeMap(prefix string, val map[string]string,
133 tags map[string]string) error {
134 _, err := a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, val)))
135 return err
136 }
137
138 // NewEncoder creates a new encoder that writes to the given io.Writer.
139 func NewEncoder(w io.Writer) *Encoder {
140 a := &Encoder{
141 host: fqdn.Get(),
142 Writer: w,
143 }
144 return a
145 }
0 package fluxline
1
2 // DCSO fluxline
3 // Copyright (c) 2017, 2018, DCSO GmbH
4
5 import (
6 "bytes"
7 "io"
8 "math"
9 "regexp"
10 "strings"
11 "testing"
12 "time"
13 )
14
15 var testStruct = struct {
16 TestVal uint64 `influx:"testval"`
17 TestVal2 uint64 `influx:"testvalue"`
18 TestVal3 int64 `influx:"testvalue2"`
19 TestVal4 string `influx:"testvalue3"`
20 TestDate time.Time `influx:"testvaluetime"`
21 TestBool bool `influx:"testvaluebool"`
22 TestFloat float64 `influx:"testvalueflt64"`
23 TestFloat32 float32 `influx:"testvalueflt32"`
24 }{
25 TestVal: 1,
26 TestVal2: 2,
27 TestVal3: -3,
28 TestVal4: `foobar"baz`,
29 TestDate: time.Now(),
30 TestFloat32: math.Pi,
31 TestFloat: 1.29e-24,
32 }
33
34 var testStructInvalidType = struct {
35 TestVal uint64 `influx:"testval"`
36 TestVal2 uint64 `influx:"testvalue"`
37 TestVal3 int64 `influx:"testvalue2"`
38 Foo io.Writer `influx:"testinval"`
39 }{
40 TestVal: 1,
41 TestVal2: 2,
42 TestVal3: -3,
43 }
44
45 var testStructStringLong = struct {
46 TestStr string `influx:"testval"`
47 }{
48 TestStr: strings.Repeat("#", 70000),
49 }
50
51 var testStructPartUntagged = struct {
52 TestVal uint64 `influx:"testval"`
53 TestVal2 uint64 `influx:"testvalue"`
54 TestVal3 int64
55 }{
56 TestVal: 1,
57 TestVal2: 2,
58 TestVal3: -3,
59 }
60
61 var testStructAllUntagged = struct {
62 TestVal uint64
63 TestVal2 uint64
64 TestVal3 int64
65 }{
66 TestVal: 1,
67 TestVal2: 2,
68 TestVal3: -3,
69 }
70
71 func TestEncoderEncoder(t *testing.T) {
72 var b bytes.Buffer
73
74 ile := NewEncoder(&b)
75 tags := make(map[string]string)
76 tags["foo"] = "bar"
77 tags["baaz gogo"] = "gu,gu"
78 err := ile.Encode("mytool", testStruct, tags)
79 if err != nil {
80 t.Fatal(err)
81 }
82
83 out := b.String()
84 if len(out) == 0 {
85 t.Fatalf("unexpected result length: %d == 0", len(out))
86 }
87
88 if match, _ := regexp.Match(`^mytool,host=[^,]+,baaz\\ gogo=gu\\,gu,foo=bar testval=1i,testvalue=2i,testvalue2=-3i,testvalue3=\"foobar\\\"baz\",testvaluebool=false,testvalueflt32=3.1415927,testvalueflt64=1.29e-24,testvaluetime=`, []byte(out)); !match {
89 t.Fatalf("unexpected match content: %s", out)
90 }
91 }
92
93 func TestEncoderTypeFail(t *testing.T) {
94 var b bytes.Buffer
95
96 ile := NewEncoder(&b)
97 tags := make(map[string]string)
98 err := ile.Encode("mytool", testStructInvalidType, tags)
99 if err == nil {
100 t.Fatal(err)
101 }
102 }
103
104 func TestEncoderStringTooLongFail(t *testing.T) {
105 var b bytes.Buffer
106
107 ile := NewEncoder(&b)
108 tags := make(map[string]string)
109 err := ile.Encode("mytool", testStructStringLong, tags)
110 if err == nil {
111 t.Fatal(err)
112 }
113 }
114
115 func TestEncoderPartUntagged(t *testing.T) {
116 var b bytes.Buffer
117
118 ile := NewEncoder(&b)
119 tags := make(map[string]string)
120 err := ile.Encode("mytool", testStructPartUntagged, tags)
121 if err != nil {
122 t.Fatal(err)
123 }
124
125 out := b.String()
126 if match, _ := regexp.Match(`^mytool,host=[^,]+ testval=1i,testvalue=2i`, []byte(out)); !match {
127 t.Fatalf("unexpected match content: %s", out)
128 }
129 }
130
131 func TestEncoderAllUntagged(t *testing.T) {
132 var b bytes.Buffer
133
134 ile := NewEncoder(&b)
135 tags := make(map[string]string)
136 err := ile.Encode("mytool", testStructAllUntagged, tags)
137 if err != nil {
138 t.Fatal(err)
139 }
140
141 out := b.String()
142 if len(out) != 0 {
143 t.Fatalf("unexpected result length: %d != 0", len(out))
144 }
145 }