Codebase list golang-github-dnstap-golang-dnstap / 421847c0-30af-4624-b1a8-3ca74c0706fb/main SocketWriter.go
421847c0-30af-4624-b1a8-3ca74c0706fb/main

Tree @421847c0-30af-4624-b1a8-3ca74c0706fb/main (Download .tar.gz)

SocketWriter.go @421847c0-30af-4624-b1a8-3ca74c0706fb/mainraw · history · blame

/*
 * Copyright (c) 2019 by Farsight Security, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package dnstap

import (
	"net"
	"sync"
	"time"

	framestream "github.com/farsightsec/golang-framestream"
)

// socketWriter writes data to a Frame Streams TCP or Unix domain socket,
// establishing or restarting the connection if needed. It is the concrete
// type behind the Writer returned by NewSocketWriter.
type socketWriter struct {
	// w is the frame writer for the current connection. WriteFrame calls
	// openWriter whenever this is nil.
	w    Writer
	// c is the connection produced by the Dialer in openWriter.
	c    net.Conn
	// addr supplies the network and address strings passed to the Dialer.
	addr net.Addr
	// opt is the copy of the options taken by NewSocketWriter.
	opt  SocketWriterOptions
}

// SocketWriterOptions provides configuration options for a SocketWriter.
// NewSocketWriter stores a copy of the struct it is given.
type SocketWriterOptions struct {
	// Timeout gives the time the SocketWriter will wait for reads and
	// writes to complete. It is passed as the Timeout of the Writer that
	// is created when FlushTimeout is zero.
	// NOTE(review): when FlushTimeout is nonzero, the flushing writer is
	// created with FlushTimeout as its framestream Timeout and this field
	// is not consulted -- confirm that this is intended.
	Timeout time.Duration
	// FlushTimeout is the maximum duration data will be buffered while
	// being written to the socket. Zero selects the Writer returned by
	// NewWriter instead of the timed flushing writer.
	FlushTimeout time.Duration
	// RetryInterval is how long the SocketWriter will wait between
	// connection attempts. WriteFrame sleeps for this duration before
	// every retry, so a zero value retries immediately.
	RetryInterval time.Duration
	// Dialer is the dialer used to establish the connection. If nil,
	// SocketWriter will use a default dialer with a 30 second timeout.
	Dialer *net.Dialer
	// Logger provides the logger for connection establishment, reconnection,
	// and error events of the SocketWriter. If nil, NewSocketWriter
	// substitutes a nullLogger.
	Logger Logger
}

// flushWriter wraps a framestream.Writer and flushes it from a background
// goroutine (runFlusher) driven by a timer. WriteFrame, Close and runFlusher
// all hold m while they touch the other fields.
type flushWriter struct {
	// m serializes WriteFrame, Close and the flusher goroutine.
	m           sync.Mutex
	// w is the underlying framestream writer.
	w           *framestream.Writer
	// d is the flush interval; newFlushWriter also passes it to framestream
	// as the writer's Timeout.
	d           time.Duration
	// timer wakes runFlusher. It is created stopped and armed by WriteFrame.
	timer       *time.Timer
	// timerActive is set when WriteFrame arms the timer and cleared by
	// runFlusher after it has flushed.
	timerActive bool
	// lastFlushed is refreshed through flusherConn.Write each time data is
	// written to the connection.
	lastFlushed time.Time
	// stopped is set by Close and makes runFlusher return on its next wakeup.
	stopped     bool
}

// flusherConn wraps a net.Conn and records when data was last written to it.
type flusherConn struct {
	net.Conn
	// lastWritten points at the timestamp that Write refreshes.
	lastWritten *time.Time
}

// Write passes p to the wrapped connection and then stores the current time
// in *lastWritten, whether or not the underlying write succeeded. The byte
// count and error of the underlying write are returned unchanged.
func (c *flusherConn) Write(p []byte) (int, error) {
	written, writeErr := c.Conn.Write(p)
	now := time.Now()
	*c.lastWritten = now
	return written, writeErr
}

// newFlushWriter opens a bidirectional Frame Streams writer on c and starts
// the background flusher goroutine for it. d is used both as the flush
// interval and as the Timeout handed to framestream. On a framestream error
// no goroutine is started and the error is returned.
func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) {
	// Build the timer stopped and drained; WriteFrame arms it on demand.
	flushTimer := time.NewTimer(d)
	if !flushTimer.Stop() {
		<-flushTimer.C
	}
	fw := &flushWriter{d: d, timer: flushTimer}

	// All socket writes go through a flusherConn so that fw.lastFlushed
	// follows the time of the most recent write.
	conn := &flusherConn{Conn: c, lastWritten: &fw.lastFlushed}
	opts := &framestream.WriterOptions{
		ContentTypes:  [][]byte{FSContentType},
		Bidirectional: true,
		Timeout:       d,
	}

	w, err := framestream.NewWriter(conn, opts)
	if err != nil {
		return nil, err
	}
	fw.w = w

	go fw.runFlusher()
	return fw, nil
}

// runFlusher is the background loop started by newFlushWriter. Each time the
// timer fires it takes the mutex and then either
//   - returns, if Close has marked the writer stopped;
//   - re-arms the timer for the remainder of the interval, if the connection
//     was written to less than fw.d ago; or
//   - flushes the framestream writer and marks the timer inactive so the
//     next WriteFrame arms it again.
//
// The error returned by Flush is not inspected.
func (fw *flushWriter) runFlusher() {
	// onTick handles a single timer expiry under the mutex and reports
	// whether the loop should terminate.
	onTick := func() bool {
		fw.m.Lock()
		defer fw.m.Unlock()

		if fw.stopped {
			return true
		}
		if idle := time.Since(fw.lastFlushed); idle < fw.d {
			// Data went out recently; wait out the rest of the interval.
			fw.timer.Reset(fw.d - idle)
			return false
		}
		fw.w.Flush()
		fw.timerActive = false
		return false
	}

	for range fw.timer.C {
		if onTick() {
			return
		}
	}
}

// WriteFrame hands p to the underlying framestream writer while holding the
// mutex and, when the flush timer is not already armed, arms it for fw.d.
// The result of the underlying call is returned unchanged; the timer is
// armed even if that call reported an error.
func (fw *flushWriter) WriteFrame(p []byte) (int, error) {
	fw.m.Lock()
	defer fw.m.Unlock()

	written, err := fw.w.WriteFrame(p)
	if fw.timerActive {
		return written, err
	}
	fw.timerActive = true
	fw.timer.Reset(fw.d)
	return written, err
}

// Close marks the writer stopped, fires the timer immediately so that the
// flusher goroutine wakes up and sees the stopped flag, and closes the
// underlying framestream writer, all while holding the mutex. It returns the
// error from the framestream writer's Close.
func (fw *flushWriter) Close() error {
	fw.m.Lock()
	defer fw.m.Unlock()

	fw.stopped = true
	fw.timer.Reset(0)
	return fw.w.Close()
}

// NewSocketWriter creates a SocketWriter which writes data to a connection
// to the given addr. The SocketWriter maintains and re-establishes the
// connection to this address as needed.
//
// opt may be nil. The options are copied before any default is applied, so
// the caller's struct is never modified. A nil Logger is replaced with a
// nullLogger and a nil Dialer with a net.Dialer that has a 30 second
// timeout, as documented on SocketWriterOptions.Dialer.
func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer {
	var o SocketWriterOptions
	if opt != nil {
		o = *opt
	}

	if o.Logger == nil {
		o.Logger = &nullLogger{}
	}
	if o.Dialer == nil {
		// Without this the first connection attempt would call Dial on a
		// nil *net.Dialer and panic.
		o.Dialer = &net.Dialer{Timeout: 30 * time.Second}
	}
	return &socketWriter{addr: addr, opt: o}
}

// openWriter dials sw.addr with the configured Dialer and starts a Frame
// Streams session on the new connection: a flushWriter when FlushTimeout is
// nonzero, otherwise the Writer returned by NewWriter with the configured
// Timeout.
//
// sw.c and sw.w are assigned only after both steps have succeeded. On any
// failure the freshly dialed connection (if any) is closed, the fields are
// left as they were, and the error is returned.
func (sw *socketWriter) openWriter() error {
	c, err := sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String())
	if err != nil {
		return err
	}

	wopt := WriterOptions{
		Bidirectional: true,
		Timeout:       sw.opt.Timeout,
	}

	// Build the writer in a local. Storing a failed newFlushWriter result
	// (a nil *flushWriter) straight into the Writer interface field would
	// make sw.w compare non-nil, so WriteFrame would skip the reopen and
	// call a method on a nil receiver.
	var w Writer
	if sw.opt.FlushTimeout == 0 {
		w, err = NewWriter(c, &wopt)
	} else {
		w, err = newFlushWriter(c, sw.opt.FlushTimeout)
	}
	if err != nil {
		c.Close()
		return err
	}

	sw.c, sw.w = c, w
	return nil
}

// Close shuts down the SocketWriter, closing any open connection.
//
// The frame writer (if any) is closed first and then the connection; both
// are always attempted. The writer's error takes precedence, otherwise the
// connection's close error is returned. The writer and connection fields
// are reset to nil, so a repeated Close is a no-op and a later WriteFrame
// opens a fresh connection instead of reusing the closed one.
func (sw *socketWriter) Close() error {
	w, c := sw.w, sw.c
	sw.w, sw.c = nil, nil

	var err error
	if w != nil {
		err = w.Close()
	}
	if c != nil {
		if cerr := c.Close(); err == nil {
			err = cerr
		}
	}
	return err
}

// WriteFrame writes the data in p as a Dnstap frame to a connection to the
// SocketWriter's address and returns the byte count reported by the
// underlying writer. It never returns a non-nil error: a failed open or
// write is logged, and the operation is retried after RetryInterval (a
// failed write also calls Close first). WriteFrame may therefore block
// indefinitely while the SocketWriter attempts to establish or re-establish
// the connection and FrameStream session.
func (sw *socketWriter) WriteFrame(p []byte) (int, error) {
	for retrying := false; ; retrying = true {
		if retrying {
			time.Sleep(sw.opt.RetryInterval)
		}

		if sw.w == nil {
			if openErr := sw.openWriter(); openErr != nil {
				sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, openErr)
				continue
			}
		}

		written, writeErr := sw.w.WriteFrame(p)
		if writeErr == nil {
			return written, nil
		}
		sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, writeErr)
		sw.Close()
	}
}