/*
* Copyright (c) 2019 by Farsight Security, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dnstap
import (
"net"
"sync"
"time"
framestream "github.com/farsightsec/golang-framestream"
)
// A SocketWriter writes data to a Frame Streams TCP or Unix domain socket,
// establishing or restarting the connection if needed.
type socketWriter struct {
w Writer
c net.Conn
addr net.Addr
opt SocketWriterOptions
}
// SocketWriterOptions provides configuration options for a SocketWriter
type SocketWriterOptions struct {
// Timeout gives the time the SocketWriter will wait for reads and
// writes to complete.
Timeout time.Duration
// FlushTimeout is the maximum duration data will be buffered while
// being written to the socket.
FlushTimeout time.Duration
// RetryInterval is how long the SocketWriter will wait between
// connection attempts.
RetryInterval time.Duration
// Dialer is the dialer used to establish the connection. If nil,
// SocketWriter will use a default dialer with a 30 second timeout.
Dialer *net.Dialer
// Logger provides the logger for connection establishment, reconnection,
// and error events of the SocketWriter.
Logger Logger
}
type flushWriter struct {
m sync.Mutex
w *framestream.Writer
d time.Duration
timer *time.Timer
timerActive bool
lastFlushed time.Time
stopped bool
}
type flusherConn struct {
net.Conn
lastWritten *time.Time
}
func (c *flusherConn) Write(p []byte) (int, error) {
n, err := c.Conn.Write(p)
*c.lastWritten = time.Now()
return n, err
}
func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) {
var err error
fw := &flushWriter{timer: time.NewTimer(d), d: d}
if !fw.timer.Stop() {
<-fw.timer.C
}
fc := &flusherConn{
Conn: c,
lastWritten: &fw.lastFlushed,
}
fw.w, err = framestream.NewWriter(fc,
&framestream.WriterOptions{
ContentTypes: [][]byte{FSContentType},
Bidirectional: true,
Timeout: d,
})
if err != nil {
return nil, err
}
go fw.runFlusher()
return fw, nil
}
func (fw *flushWriter) runFlusher() {
for range fw.timer.C {
fw.m.Lock()
if fw.stopped {
fw.m.Unlock()
return
}
last := fw.lastFlushed
elapsed := time.Since(last)
if elapsed < fw.d {
fw.timer.Reset(fw.d - elapsed)
fw.m.Unlock()
continue
}
fw.w.Flush()
fw.timerActive = false
fw.m.Unlock()
}
}
func (fw *flushWriter) WriteFrame(p []byte) (int, error) {
fw.m.Lock()
n, err := fw.w.WriteFrame(p)
if !fw.timerActive {
fw.timer.Reset(fw.d)
fw.timerActive = true
}
fw.m.Unlock()
return n, err
}
func (fw *flushWriter) Close() error {
fw.m.Lock()
fw.stopped = true
fw.timer.Reset(0)
err := fw.w.Close()
fw.m.Unlock()
return err
}
// NewSocketWriter creates a SocketWriter which writes data to a connection
// to the given addr. The SocketWriter maintains and re-establishes the
// connection to this address as needed.
func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer {
if opt == nil {
opt = &SocketWriterOptions{}
}
if opt.Logger == nil {
opt.Logger = &nullLogger{}
}
return &socketWriter{addr: addr, opt: *opt}
}
func (sw *socketWriter) openWriter() error {
var err error
sw.c, err = sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String())
if err != nil {
return err
}
wopt := WriterOptions{
Bidirectional: true,
Timeout: sw.opt.Timeout,
}
if sw.opt.FlushTimeout == 0 {
sw.w, err = NewWriter(sw.c, &wopt)
} else {
sw.w, err = newFlushWriter(sw.c, sw.opt.FlushTimeout)
}
if err != nil {
sw.c.Close()
return err
}
return nil
}
// Close shuts down the SocketWriter, closing any open connection.
func (sw *socketWriter) Close() error {
var err error
if sw.w != nil {
err = sw.w.Close()
if err == nil {
return sw.c.Close()
}
sw.c.Close()
return err
}
if sw.c != nil {
return sw.c.Close()
}
return nil
}
// Write writes the data in p as a Dnstap frame to a connection to the
// SocketWriter's address. Write may block indefinitely while the SocketWriter
// attempts to establish or re-establish the connection and FrameStream session.
func (sw *socketWriter) WriteFrame(p []byte) (int, error) {
for ; ; time.Sleep(sw.opt.RetryInterval) {
if sw.w == nil {
if err := sw.openWriter(); err != nil {
sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, err)
continue
}
}
n, err := sw.w.WriteFrame(p)
if err != nil {
sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, err)
sw.Close()
continue
}
return n, nil
}
}