Codebase list golang-github-farsightsec-go-nmsg / run/53f3c784-7b2f-4ceb-94e3-ce341903aa2a/main input.go
run/53f3c784-7b2f-4ceb-94e3-ce341903aa2a/main

Tree @run/53f3c784-7b2f-4ceb-94e3-ce341903aa2a/main (Download .tar.gz)

input.go @run/53f3c784-7b2f-4ceb-94e3-ce341903aa2a/mainraw · history · blame

/*
 * Copyright (c) 2017,2018 by Farsight Security, Inc.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

package nmsg

import (
	"bufio"
	"fmt"
	"io"
	"time"
)

// An Input is a source of NMSG Payloads.
//
// NewInput returns an Input which reads containers from an io.Reader.
type Input interface {
	// Recv returns the next NMSG payload from the input,
	// blocking if none is available.
	//
	// IsDataError reports whether a returned error came from unpacking
	// or decoding the data rather than from I/O on the input.
	Recv() (*NmsgPayload, error)
	// Stats returns the statistics collected for this input.
	Stats() *InputStatistics
}

// InputStatistics holds useful metrics for input performance.
// The Stats method of an Input returns these counters.
type InputStatistics struct {
	// Count of total containers received. The reader-based input counts
	// a fragmented container once the fragment cache returns its
	// reassembled data, not once per fragment.
	InputContainers uint64
	// Count of total bytes received and processed
	InputBytes uint64
	// Count of containers marked lost by sequence tracking
	LostContainers uint64
	// Count of fragment containers received
	InputFragments uint64
	// Count of fragments expired from the fragment cache
	ExpiredFragments uint64
	// Count of containers dropped due to incomplete fragments
	PartialContainers uint64
}

// dataError marks an error as originating from unpacking or decoding NMSG
// data, as opposed to an I/O failure on the underlying input. Recv wraps
// decode failures and checksum mismatches in it so that IsDataError can
// recognise them.
type dataError struct{ error }

// Error returns the message of the wrapped error.
func (d *dataError) Error() string { return d.error.Error() }

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the underlying cause of a data error.
func (d *dataError) Unwrap() error { return d.error }

// IsDataError reports whether err is an error produced while unpacking
// or decoding NMSG data, as opposed to an I/O error on the input.
func IsDataError(err error) bool {
	switch err.(type) {
	case *dataError:
		return true
	default:
		return false
	}
}

// input is the Input implementation returned by NewInput. It reads
// containers from r and hands out their payloads one at a time.
type input struct {
	r      io.Reader       // container source; NewInput stores a bufio.Reader here
	n      Nmsg            // current container; Recv consumes its Payloads and PayloadCrcs
	fcache *fragCache      // receives fragments and returns the bytes of reassembled containers
	scache *seqCache       // sequence tracking; its Update result is added to LostContainers
	stats  InputStatistics // running counters, copied out by Stats
}

// Stats returns a snapshot of the input's counters. The result is a copy:
// it is not updated by later Recv calls, and changing it does not affect
// the input.
func (i *input) Stats() *InputStatistics {
	snapshot := i.stats
	return &snapshot
}

// NewInput returns an Input which reads NMSG containers from r.
//
// Reads from r go through a buffer of size bytes. For datagram inputs,
// size should be greater than the largest container expected.
// The fragment cache and the sequence cache are both created with a
// two minute duration.
func NewInput(r io.Reader, size int) Input {
	const cacheTTL = 2 * time.Minute

	in := new(input)
	in.r = bufio.NewReaderSize(r, size)
	in.fcache = newFragmentCache(cacheTTL)
	in.scache = newSequenceCache(cacheTTL)
	return in
}

// checksumError reports a payload whose computed CRC differs from the
// CRC carried alongside it in the container.
type checksumError struct {
	calc uint32 // CRC computed over the received payload
	wire uint32 // CRC taken from the container
}

// Error formats both checksums in hexadecimal, computed value first.
func (e *checksumError) Error() string {
	const layout = "checksum mismatch: %x != %x"
	return fmt.Sprintf(layout, e.calc, e.wire)
}

// Recv returns the next payload from the input, reading and decoding
// further containers from the underlying reader as needed.
//
// Payloads of the current container are handed out one per call. Once the
// container is exhausted, Recv reads containers until one yields payloads.
// Fragments are passed to the fragment cache and produce a container only
// when the cache returns the reassembled bytes.
//
// An error from reading a container is returned unchanged, and io.EOF is
// returned when a read consumes no bytes. A failure to decode a reassembled
// container, or a payload checksum mismatch, is returned as a data error
// (see IsDataError). On a checksum mismatch the payload is returned together
// with the error and has already been consumed from the input.
func (i *input) Recv() (*NmsgPayload, error) {
	for len(i.n.Payloads) == 0 {
		var c Container
		n, err := c.ReadFrom(i.r)
		if err != nil {
			return nil, err
		}
		if n == 0 {
			return nil, io.EOF
		}

		i.stats.InputBytes += uint64(n)

		if c.NmsgFragment != nil {
			i.stats.InputFragments++
			b := i.fcache.Insert(c.NmsgFragment)
			if b == nil {
				// The container is still incomplete. Run expiry before
				// reading on: during a long run of fragments that never
				// complete, this loop would otherwise keep inserting
				// without the cache ever being expired.
				i.expireFragments()
				continue
			}
			if err = c.fromNmsgBytes(b, c.isCompressed, false); err != nil {
				return nil, &dataError{err}
			}
		}

		i.stats.InputContainers++
		i.stats.LostContainers += uint64(i.scache.Update(&c.Nmsg))
		i.scache.Expire()
		i.n = c.Nmsg
	}
	i.expireFragments()

	p := i.n.Payloads[0]
	i.n.Payloads = i.n.Payloads[1:]

	// Containers without CRCs deliver their payloads unchecked.
	if len(i.n.PayloadCrcs) == 0 {
		return p, nil
	}

	wire := i.n.PayloadCrcs[0]
	i.n.PayloadCrcs = i.n.PayloadCrcs[1:]
	if calc := nmsgCRC(p.Payload); calc != wire {
		return p, &dataError{&checksumError{calc, wire}}
	}
	return p, nil
}

// expireFragments runs expiry on the fragment cache and adds the reported
// container and fragment counts to the input statistics.
func (i *input) expireFragments() {
	containers, fragments := i.fcache.Expire()
	i.stats.PartialContainers += uint64(containers)
	i.stats.ExpiredFragments += uint64(fragments)
}