New upstream version 0.0~git20180222.25ae683
Sascha Steinbiss
5 years ago
0 | Copyright (c) 2017, 2018, DCSO Deutsche Cyber-Sicherheitsorganisation GmbH | |
1 | All rights reserved. | |
2 | ||
3 | Redistribution and use in source and binary forms, with or without | |
4 | modification, are permitted provided that the following conditions are met: | |
5 | ||
6 | * Redistributions of source code must retain the above copyright notice, this | |
7 | list of conditions and the following disclaimer. | |
8 | ||
9 | * Redistributions in binary form must reproduce the above copyright notice, | |
10 | this list of conditions and the following disclaimer in the documentation | |
11 | and/or other materials provided with the distribution. | |
12 | ||
13 | * Neither the name of the DCSO Deutsche Cyber-Sicherheitsorganisation GmbH | |
14 | nor the names of its contributors may be used to endorse or promote products | |
15 | derived from this software without specific prior written permission. | |
16 | ||
17 | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
18 | AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
19 | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
20 | DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
21 | FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
22 | DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | |
23 | SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | |
24 | CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | |
25 | OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | |
26 | OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
0 | ## simple makefile to log workflow | |
1 | .PHONY: all test clean build install | |
2 | ||
3 | GOFLAGS ?= $(GOFLAGS:) | |
4 | ||
5 | all: install test | |
6 | ||
7 | ||
8 | build: | |
9 | @go build $(GOFLAGS) ./... | |
10 | ||
11 | install: | |
12 | @go get $(GOFLAGS) ./... | |
13 | ||
14 | test: install | |
15 | @go vet $(GOFLAGS) ./... | |
16 | @go test -cover $(GOFLAGS) ./... | |
17 | ||
18 | bench: install | |
19 | @go test -run=NONE -bench=. $(GOFLAGS) ./... | |
20 | ||
21 | clean: | |
22 | @go clean $(GOFLAGS) -i ./... | |
23 | ||
24 | ## EOF |
0 | # fluxline | |
1 | ||
2 | Encoder for Golang to prepare sets of metrics in [InfluxDB's Line Protocol](https://docs.influxdata.com/influxdb/v1.4/write_protocols/line_protocol_reference) format. As input, we use structs annotated with the `influx` tag, similar to how `encoding/json` works. | |
3 | ||
4 | Supports the following Go builtin types as fields: | |
5 | ||
6 | - `string` | |
7 | - `int32`, `int64`, `int16`, `int8`, `int`, `uint32`, `uint64`, `uint16`, `uint8`, `uint` | |
8 | - `float64`, `float32` | |
9 | - `bool` | |
10 | - `time.Time` | |
11 | ||
12 | ## Remarks | |
13 | ||
14 | Not thread safe. If the struct is modified elsewhere concurrently, one would need to protect the read access required for encoding. | |
15 | ||
16 | ## Get the code | |
17 | ||
18 | ```bash | |
19 | go get github.com/DCSO/fluxline | |
20 | ``` | |
21 | ||
22 | ## Usage example | |
23 | ||
24 | ```golang | |
25 | package main | |
26 | ||
27 | import ( | |
28 | "bytes" | |
29 | "fmt" | |
30 | "log" | |
31 | ||
32 | "github.com/DCSO/fluxline" | |
33 | ) | |
34 | ||
35 | const measurement = "example" | |
36 | ||
37 | type MyCounts struct { | |
38 | Success uint64 `influx:"success"` | |
39 | Error uint64 `influx:"error"` | |
40 | } | |
41 | ||
42 | func main() { | |
43 | var counts MyCounts | |
44 | var b bytes.Buffer | |
45 | ||
46 | // ... | |
47 | counts.Success++ | |
48 | // ... | |
49 | counts.Success++ | |
50 | // ... | |
51 | counts.Error++ | |
52 | // ... | |
53 | ||
54 | tags := make(map[string]string) | |
55 | tags["foo"] = "bar" | |
56 | ||
57 | encoder := fluxline.NewEncoder(&b) | |
58 | err := encoder.Encode(measurement, counts, tags) | |
59 | if err != nil { | |
60 | log.Fatal(err) | |
61 | } | |
62 | fmt.Print(b.String()) | |
63 | } | |
64 | ``` |
0 | package fluxline | |
1 | ||
2 | // DCSO fluxline | |
3 | // Copyright (c) 2017, 2018, DCSO GmbH | |
4 | ||
5 | import ( | |
6 | "fmt" | |
7 | "io" | |
8 | "reflect" | |
9 | "sort" | |
10 | "strings" | |
11 | "time" | |
12 | ||
13 | "github.com/ShowMax/go-fqdn" | |
14 | ) | |
15 | ||
16 | // Encoder represents a component that encapsulates a target environment for | |
17 | // measurement submissions, as given by hostname and receiving writer. | |
18 | type Encoder struct { | |
19 | host string | |
20 | Writer io.Writer | |
21 | } | |
22 | ||
23 | func escapeSpecialChars(in string) string { | |
24 | str := strings.Replace(in, ",", `\,`, -1) | |
25 | str = strings.Replace(str, "=", `\=`, -1) | |
26 | str = strings.Replace(str, " ", `\ `, -1) | |
27 | return str | |
28 | } | |
29 | ||
30 | func toInfluxRepr(tag string, val interface{}) (string, error) { | |
31 | switch v := val.(type) { | |
32 | case string: | |
33 | if len(v) > 64000 { | |
34 | return "", fmt.Errorf("%s: string too long (%d characters, max. 64K)", tag, len(v)) | |
35 | } | |
36 | return fmt.Sprintf("%q", v), nil | |
37 | case int32, int64, int16, int8, int: | |
38 | return fmt.Sprintf("%di", v), nil | |
39 | case uint32, uint64, uint16, uint8, uint: | |
40 | return fmt.Sprintf("%di", v), nil | |
41 | case float64, float32: | |
42 | return fmt.Sprintf("%g", v), nil | |
43 | case bool: | |
44 | return fmt.Sprintf("%t", v), nil | |
45 | case time.Time: | |
46 | return fmt.Sprintf("%d", uint64(v.UnixNano())), nil | |
47 | default: | |
48 | return "", fmt.Errorf("%s: unsupported type for Influx Line Protocol", tag) | |
49 | } | |
50 | } | |
51 | ||
52 | func recordFields(val interface{}, | |
53 | fieldSet map[string]string) (map[string]string, error) { | |
54 | t := reflect.TypeOf(val) | |
55 | v := reflect.ValueOf(val) | |
56 | ||
57 | for i := 0; i < t.NumField(); i++ { | |
58 | field := t.Field(i) | |
59 | tag := field.Tag.Get("influx") | |
60 | if tag == "" { | |
61 | continue | |
62 | } | |
63 | repr, err := toInfluxRepr(tag, v.Field(i).Interface()) | |
64 | if err != nil { | |
65 | return nil, err | |
66 | } | |
67 | fieldSet[tag] = repr | |
68 | } | |
69 | return fieldSet, nil | |
70 | } | |
71 | ||
72 | func (a *Encoder) formatLineProtocol(prefix string, | |
73 | tags map[string]string, fieldSet map[string]string) string { | |
74 | out := "" | |
75 | tagstr := "" | |
76 | ||
77 | // sort by key to obtain stable output order | |
78 | keys := make([]string, 0, len(tags)) | |
79 | for key := range tags { | |
80 | keys = append(keys, key) | |
81 | } | |
82 | sort.Strings(keys) | |
83 | ||
84 | // serialize tags | |
85 | for _, k := range keys { | |
86 | tagstr += "," | |
87 | tagstr += fmt.Sprintf("%s=%s", escapeSpecialChars(k), escapeSpecialChars(tags[k])) | |
88 | } | |
89 | ||
90 | // sort by key to obtain stable output order | |
91 | keys = make([]string, 0, len(fieldSet)) | |
92 | for key := range fieldSet { | |
93 | keys = append(keys, key) | |
94 | } | |
95 | sort.Strings(keys) | |
96 | ||
97 | // serialize fields | |
98 | first := true | |
99 | for _, k := range keys { | |
100 | if !first { | |
101 | out += "," | |
102 | } else { | |
103 | first = false | |
104 | } | |
105 | out += fmt.Sprintf("%s=%s", escapeSpecialChars(k), fieldSet[k]) | |
106 | } | |
107 | if out == "" { | |
108 | return "" | |
109 | } | |
110 | ||
111 | // construct line protocol string | |
112 | return fmt.Sprintf("%s,host=%s%s %s %d\n", prefix, a.host, | |
113 | tagstr, out, uint64(time.Now().UnixNano())) | |
114 | } | |
115 | ||
116 | // Encode writes the line protocol representation for a given measurement | |
117 | // name, data struct and tag map to the io.Writer specified on encoder creation. | |
118 | func (a *Encoder) Encode(prefix string, val interface{}, | |
119 | tags map[string]string) error { | |
120 | fieldSet := make(map[string]string) | |
121 | fieldSet, err := recordFields(val, fieldSet) | |
122 | if err != nil { | |
123 | return err | |
124 | } | |
125 | _, err = a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, fieldSet))) | |
126 | return err | |
127 | } | |
128 | ||
129 | // EncodeMap writes the line protocol representation for a given measurement | |
130 | // name, field value map and tag map to the io.Writer specified on encoder | |
131 | // creation. | |
132 | func (a *Encoder) EncodeMap(prefix string, val map[string]string, | |
133 | tags map[string]string) error { | |
134 | _, err := a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, val))) | |
135 | return err | |
136 | } | |
137 | ||
138 | // NewEncoder creates a new encoder that writes to the given io.Writer. | |
139 | func NewEncoder(w io.Writer) *Encoder { | |
140 | a := &Encoder{ | |
141 | host: fqdn.Get(), | |
142 | Writer: w, | |
143 | } | |
144 | return a | |
145 | } |
0 | package fluxline | |
1 | ||
2 | // DCSO fluxline | |
3 | // Copyright (c) 2017, 2018, DCSO GmbH | |
4 | ||
5 | import ( | |
6 | "bytes" | |
7 | "io" | |
8 | "math" | |
9 | "regexp" | |
10 | "strings" | |
11 | "testing" | |
12 | "time" | |
13 | ) | |
14 | ||
15 | var testStruct = struct { | |
16 | TestVal uint64 `influx:"testval"` | |
17 | TestVal2 uint64 `influx:"testvalue"` | |
18 | TestVal3 int64 `influx:"testvalue2"` | |
19 | TestVal4 string `influx:"testvalue3"` | |
20 | TestDate time.Time `influx:"testvaluetime"` | |
21 | TestBool bool `influx:"testvaluebool"` | |
22 | TestFloat float64 `influx:"testvalueflt64"` | |
23 | TestFloat32 float32 `influx:"testvalueflt32"` | |
24 | }{ | |
25 | TestVal: 1, | |
26 | TestVal2: 2, | |
27 | TestVal3: -3, | |
28 | TestVal4: `foobar"baz`, | |
29 | TestDate: time.Now(), | |
30 | TestFloat32: math.Pi, | |
31 | TestFloat: 1.29e-24, | |
32 | } | |
33 | ||
34 | var testStructInvalidType = struct { | |
35 | TestVal uint64 `influx:"testval"` | |
36 | TestVal2 uint64 `influx:"testvalue"` | |
37 | TestVal3 int64 `influx:"testvalue2"` | |
38 | Foo io.Writer `influx:"testinval"` | |
39 | }{ | |
40 | TestVal: 1, | |
41 | TestVal2: 2, | |
42 | TestVal3: -3, | |
43 | } | |
44 | ||
45 | var testStructStringLong = struct { | |
46 | TestStr string `influx:"testval"` | |
47 | }{ | |
48 | TestStr: strings.Repeat("#", 70000), | |
49 | } | |
50 | ||
51 | var testStructPartUntagged = struct { | |
52 | TestVal uint64 `influx:"testval"` | |
53 | TestVal2 uint64 `influx:"testvalue"` | |
54 | TestVal3 int64 | |
55 | }{ | |
56 | TestVal: 1, | |
57 | TestVal2: 2, | |
58 | TestVal3: -3, | |
59 | } | |
60 | ||
61 | var testStructAllUntagged = struct { | |
62 | TestVal uint64 | |
63 | TestVal2 uint64 | |
64 | TestVal3 int64 | |
65 | }{ | |
66 | TestVal: 1, | |
67 | TestVal2: 2, | |
68 | TestVal3: -3, | |
69 | } | |
70 | ||
71 | func TestEncoderEncoder(t *testing.T) { | |
72 | var b bytes.Buffer | |
73 | ||
74 | ile := NewEncoder(&b) | |
75 | tags := make(map[string]string) | |
76 | tags["foo"] = "bar" | |
77 | tags["baaz gogo"] = "gu,gu" | |
78 | err := ile.Encode("mytool", testStruct, tags) | |
79 | if err != nil { | |
80 | t.Fatal(err) | |
81 | } | |
82 | ||
83 | out := b.String() | |
84 | if len(out) == 0 { | |
85 | t.Fatalf("unexpected result length: %d == 0", len(out)) | |
86 | } | |
87 | ||
88 | if match, _ := regexp.Match(`^mytool,host=[^,]+,baaz\\ gogo=gu\\,gu,foo=bar testval=1i,testvalue=2i,testvalue2=-3i,testvalue3=\"foobar\\\"baz\",testvaluebool=false,testvalueflt32=3.1415927,testvalueflt64=1.29e-24,testvaluetime=`, []byte(out)); !match { | |
89 | t.Fatalf("unexpected match content: %s", out) | |
90 | } | |
91 | } | |
92 | ||
93 | func TestEncoderTypeFail(t *testing.T) { | |
94 | var b bytes.Buffer | |
95 | ||
96 | ile := NewEncoder(&b) | |
97 | tags := make(map[string]string) | |
98 | err := ile.Encode("mytool", testStructInvalidType, tags) | |
99 | if err == nil { | |
100 | t.Fatal(err) | |
101 | } | |
102 | } | |
103 | ||
104 | func TestEncoderStringTooLongFail(t *testing.T) { | |
105 | var b bytes.Buffer | |
106 | ||
107 | ile := NewEncoder(&b) | |
108 | tags := make(map[string]string) | |
109 | err := ile.Encode("mytool", testStructStringLong, tags) | |
110 | if err == nil { | |
111 | t.Fatal(err) | |
112 | } | |
113 | } | |
114 | ||
115 | func TestEncoderPartUntagged(t *testing.T) { | |
116 | var b bytes.Buffer | |
117 | ||
118 | ile := NewEncoder(&b) | |
119 | tags := make(map[string]string) | |
120 | err := ile.Encode("mytool", testStructPartUntagged, tags) | |
121 | if err != nil { | |
122 | t.Fatal(err) | |
123 | } | |
124 | ||
125 | out := b.String() | |
126 | if match, _ := regexp.Match(`^mytool,host=[^,]+ testval=1i,testvalue=2i`, []byte(out)); !match { | |
127 | t.Fatalf("unexpected match content: %s", out) | |
128 | } | |
129 | } | |
130 | ||
131 | func TestEncoderAllUntagged(t *testing.T) { | |
132 | var b bytes.Buffer | |
133 | ||
134 | ile := NewEncoder(&b) | |
135 | tags := make(map[string]string) | |
136 | err := ile.Encode("mytool", testStructAllUntagged, tags) | |
137 | if err != nil { | |
138 | t.Fatal(err) | |
139 | } | |
140 | ||
141 | out := b.String() | |
142 | if len(out) != 0 { | |
143 | t.Fatalf("unexpected result length: %d != 0", len(out)) | |
144 | } | |
145 | } |