Codebase list golang-github-dnstap-golang-dnstap / upstream/0.2.0 FrameStreamSockOutput.go
upstream/0.2.0

Tree @upstream/0.2.0 (Download .tar.gz)

FrameStreamSockOutput.go @upstream/0.2.0raw · history · blame

/*
 * Copyright (c) 2019 by Farsight Security, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dnstap

import (
	"log"
	"net"
	"time"

	"github.com/farsightsec/golang-framestream"
)

// A FrameStreamSockOutput manages a socket connection and sends dnstap
// data over a framestream connection on that socket.
type FrameStreamSockOutput struct {
	outputChannel chan []byte
	address       net.Addr
	wait          chan bool
	dialer        *net.Dialer
	timeout       time.Duration
	retry         time.Duration
	flushTimeout  time.Duration
}

// NewFrameStreamSockOutput creates a FrameStreamSockOutput manaaging a
// connection to the given address.
func NewFrameStreamSockOutput(address net.Addr) (*FrameStreamSockOutput, error) {
	return &FrameStreamSockOutput{
		outputChannel: make(chan []byte, outputChannelSize),
		address:       address,
		wait:          make(chan bool),
		retry:         10 * time.Second,
		flushTimeout:  5 * time.Second,
		dialer: &net.Dialer{
			Timeout: 30 * time.Second,
		},
	}, nil
}

// SetTimeout sets the write timeout for data and control messages and the
// read timeout for handshake responses on the FrameStreamSockOutput's
// connection. The default timeout is zero, for no timeout.
func (o *FrameStreamSockOutput) SetTimeout(timeout time.Duration) {
	o.timeout = timeout
}

// SetFlushTimeout sets the maximum time data will be kept in the output
// buffer.
//
// The default flush timeout is five seconds.
func (o *FrameStreamSockOutput) SetFlushTimeout(timeout time.Duration) {
	o.flushTimeout = timeout
}

// SetRetryInterval specifies how long the FrameStreamSockOutput will wait
// before re-establishing a failed connection. The default retry interval
// is 10 seconds.
func (o *FrameStreamSockOutput) SetRetryInterval(retry time.Duration) {
	o.retry = retry
}

// SetDialer replaces the default net.Dialer for re-establishing the
// the FrameStreamSockOutput connection. This can be used to set the
// timeout for connection establishment and enable keepalives
// new connections.
//
// FrameStreamSockOutput uses a default dialer with a 30 second
// timeout.
func (o *FrameStreamSockOutput) SetDialer(dialer *net.Dialer) {
	o.dialer = dialer
}

// GetOutputChannel returns the channel on which the
// FrameStreamSockOutput accepts data.
//
// GetOutputChannel satisifes the dnstap Output interface.
func (o *FrameStreamSockOutput) GetOutputChannel() chan []byte {
	return o.outputChannel
}

// A timedConn resets an associated timer on each Write to the underlying
// connection, and is used to implement the output's flush timeout.
type timedConn struct {
	net.Conn
	timer   *time.Timer
	timeout time.Duration

	// idle is true if the timer has fired and we have consumed
	// the time from its channel. We use this to prevent deadlocking
	// when resetting or stopping an already fired timer.
	idle bool
}

// SetIdle informs the timedConn that the associated timer is idle, i.e.
// it has fired and has not been reset.
func (t *timedConn) SetIdle() {
	t.idle = true
}

// Stop stops the underlying timer, consuming any time value if the timer
// had fired before Stop was called.
func (t *timedConn) StopTimer() {
	if !t.timer.Stop() && !t.idle {
		<-t.timer.C
	}
	t.idle = true
}

func (t *timedConn) Write(b []byte) (int, error) {
	t.StopTimer()
	t.timer.Reset(t.timeout)
	t.idle = false
	return t.Conn.Write(b)
}

func (t *timedConn) Close() error {
	t.StopTimer()
	return t.Conn.Close()
}

// RunOutputLoop reads data from the output channel and sends it over
// a connections to the FrameStreamSockOutput's address, establishing
// the connection as needed.
//
// RunOutputLoop satisifes the dnstap Output interface.
func (o *FrameStreamSockOutput) RunOutputLoop() {
	var enc *framestream.Encoder
	var err error

	// Start with the connection flush timer in a stopped state.
	// It will be reset by the first Write call on a new connection.
	conn := &timedConn{
		timer:   time.NewTimer(0),
		timeout: o.flushTimeout,
	}
	conn.StopTimer()

	defer func() {
		if enc != nil {
			enc.Flush()
			enc.Close()
		}
		if conn != nil {
			conn.Close()
		}
		close(o.wait)
	}()

	for {
		select {
		case frame, ok := <-o.outputChannel:
			if !ok {
				return
			}

			// the retry loop
			for ;; time.Sleep(o.retry) {
				if enc == nil {
					// connect the socket
					conn.Conn, err = o.dialer.Dial(o.address.Network(), o.address.String())
					if err != nil {
						log.Printf("Dial() failed: %v", err)
						continue // = retry
					}

					// create the encoder
					eopt := framestream.EncoderOptions{
						ContentType:   FSContentType,
						Bidirectional: true,
						Timeout:       o.timeout,
					}
					enc, err = framestream.NewEncoder(conn, &eopt)
					if err != nil {
						log.Printf("framestream.NewEncoder() failed: %v", err)
						conn.Close()
						enc = nil
						continue // = retry
					}
				}

				// try writing
				if _, err = enc.Write(frame); err != nil {
					log.Printf("framestream.Encoder.Write() failed: %v", err)
					enc.Close()
					enc = nil
					conn.Close()
					continue // = retry
				}

				break // success!
			}

		case <-conn.timer.C:
			conn.SetIdle()
			if enc == nil {
				continue
			}
			if err := enc.Flush(); err != nil {
				log.Printf("framestream.Encoder.Flush() failed: %s", err)
				enc.Close()
				enc = nil
				conn.Close()
				time.Sleep(o.retry)
			}
		}
	}
}

// Close shuts down the FrameStreamSockOutput's output channel and returns
// after all pending data has been flushed and the connection has been closed.
//
// Close satisifes the dnstap Output interface
func (o *FrameStreamSockOutput) Close() {
	close(o.outputChannel)
	<-o.wait
}