/*
* Copyright (c) 2017 by Farsight Security, Inc.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package nmsg
import (
"io"
"sync"
"time"
)
// An Output encapsulates NmsgPayloads in Nmsg containers and writes them to
// an io.Writer.
//
// Outputs are created with UnbufferedOutput, BufferedOutput, or
// TimedBufferedOutput.
type Output interface {
// Send sends an Nmsg Payload along the output. Implementations
// may queue the payload for later sending, so the caller should
// not modify the payload after calling Send.
//
// Send may be safely called from multiple goroutines.
Send(*NmsgPayload) error
// SetSequenced controls whether the Nmsg containers generated by
// the Output have sequence numbers.
SetSequenced(bool)
// SetCompression controls whether the Output generates compressed
// containers (true) or uncompressed containers (false).
SetCompression(bool)
// SetCompressionRatio sets the anticipated compression ratio for
// compressed containers.
SetCompressionRatio(float32)
// SetMaxSize sets size, the maximum size of a container the Output
// will buffer, and writeSize, the maximum size of a container or
// fragment the Output will write. For Ethernet, consider using
// nmsg.EtherContainerSize.
SetMaxSize(size int, writeSize int)
// Close shuts down the output, flushing any queued payloads.
// It does not close the underlying io.Writer.
Close() error
}
// output is the unbuffered Output implementation returned by
// UnbufferedOutput. Every call to Send writes the embedded Container to w.
// It is also embedded by the buffered implementations in this file, which
// supply their own Send and Close.
type output struct {
	// sendMu serializes Send so that adding a payload to the shared
	// Container and writing that Container out happen as a single step.
	sendMu sync.Mutex
	// w is the destination for written containers. It is never closed here.
	w io.Writer
	*Container
}

// Send adds p to the output's container and immediately writes the container
// to the underlying io.Writer, returning any error from that write.
//
// The Output interface documents Send as safe for use from multiple
// goroutines. The same Container is reused by every call, so the
// add-then-write pair is performed under sendMu; without it, concurrent
// callers would interleave on the shared Container.
func (o *output) Send(p *NmsgPayload) error {
	o.sendMu.Lock()
	defer o.sendMu.Unlock()

	c := o.Container
	// The values returned by AddPayload are not inspected here; the
	// container is written out unconditionally.
	c.AddPayload(p)
	_, err := c.WriteTo(o.w)
	return err
}
// Close is a no-op for the unbuffered output and always returns nil: Send
// writes each payload as it arrives, so there is nothing left to flush.
func (*output) Close() error { return nil }
// UnbufferedOutput returns an Output that writes one Nmsg container to w
// for every payload passed to Send.
func UnbufferedOutput(w io.Writer) Output {
	out := new(output)
	out.w = w
	out.Container = NewContainer()
	return out
}
// bufferedOutput is the Output implementation returned by BufferedOutput.
// It accumulates payloads in the embedded output's Container and writes the
// Container when AddPayload reports it full or when Close is called.
type bufferedOutput struct {
output
// mu is held by Send and Close for the duration of each call, and by the
// methods of timedBufferedOutput, which embeds this type.
mu sync.Mutex
}
// Send adds p to the buffered container, holding o.mu for the whole call.
//
// Whenever AddPayload reports the container full, the container is written
// to the underlying writer; if p had not been added yet, the add is then
// retried. A write error is returned immediately. If AddPayload does not
// report the container full, Send returns nil without writing.
func (o *bufferedOutput) Send(p *NmsgPayload) error {
	o.mu.Lock()
	defer o.mu.Unlock()

	for {
		added, full := o.AddPayload(p)
		if !full {
			return nil
		}
		if _, err := o.WriteTo(o.w); err != nil {
			return err
		}
		if added {
			// p went out with (or is now in) the container; nothing to retry.
			return nil
		}
	}
}
// Close writes the container to the underlying writer if it still holds any
// payloads and returns the error from that write. With nothing buffered it
// returns nil without writing. The io.Writer itself is left open.
func (o *bufferedOutput) Close() error {
	o.mu.Lock()
	defer o.mu.Unlock()

	if len(o.Nmsg.Payloads) == 0 {
		return nil
	}
	_, err := o.WriteTo(o.w)
	return err
}
// BufferedOutput creates an Output which collects NmsgPayloads and writes
// them to w in containers as close as possible to the size set by
// SetMaxSize.
func BufferedOutput(w io.Writer) Output {
	return &bufferedOutput{
		output: output{w: w, Container: NewContainer()},
	}
}
// timedBufferedOutput is the Output implementation returned by
// TimedBufferedOutput. It buffers payloads like bufferedOutput and also
// uses a timer to write out whatever is buffered once d has elapsed.
type timedBufferedOutput struct {
bufferedOutput
// timer runs flush when it fires.
timer *time.Timer
// d is the duration the timer is armed with.
d time.Duration
// err is set to the error from a failed write in Send and to the result
// of each write performed by flush. While it is non-nil, Send returns it
// without buffering the payload.
err error
}
// Send adds p to the buffered container, holding t.mu for the whole call.
//
// If an earlier write has left an error in t.err, Send returns that error
// without buffering p. Otherwise it loops until AddPayload reports that p was
// added, writing the container to the underlying writer each time AddPayload
// reports it full. A failed write is stored in t.err and returned.
func (t *timedBufferedOutput) Send(p *NmsgPayload) error {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.err != nil {
		return t.err
	}

	// We are sending the first payload on a new or recently-flushed
	// output, so start the flush interval from now. Reset re-arms an
	// AfterFunc timer whether or not it had already fired or been stopped,
	// so no replacement timer is created: a second timer would leave the
	// old one pending where Close can no longer stop it, and flush would
	// run twice.
	if len(t.Nmsg.Payloads) == 0 {
		t.timer.Reset(t.d)
	}

	var ok, full bool
	for !ok {
		ok, full = t.AddPayload(p)
		if !full {
			break
		}
		// The container is about to be written, so restart the interval
		// for whatever is buffered next.
		t.timer.Reset(t.d)
		if _, err := t.WriteTo(t.w); err != nil {
			t.err = err
			return err
		}
	}
	return nil
}
// Close stops the flush timer and writes out any payloads still buffered.
// It does not close the underlying io.Writer.
//
// It returns the error from the final write if that write fails. Otherwise it
// returns the error recorded in t.err by an earlier write, or nil if there is
// none. Reporting the recorded error matters because flush is run by the
// timer and has no caller of its own to return an error to.
func (t *timedBufferedOutput) Close() error {
	// Send uses t.timer and flush sets t.err while holding t.mu, so the
	// whole shutdown is done under the same lock.
	t.mu.Lock()
	defer t.mu.Unlock()

	t.timer.Stop()
	if len(t.Nmsg.Payloads) > 0 {
		if _, err := t.WriteTo(t.w); err != nil {
			return err
		}
	}
	return t.err
}
// flush is the timer callback. Under t.mu, it writes the container to the
// underlying writer if any payloads are buffered and stores the result of
// that write (nil on success) in t.err. With nothing buffered it does nothing.
func (t *timedBufferedOutput) flush() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if len(t.Nmsg.Payloads) == 0 {
		return
	}
	_, writeErr := t.WriteTo(t.w)
	t.err = writeErr
}
// TimedBufferedOutput creates an Output which collects NmsgPayloads and
// writes them to w in containers as close as possible to the size provided
// to SetMaxSize, or after the Duration d has elapsed, whichever comes first.
func TimedBufferedOutput(w io.Writer, d time.Duration) Output {
	out := &timedBufferedOutput{
		bufferedOutput: bufferedOutput{
			output: output{w: w, Container: NewContainer()},
		},
		d: d,
	}
	// The timer is armed immediately; flush is a no-op while nothing is buffered.
	out.timer = time.AfterFunc(d, out.flush)
	return out
}