Codebase list golang-github-farsightsec-go-nmsg / upstream/0.2.0 output.go
upstream/0.2.0

Tree @upstream/0.2.0 (Download .tar.gz)

output.go @upstream/0.2.0raw · history · blame

/*
 * Copyright (c) 2017 by Farsight Security, Inc.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

package nmsg

import (
	"io"
	"sync"
	"time"
)

// An Output encapsulates NmsgPayloads in Nmsg containers and writes them to
// an io.Writer.
//
// This file provides three constructors returning an Output:
// UnbufferedOutput, BufferedOutput and TimedBufferedOutput.
type Output interface {
	// Send sends an Nmsg Payload along the output. Implementations
	// may queue the payload for later sending, so the caller should
	// not modify the payload after calling Send.
	//
	// Send may be safely called from multiple goroutines.
	Send(*NmsgPayload) error
	// SetSequenced controls whether the Nmsg containers generated by
	// the Output have sequence numbers.
	SetSequenced(bool)
	// SetCompression controls whether the Output generates compressed
	// or uncompressed containers.
	SetCompression(bool)
	// SetCompressionRatio sets the anticipated compression ratio for
	// compressed containers.
	SetCompressionRatio(float32)
	// SetMaxSize sets the maximum size of a container the Output will
	// buffer (size), and the maximum size of a container or fragment the
	// Output will write (writeSize). For Ethernet, consider using
	// nmsg.EtherContainerSize.
	SetMaxSize(size int, writeSize int)
	// Close shuts down the output, flushing any queued payloads.
	// It will not close the underlying io.Writer.
	Close() error
}

// output is the state behind UnbufferedOutput and the base embedded by the
// buffered variants: a destination writer plus the Container that payloads
// are added to and written from.
type output struct {
	// w receives every container written by this output.
	w io.Writer
	// The embedded Container provides AddPayload and WriteTo; presumably it
	// also supplies the Set* methods required by Output — confirm against
	// the Container definition.
	*Container
}

// Send places p in the container and immediately writes the container to
// the underlying writer, returning any error from that write. The result of
// AddPayload is not inspected.
//
// NOTE(review): unlike the buffered implementations, no lock is taken
// here — confirm whether concurrent callers are expected on this type.
func (o *output) Send(p *NmsgPayload) error {
	o.AddPayload(p)
	_, writeErr := o.WriteTo(o.w)
	return writeErr
}

// Close is a no-op for the unbuffered output: Send writes each payload as
// it arrives, so nothing is left to flush. It always returns nil and does
// not touch the underlying writer.
func (o *output) Close() error {
	return nil
}

// UnbufferedOutput returns an Output that writes one Nmsg container to w
// for every payload passed to Send.
func UnbufferedOutput(w io.Writer) Output {
	out := new(output)
	out.w = w
	out.Container = NewContainer()
	return out
}

// bufferedOutput accumulates payloads in the embedded output's Container
// and writes the container only when AddPayload reports it full, or on
// Close.
type bufferedOutput struct {
	output
	// mu is held by Send and Close for their whole duration, serializing
	// access to the container and writer.
	mu sync.Mutex
}

// Send adds p to the buffered container. While AddPayload reports the
// container as full, the container is written to the underlying writer; if
// the payload was not accepted on that attempt, it is offered again to the
// emptied container. The first write error is returned to the caller.
func (o *bufferedOutput) Send(p *NmsgPayload) error {
	o.mu.Lock()
	defer o.mu.Unlock()

	for {
		added, full := o.AddPayload(p)
		if !full {
			// Room remains in the container; nothing to write yet.
			return nil
		}

		if _, err := o.WriteTo(o.w); err != nil {
			return err
		}

		if added {
			return nil
		}
		// The payload did not fit before the flush; retry.
	}
}

// Close writes any payloads still held in the container to the underlying
// writer and returns the result of that write. With nothing queued it
// returns nil without writing. The underlying writer is not closed.
func (o *bufferedOutput) Close() error {
	o.mu.Lock()
	defer o.mu.Unlock()

	if len(o.Nmsg.Payloads) == 0 {
		return nil
	}
	_, writeErr := o.WriteTo(o.w)
	return writeErr
}

// BufferedOutput creates an Output which collects NmsgPayloads and sends
// them to w in containers as close as possible to the size set by
// SetMaxSize.
func BufferedOutput(w io.Writer) Output {
	return &bufferedOutput{
		output: output{w: w, Container: NewContainer()},
	}
}

// timedBufferedOutput is a bufferedOutput that additionally flushes the
// container from a timer callback, so queued payloads are written after at
// most roughly d even when the container never fills.
type timedBufferedOutput struct {
	bufferedOutput
	// timer invokes flush when it fires; Send re-arms it and Close stops it.
	timer *time.Timer
	// d is the interval the timer is armed with.
	d time.Duration
	// err holds the result of the most recent write performed by flush, or
	// a write error hit in Send; while non-nil, Send returns it immediately.
	err error
}

// Send adds p to the buffered container, writing the container to the
// underlying writer whenever AddPayload reports it full. If a previous
// write left an error in t.err, Send returns that error without queueing p.
// A write error encountered here is stored in t.err and returned.
func (t *timedBufferedOutput) Send(p *NmsgPayload) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.err != nil {
		return t.err
	}

	// We are sending the first payload on a new or recently-flushed
	// output, so the flush interval starts now. Timer.Reset re-arms an
	// AfterFunc timer whether or not it has already fired or been stopped;
	// its return value only reports the prior state. Creating a replacement
	// timer on a false return would leave the old timer armed as well,
	// producing a stray flush that later Reset calls cannot postpone.
	if len(t.Nmsg.Payloads) == 0 {
		t.timer.Reset(t.d)
	}

	var ok, full bool
	for !ok {
		ok, full = t.AddPayload(p)
		if !full {
			break
		}

		// The container is about to be written; restart the interval for
		// whatever is queued next.
		t.timer.Reset(t.d)

		_, err := t.WriteTo(t.w)
		if err != nil {
			t.err = err
			return err
		}
	}
	return nil
}

// Close stops the flush timer and then flushes any queued payloads through
// the embedded bufferedOutput's Close, returning its result. The error
// recorded in t.err is not consulted here.
func (t *timedBufferedOutput) Close() error {
	t.timer.Stop()
	flushErr := t.bufferedOutput.Close()
	return flushErr
}

// flush is the timer callback. Under the lock it writes the container to
// the underlying writer if any payloads are queued, storing the write's
// result (nil on success) in t.err. With nothing queued it leaves t.err
// untouched.
func (t *timedBufferedOutput) flush() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if len(t.Nmsg.Payloads) == 0 {
		return
	}
	_, t.err = t.WriteTo(t.w)
}

// TimedBufferedOutput creates an Output which collects NmsgPayloads and
// sends them to w in containers as close as possible to the size provided
// to SetMaxSize, or after the Duration d has elapsed, whichever comes
// first. The flush timer is armed as soon as the Output is created.
func TimedBufferedOutput(w io.Writer, d time.Duration) Output {
	out := &timedBufferedOutput{
		bufferedOutput: bufferedOutput{
			output: output{w: w, Container: NewContainer()},
		},
		d: d,
	}
	out.timer = time.AfterFunc(d, out.flush)
	return out
}