Codebase list golang-github-influxdata-line-protocol / cme/main encoder.go
cme/main

Tree @cme/main (Download .tar.gz)

encoder.go @cme/mainraw · history · blame

package protocol

import (
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"
)

// Encoder marshals Metrics into influxdb line protocol and writes the result
// to an io.Writer.
//
// It is not safe for concurrent use, make a new one! (Every call to Encode
// reuses the same scratch buffers held in the struct.)
// The default behavior when encountering a field error is to ignore the field and move on.
// If you wish it to error out on field errors, use Encoder.FailOnFieldErr(true).
type Encoder struct {
	// w is the destination that Encode writes the encoded lines to.
	w                io.Writer
	// fieldSortOrder selects whether Encode sorts fields by key before writing.
	fieldSortOrder   FieldSortOrder
	// fieldTypeSupport holds the flags for optional field types (e.g. UintSupport).
	fieldTypeSupport FieldTypeSupport
	// failOnFieldError makes Encode return a field error instead of skipping the field.
	failOnFieldError bool
	// maxLineBytes is the line length limit; values of zero or less disable it.
	maxLineBytes     int
	// fieldList is a reusable copy of the metric's fields, sorted in place when requested.
	fieldList        []*Field
	// header is a reusable buffer holding the escaped name and tags plus a trailing space.
	header           []byte
	// footer is a reusable buffer holding a space, the timestamp and a newline.
	footer           []byte
	// pair is a reusable buffer holding the most recently built `key=value` field.
	pair             []byte
}

// SetMaxLineBytes sets the maximum length, in bytes, of a single encoded line.
// When a metric's fields do not all fit on one line, Encode continues them on
// additional lines; it returns ErrNeedMoreSpace when a line holding only one
// field would still be longer than i. A value of zero or less disables the
// limit.
func (e *Encoder) SetMaxLineBytes(i int) {
	e.maxLineBytes = i
}

// SetFieldSortOrder sets the order in which Encode writes a metric's fields.
// The options are:
// NoSortFields (doesn't sort the fields)
// SortFields (sorts the fields by key in ascending order)
//
// Encode only sorts when the order is SortFields; with any other value the
// fields are written in the order returned by the metric's FieldList.
func (e *Encoder) SetFieldSortOrder(s FieldSortOrder) {
	e.fieldSortOrder = s
}

// SetFieldTypeSupport sets the flags that tell the encoder which optional
// field types, such as uint64, it may emit.
//
// With the UintSupport flag set, uint64 values are written as unsigned
// integers (suffix 'u'). Without it they are written as signed integers
// (suffix 'i'), and values above math.MaxInt64 are clamped to math.MaxInt64.
func (e *Encoder) SetFieldTypeSupport(s FieldTypeSupport) {
	e.fieldTypeSupport = s
}

// FailOnFieldErr sets whether Encode fails when a field cannot be encoded.
// When s is true, Encode returns the field's error; when s is false, which is
// the default, the offending field is skipped and encoding moves on.
func (e *Encoder) FailOnFieldErr(s bool) {
	e.failOnFieldError = s
}

// NewEncoder returns an Encoder that marshals metrics to w in influxdb line
// protocol, as defined by:
// https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/
//
// The scratch buffers are allocated once here so that subsequent calls to
// Encode can reuse them.
func NewEncoder(w io.Writer) *Encoder {
	// Initial capacity of the reusable header, footer and pair buffers.
	const scratchCap = 128

	enc := new(Encoder)
	enc.w = w
	enc.header = make([]byte, 0, scratchCap)
	enc.footer = make([]byte, 0, scratchCap)
	enc.pair = make([]byte, 0, scratchCap)
	enc.fieldList = make([]*Field, 0, 16)
	return enc
}

// comma is the field separator. It lives at package level so that writing it
// does not allocate a fresh slice on every call; it must be treated as
// read-only, since Go has no constant/immutable form for slices.
var comma = []byte{','}

// Encode marshals a Metric to the io.Writer in the Encoder and returns the
// total number of bytes written.
//
// Fields are written in the order returned by the metric's FieldList, or
// sorted by key when the sort order is SortFields. When a maximum line length
// has been set with SetMaxLineBytes and the fields do not fit on one line, the
// metric is continued on further lines, each repeating the header (name and
// tags) and the footer (timestamp).
//
// Encode returns:
//   - the error from building the header, when the metric name cannot be encoded;
//   - the field's error, when a field cannot be encoded and FailOnFieldErr(true)
//     was set (otherwise the field is skipped);
//   - ErrNeedMoreSpace, when a line holding a single field would exceed the
//     maximum line length;
//   - ErrNoFields, when no field could be encoded;
//   - any error returned by the underlying writer.
//
// Whenever an error is returned the byte count is 0, even though part of the
// metric may already have been written to the writer.
func (e *Encoder) Encode(m Metric) (int, error) {
	if err := e.buildHeader(m); err != nil {
		return 0, err
	}
	e.buildFooter(m)

	// Copy the fields into the reusable scratch slice so the in-place sort
	// does not reorder the slice handed out by the metric.
	e.fieldList = append(e.fieldList[:0], m.FieldList()...)
	if e.fieldSortOrder == SortFields {
		sort.Slice(e.fieldList, func(i, j int) bool {
			return e.fieldList[i].Key < e.fieldList[j].Key
		})
	}

	totalWritten := 0
	// write sends b to the underlying writer and keeps the running byte count.
	write := func(b []byte) error {
		n, err := e.w.Write(b)
		totalWritten += n
		return err
	}

	// lineOpen reports whether a header has been written without its footer.
	// lineLen is the size of that open line so far: header, pairs and the
	// commas between them.
	lineOpen := false
	lineLen := 0
	limited := e.maxLineBytes > 0

	for _, field := range e.fieldList {
		if err := e.buildFieldPair(field.Key, field.Value); err != nil {
			if e.failOnFieldError {
				return 0, err
			}
			continue
		}

		if lineOpen {
			// Appending costs one separator plus the pair, and the footer
			// still has to fit after it.
			if !limited || lineLen+len(comma)+len(e.pair)+len(e.footer) <= e.maxLineBytes {
				if err := write(comma); err != nil {
					return 0, err
				}
				if err := write(e.pair); err != nil {
					return 0, err
				}
				lineLen += len(comma) + len(e.pair)
				continue
			}

			// The pair does not fit: finish this line and start another.
			if err := write(e.footer); err != nil {
				return 0, err
			}
			lineOpen = false
			lineLen = 0
		}

		// Need at least one field per line.
		if limited && len(e.header)+len(e.pair)+len(e.footer) > e.maxLineBytes {
			return 0, ErrNeedMoreSpace
		}
		if err := write(e.header); err != nil {
			return 0, err
		}
		if err := write(e.pair); err != nil {
			return 0, err
		}
		lineOpen = true
		lineLen = len(e.header) + len(e.pair)
	}

	// No line was ever opened, so no field was encodable.
	if !lineOpen {
		return 0, ErrNoFields
	}
	if err := write(e.footer); err != nil {
		return 0, err
	}
	return totalWritten, nil
}

// buildHeader fills e.header with the start of a line for m: the escaped
// metric name, one `,key=value` entry per usable tag, and a trailing space.
// The previous contents of e.header are discarded.
//
// It returns ErrInvalidName when the escaped name is empty; e.header is left
// empty in that case.
func (e *Encoder) buildHeader(m Metric) error {
	e.header = e.header[:0]

	escapedName := nameEscape(m.Name())
	if escapedName == "" {
		return ErrInvalidName
	}

	buf := append(e.header, escapedName...)
	for _, t := range m.TagList() {
		k, v := escape(t.Key), escape(t.Value)

		// Some keys and values are not encodeable as line protocol, such as
		// those with a trailing '\' or empty strings; such tags are dropped.
		if k != "" && v != "" {
			buf = append(buf, ',')
			buf = append(buf, k...)
			buf = append(buf, '=')
			buf = append(buf, v...)
		}
	}

	e.header = append(buf, ' ')
	return nil
}

// buildFieldPair encodes a single field as `key=value` into e.pair, replacing
// whatever e.pair held before.
//
// Value encodings:
//   - uint64: decimal with suffix 'u' when UintSupport is enabled; otherwise
//     decimal with suffix 'i', clamped to math.MaxInt64
//   - int64, int: decimal with suffix 'i'
//   - float64, float32: shortest decimal representation without exponent
//   - string: escaped with stringFieldEscape and wrapped in double quotes
//   - bool: true or false
//
// It returns a *FieldError when the escaped key is empty, when a float is NaN
// or infinite, or when the value has any other type.
func (e *Encoder) buildFieldPair(key string, value interface{}) error {
	e.pair = e.pair[:0]

	// Some keys are not encodeable as line protocol, such as those with a
	// trailing '\' or empty strings.
	escapedKey := escape(key)
	if escapedKey == "" {
		return &FieldError{"invalid field key"}
	}
	e.pair = append(append(e.pair, escapedKey...), '=')

	switch val := value.(type) {
	case bool:
		e.pair = strconv.AppendBool(e.pair, val)

	case string:
		e.pair = append(e.pair, '"')
		e.pair = append(e.pair, stringFieldEscape(val)...)
		e.pair = append(e.pair, '"')

	case int:
		e.pair = strconv.AppendInt(e.pair, int64(val), 10)
		e.pair = append(e.pair, 'i')

	case int64:
		e.pair = strconv.AppendInt(e.pair, val, 10)
		e.pair = append(e.pair, 'i')

	case uint64:
		switch {
		case e.fieldTypeSupport&UintSupport != 0:
			e.pair = strconv.AppendUint(e.pair, val, 10)
			e.pair = append(e.pair, 'u')
		case val > uint64(math.MaxInt64):
			// Too large for a signed integer: clamp to the largest one.
			e.pair = strconv.AppendInt(e.pair, math.MaxInt64, 10)
			e.pair = append(e.pair, 'i')
		default:
			e.pair = strconv.AppendInt(e.pair, int64(val), 10)
			e.pair = append(e.pair, 'i')
		}

	case float32:
		wide := float64(val)
		switch {
		case math.IsNaN(wide):
			return &FieldError{"is NaN"}
		case math.IsInf(wide, 0):
			return &FieldError{"is Inf"}
		}
		e.pair = strconv.AppendFloat(e.pair, wide, 'f', -1, 64)

	case float64:
		switch {
		case math.IsNaN(val):
			return &FieldError{"is NaN"}
		case math.IsInf(val, 0):
			return &FieldError{"is Inf"}
		}
		e.pair = strconv.AppendFloat(e.pair, val, 'f', -1, 64)

	default:
		return &FieldError{fmt.Sprintf("invalid value type: %T", val)}
	}
	return nil
}

// buildFooter fills e.footer with the end of a line for m: a space, the
// metric's time as nanoseconds since the Unix epoch in decimal, and a newline.
// The previous contents of e.footer are discarded.
func (e *Encoder) buildFooter(m Metric) {
	nanos := m.Time().UnixNano()

	buf := append(e.footer[:0], ' ')
	buf = strconv.AppendInt(buf, nanos, 10)
	e.footer = append(buf, '\n')
}