diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..eabc6c6 --- /dev/null +++ b/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2017, 2018, DCSO Deutsche Cyber-Sicherheitsorganisation GmbH +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the DCSO Deutsche Cyber-Sicherheitsorganisation GmbH + nor the names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e91675a --- /dev/null +++ b/Makefile @@ -0,0 +1,25 @@ +## simple makefile to log workflow +.PHONY: all test clean build install + +GOFLAGS ?= $(GOFLAGS:) + +all: install test + + +build: + @go build $(GOFLAGS) ./... + +install: + @go get $(GOFLAGS) ./... + +test: install + @go vet $(GOFLAGS) ./... + @go test -cover $(GOFLAGS) ./... + +bench: install + @go test -run=NONE -bench=. $(GOFLAGS) ./... + +clean: + @go clean $(GOFLAGS) -i ./... + +## EOF diff --git a/README.md b/README.md new file mode 100644 index 0000000..956dab9 --- /dev/null +++ b/README.md @@ -0,0 +1,65 @@ +# fluxline + +Encoder for Golang to prepare sets of metrics in [InfluxDB's Line Protocol](https://docs.influxdata.com/influxdb/v1.4/write_protocols/line_protocol_reference) format. As input, we use structs annotated with the `influx` tag, similar to how `encoding/json` works. + +Supports the following Go builtin types as fields: + + - `string` + - `int32`, `int64`, `int16`, `int8`, `int`, `uint32`, `uint64`, `uint16`, `uint8`, `uint` + - `float64`, `float32` + - `bool` + - `time.Time` + +## Remarks + +Not thread safe. If the struct is modified elsewhere concurrently, one would need to protect the read access required for encoding. + +## Get the code + +```bash +go get github.com/DCSO/fluxline +``` + +## Usage example + +```golang +package main + +import ( + "bytes" + "fmt" + "log" + + "github.com/DCSO/fluxline" +) + +const measurement = "example" + +type MyCounts struct { + Success uint64 `influx:"success"` + Error uint64 `influx:"error"` +} + +func main() { + var counts MyCounts + var b bytes.Buffer + + // ... + counts.Success++ + // ... + counts.Success++ + // ... + counts.Error++ + // ... + + tags := make(map[string]string) + tags["foo"] = "bar" + + encoder := fluxline.NewEncoder(&b) + err := encoder.Encode(measurement, counts, tags) + if err != nil { + log.Fatal(err) + } + fmt.Print(b.String()) +} +``` diff --git a/encoder.go b/encoder.go new file mode 100644 index 0000000..82cdf26 --- /dev/null +++ b/encoder.go @@ -0,0 +1,146 @@ +package fluxline + +// DCSO fluxline +// Copyright (c) 2017, 2018, DCSO GmbH + +import ( + "fmt" + "io" + "reflect" + "sort" + "strings" + "time" + + "github.com/ShowMax/go-fqdn" +) + +// Encoder represents a component that encapsulates a target environment for +// measurement submissions, as given by hostname and receiving writer. +type Encoder struct { + host string + Writer io.Writer +} + +func escapeSpecialChars(in string) string { + str := strings.Replace(in, ",", `\,`, -1) + str = strings.Replace(str, "=", `\=`, -1) + str = strings.Replace(str, " ", `\ `, -1) + return str +} + +func toInfluxRepr(tag string, val interface{}) (string, error) { + switch v := val.(type) { + case string: + if len(v) > 64000 { + return "", fmt.Errorf("%s: string too long (%d characters, max. 64K)", tag, len(v)) + } + return fmt.Sprintf("%q", v), nil + case int32, int64, int16, int8, int: + return fmt.Sprintf("%di", v), nil + case uint32, uint64, uint16, uint8, uint: + return fmt.Sprintf("%di", v), nil + case float64, float32: + return fmt.Sprintf("%g", v), nil + case bool: + return fmt.Sprintf("%t", v), nil + case time.Time: + return fmt.Sprintf("%d", uint64(v.UnixNano())), nil + default: + return "", fmt.Errorf("%s: unsupported type for Influx Line Protocol", tag) + } +} + +func recordFields(val interface{}, + fieldSet map[string]string) (map[string]string, error) { + t := reflect.TypeOf(val) + v := reflect.ValueOf(val) + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + tag := field.Tag.Get("influx") + if tag == "" { + continue + } + repr, err := toInfluxRepr(tag, v.Field(i).Interface()) + if err != nil { + return nil, err + } + fieldSet[tag] = repr + } + return fieldSet, nil +} + +func (a *Encoder) formatLineProtocol(prefix string, + tags map[string]string, fieldSet map[string]string) string { + out := "" + tagstr := "" + + // sort by key to obtain stable output order + keys := make([]string, 0, len(tags)) + for key := range tags { + keys = append(keys, key) + } + sort.Strings(keys) + + // serialize tags + for _, k := range keys { + tagstr += "," + tagstr += fmt.Sprintf("%s=%s", escapeSpecialChars(k), escapeSpecialChars(tags[k])) + } + + // sort by key to obtain stable output order + keys = make([]string, 0, len(fieldSet)) + for key := range fieldSet { + keys = append(keys, key) + } + sort.Strings(keys) + + // serialize fields + first := true + for _, k := range keys { + if !first { + out += "," + } else { + first = false + } + out += fmt.Sprintf("%s=%s", escapeSpecialChars(k), fieldSet[k]) + } + if out == "" { + return "" + } + + // construct line protocol string + return fmt.Sprintf("%s,host=%s%s %s %d\n", prefix, a.host, + tagstr, out, uint64(time.Now().UnixNano())) +} + +// Encode writes the line protocol representation for a given measurement +// name, data struct and tag map to the io.Writer specified on encoder creation. +func (a *Encoder) Encode(prefix string, val interface{}, + tags map[string]string) error { + fieldSet := make(map[string]string) + fieldSet, err := recordFields(val, fieldSet) + if err != nil { + return err + } + _, err = a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, fieldSet))) + return err +} + +// EncodeMap writes the line protocol representation for a given measurement +// name, field value map and tag map to the io.Writer specified on encoder +// creation. +func (a *Encoder) EncodeMap(prefix string, val map[string]string, + tags map[string]string) error { + _, err := a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, val))) + return err +} + +// NewEncoder creates a new encoder that writes to the given io.Writer. +func NewEncoder(w io.Writer) *Encoder { + a := &Encoder{ + host: fqdn.Get(), + Writer: w, + } + return a +} diff --git a/encoder_test.go b/encoder_test.go new file mode 100644 index 0000000..d4824a4 --- /dev/null +++ b/encoder_test.go @@ -0,0 +1,146 @@ +package fluxline + +// DCSO fluxline +// Copyright (c) 2017, 2018, DCSO GmbH + +import ( + "bytes" + "io" + "math" + "regexp" + "strings" + "testing" + "time" +) + +var testStruct = struct { + TestVal uint64 `influx:"testval"` + TestVal2 uint64 `influx:"testvalue"` + TestVal3 int64 `influx:"testvalue2"` + TestVal4 string `influx:"testvalue3"` + TestDate time.Time `influx:"testvaluetime"` + TestBool bool `influx:"testvaluebool"` + TestFloat float64 `influx:"testvalueflt64"` + TestFloat32 float32 `influx:"testvalueflt32"` +}{ + TestVal: 1, + TestVal2: 2, + TestVal3: -3, + TestVal4: `foobar"baz`, + TestDate: time.Now(), + TestFloat32: math.Pi, + TestFloat: 1.29e-24, +} + +var testStructInvalidType = struct { + TestVal uint64 `influx:"testval"` + TestVal2 uint64 `influx:"testvalue"` + TestVal3 int64 `influx:"testvalue2"` + Foo io.Writer `influx:"testinval"` +}{ + TestVal: 1, + TestVal2: 2, + TestVal3: -3, +} + +var testStructStringLong = struct { + TestStr string `influx:"testval"` +}{ + TestStr: strings.Repeat("#", 70000), +} + +var testStructPartUntagged = struct { + TestVal uint64 `influx:"testval"` + TestVal2 uint64 `influx:"testvalue"` + TestVal3 int64 +}{ + TestVal: 1, + TestVal2: 2, + TestVal3: -3, +} + +var testStructAllUntagged = struct { + TestVal uint64 + TestVal2 uint64 + TestVal3 int64 +}{ + TestVal: 1, + TestVal2: 2, + TestVal3: -3, +} + +func TestEncoderEncoder(t *testing.T) { + var b bytes.Buffer + + ile := NewEncoder(&b) + tags := make(map[string]string) + tags["foo"] = "bar" + tags["baaz gogo"] = "gu,gu" + err := ile.Encode("mytool", testStruct, tags) + if err != nil { + t.Fatal(err) + } + + out := b.String() + if len(out) == 0 { + t.Fatalf("unexpected result length: %d == 0", len(out)) + } + + if match, _ := regexp.Match(`^mytool,host=[^,]+,baaz\\ gogo=gu\\,gu,foo=bar testval=1i,testvalue=2i,testvalue2=-3i,testvalue3=\"foobar\\\"baz\",testvaluebool=false,testvalueflt32=3.1415927,testvalueflt64=1.29e-24,testvaluetime=`, []byte(out)); !match { + t.Fatalf("unexpected match content: %s", out) + } +} + +func TestEncoderTypeFail(t *testing.T) { + var b bytes.Buffer + + ile := NewEncoder(&b) + tags := make(map[string]string) + err := ile.Encode("mytool", testStructInvalidType, tags) + if err == nil { + t.Fatal(err) + } +} + +func TestEncoderStringTooLongFail(t *testing.T) { + var b bytes.Buffer + + ile := NewEncoder(&b) + tags := make(map[string]string) + err := ile.Encode("mytool", testStructStringLong, tags) + if err == nil { + t.Fatal(err) + } +} + +func TestEncoderPartUntagged(t *testing.T) { + var b bytes.Buffer + + ile := NewEncoder(&b) + tags := make(map[string]string) + err := ile.Encode("mytool", testStructPartUntagged, tags) + if err != nil { + t.Fatal(err) + } + + out := b.String() + if match, _ := regexp.Match(`^mytool,host=[^,]+ testval=1i,testvalue=2i`, []byte(out)); !match { + t.Fatalf("unexpected match content: %s", out) + } +} + +func TestEncoderAllUntagged(t *testing.T) { + var b bytes.Buffer + + ile := NewEncoder(&b) + tags := make(map[string]string) + err := ile.Encode("mytool", testStructAllUntagged, tags) + if err != nil { + t.Fatal(err) + } + + out := b.String() + if len(out) != 0 { + t.Fatalf("unexpected result length: %d != 0", len(out)) + } +}