diff --git a/Decoder.go b/Decoder.go new file mode 100644 index 0000000..788f526 --- /dev/null +++ b/Decoder.go @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 by Farsight Security, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dnstap + +import ( + framestream "github.com/farsightsec/golang-framestream" + "github.com/golang/protobuf/proto" +) + +// A Decoder reads and parses Dnstap messages from an io.Reader +type Decoder struct { + buf []byte + r Reader +} + +// NewDecoder creates a Decoder using the given dnstap Reader, accepting +// dnstap data frames up to maxSize in size. +func NewDecoder(r Reader, maxSize int) *Decoder { + return &Decoder{ + buf: make([]byte, maxSize), + r: r, + } +} + +// Decode reads and parses a Dnstap message from the Decoder's Reader. +// Decode silently discards data frames larger than the Decoder's configured +// maxSize. +func (d *Decoder) Decode(m *Dnstap) error { + for { + n, err := d.r.ReadFrame(d.buf) + + switch err { + case framestream.ErrDataFrameTooLarge: + continue + case nil: + break + default: + return err + } + + return proto.Unmarshal(d.buf[:n], m) + } +} diff --git a/Encoder.go b/Encoder.go new file mode 100644 index 0000000..66eff40 --- /dev/null +++ b/Encoder.go @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 by Farsight Security, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dnstap + +import ( + "github.com/golang/protobuf/proto" +) + +// An Encoder serializes and writes Dnstap messages to an underlying +// dnstap Writer +type Encoder struct { + w Writer +} + +// NewEncoder creates an Encoder using the given dnstap Writer +func NewEncoder(w Writer) *Encoder { + return &Encoder{w} +} + +// Encode serializes and writes the Dnstap message m to the encoder's +// Writer. +func (e *Encoder) Encode(m *Dnstap) error { + b, err := proto.Marshal(m) + if err != nil { + return err + } + + _, err = e.w.WriteFrame(b) + return err +} diff --git a/FrameStreamInput.go b/FrameStreamInput.go index 8403c61..8c2a330 100644 --- a/FrameStreamInput.go +++ b/FrameStreamInput.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2014 by Farsight Security, Inc. + * Copyright (c) 2013-2019 by Farsight Security, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,8 @@ import ( "io" - "log" "os" "time" - - "github.com/farsightsec/golang-framestream" ) // MaxPayloadSize sets the upper limit on input Dnstap payload sizes. If an Input @@ -40,9 +37,9 @@ // A FrameStreamInput reads dnstap data from an io.ReadWriter. type FrameStreamInput struct { - wait chan bool - decoder *framestream.Decoder - timeout time.Duration + wait chan bool + reader Reader + log Logger } // NewFrameStreamInput creates a FrameStreamInput reading data from the given @@ -56,19 +53,20 @@ // given io.ReadWriter with a timeout applied to reading and (for bidirectional // inputs) writing control messages. func NewFrameStreamInputTimeout(r io.ReadWriter, bi bool, timeout time.Duration) (input *FrameStreamInput, err error) { - input = new(FrameStreamInput) - decoderOptions := framestream.DecoderOptions{ - MaxPayloadSize: MaxPayloadSize, - ContentType: FSContentType, - Bidirectional: bi, - Timeout: timeout, + reader, err := NewReader(r, &ReaderOptions{ + Bidirectional: bi, + Timeout: timeout, + }) + + if err != nil { + return nil, err } - input.decoder, err = framestream.NewDecoder(r, &decoderOptions) - if err != nil { - return - } - input.wait = make(chan bool) - return + + return &FrameStreamInput{ + wait: make(chan bool), + reader: reader, + log: nullLogger{}, + }, nil } // NewFrameStreamInputFromFilename creates a FrameStreamInput reading from @@ -78,25 +76,33 @@ if err != nil { return nil, err } - input, err = NewFrameStreamInput(file, false) - return + return NewFrameStreamInput(file, false) +} + +// SetLogger configures a logger for FrameStreamInput read error reporting. +func (input *FrameStreamInput) SetLogger(logger Logger) { + input.log = logger } // ReadInto reads data from the FrameStreamInput into the output channel. // // ReadInto satisfies the dnstap Input interface. func (input *FrameStreamInput) ReadInto(output chan []byte) { + buf := make([]byte, MaxPayloadSize) for { - buf, err := input.decoder.Decode() - if err != nil { - if err != io.EOF { - log.Printf("framestream.Decoder.Decode() failed: %s\n", err) - } - break + n, err := input.reader.ReadFrame(buf) + if err == nil { + newbuf := make([]byte, n) + copy(newbuf, buf) + output <- newbuf + continue } - newbuf := make([]byte, len(buf)) - copy(newbuf, buf) - output <- newbuf + + if err != io.EOF { + input.log.Printf("FrameStreamInput: Read error: %v", err) + } + + break } close(input.wait) } diff --git a/FrameStreamOutput.go b/FrameStreamOutput.go index 5955b8b..c65b382 100644 --- a/FrameStreamOutput.go +++ b/FrameStreamOutput.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 by Farsight Security, Inc. + * Copyright (c) 2014,2019 by Farsight Security, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,33 +18,33 @@ import ( "io" - "log" "os" - - "github.com/farsightsec/golang-framestream" ) // FrameStreamOutput implements a dnstap Output to an io.Writer. type FrameStreamOutput struct { outputChannel chan []byte wait chan bool - enc *framestream.Encoder + w Writer + log Logger } // NewFrameStreamOutput creates a FrameStreamOutput writing dnstap data to // the given io.Writer. func NewFrameStreamOutput(w io.Writer) (o *FrameStreamOutput, err error) { - o = new(FrameStreamOutput) - o.outputChannel = make(chan []byte, outputChannelSize) - o.enc, err = framestream.NewEncoder(w, &framestream.EncoderOptions{ContentType: FSContentType}) + ow, err := NewWriter(w, nil) if err != nil { - return + return nil, err } - o.wait = make(chan bool) - return + return &FrameStreamOutput{ + outputChannel: make(chan []byte, outputChannelSize), + wait: make(chan bool), + w: ow, + log: nullLogger{}, + }, nil } -// NewFrameStreamOutputFromFilename creates a file with the namee fname, +// NewFrameStreamOutputFromFilename creates a file with the name fname, // truncates it if it exists, and returns a FrameStreamOutput writing to // the newly created or truncated file. func NewFrameStreamOutputFromFilename(fname string) (o *FrameStreamOutput, err error) { @@ -58,6 +58,12 @@ return NewFrameStreamOutput(w) } +// SetLogger sets an alternate logger for the FrameStreamOutput. The default +// is no logging. +func (o *FrameStreamOutput) SetLogger(logger Logger) { + o.log = logger +} + // GetOutputChannel returns the channel on which the FrameStreamOutput accepts // data. // @@ -69,14 +75,15 @@ // RunOutputLoop processes data received on the channel returned by // GetOutputChannel, returning after the CLose method is called. // If there is an error writing to the Output's writer, RunOutputLoop() -// logs a fatal error exits the program. +// returns, logging an error if a logger is configured with SetLogger() // // RunOutputLoop satisfies the dnstap Output interface. func (o *FrameStreamOutput) RunOutputLoop() { for frame := range o.outputChannel { - if _, err := o.enc.Write(frame); err != nil { - log.Fatalf("framestream.Encoder.Write() failed: %s\n", err) - break + if _, err := o.w.WriteFrame(frame); err != nil { + o.log.Printf("FrameStreamOutput: Write error: %v, returning", err) + close(o.wait) + return } } close(o.wait) @@ -89,6 +96,5 @@ func (o *FrameStreamOutput) Close() { close(o.outputChannel) <-o.wait - o.enc.Flush() - o.enc.Close() + o.w.Close() } diff --git a/FrameStreamSockInput.go b/FrameStreamSockInput.go index 116fa2d..1a9234f 100644 --- a/FrameStreamSockInput.go +++ b/FrameStreamSockInput.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2014 by Farsight Security, Inc. + * Copyright (c) 2013-2019 by Farsight Security, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ package dnstap import ( - "log" + "fmt" "net" "os" "time" @@ -29,6 +29,7 @@ wait chan bool listener net.Listener timeout time.Duration + log Logger } // NewFrameStreamSockInput creates a FrameStreamSockInput collecting dnstap @@ -36,6 +37,7 @@ func NewFrameStreamSockInput(listener net.Listener) (input *FrameStreamSockInput) { input = new(FrameStreamSockInput) input.listener = listener + input.log = &nullLogger{} return } @@ -43,9 +45,14 @@ // response control messages to clients of the FrameStreamSockInput's listener. // // The timeout is effective only for connections accepted after the call to -// FrameStreamSockInput. +// SetTimeout. func (input *FrameStreamSockInput) SetTimeout(timeout time.Duration) { input.timeout = timeout +} + +// SetLogger configures a logger for the FrameStreamSockInput. +func (input *FrameStreamSockInput) SetLogger(logger Logger) { + input.log = logger } // NewFrameStreamSockInputFromPath creates a unix domain socket at the @@ -69,19 +76,35 @@ // // ReadInto satisfies the dnstap Input interface. func (input *FrameStreamSockInput) ReadInto(output chan []byte) { + var n uint64 for { conn, err := input.listener.Accept() if err != nil { - log.Printf("net.Listener.Accept() failed: %s\n", err) + input.log.Printf("%s: accept failed: %v\n", + input.listener.Addr(), + err) continue + } + n++ + origin := "" + switch conn.RemoteAddr().Network() { + case "tcp", "tcp4", "tcp6": + origin = fmt.Sprintf(" from %s", conn.RemoteAddr()) } i, err := NewFrameStreamInputTimeout(conn, true, input.timeout) if err != nil { - log.Printf("dnstap.NewFrameStreamInput() failed: %s\n", err) + input.log.Printf("%s: connection %d: open input%s failed: %v", + conn.LocalAddr(), n, origin, err) continue } - log.Printf("dnstap.FrameStreamSockInput: accepted a socket connection\n") - go i.ReadInto(output) + input.log.Printf("%s: accepted connection %d%s", + conn.LocalAddr(), n, origin) + i.SetLogger(input.log) + go func(cn uint64) { + i.ReadInto(output) + input.log.Printf("%s: closed connection %d%s", + conn.LocalAddr(), cn, origin) + }(n) } } diff --git a/FrameStreamSockOutput.go b/FrameStreamSockOutput.go index 66d689f..bfe2120 100644 --- a/FrameStreamSockOutput.go +++ b/FrameStreamSockOutput.go @@ -17,36 +17,33 @@ package dnstap import ( - "log" "net" "time" - - "github.com/farsightsec/golang-framestream" ) // A FrameStreamSockOutput manages a socket connection and sends dnstap // data over a framestream connection on that socket. type FrameStreamSockOutput struct { + address net.Addr outputChannel chan []byte - address net.Addr wait chan bool - dialer *net.Dialer - timeout time.Duration - retry time.Duration - flushTimeout time.Duration + wopt SocketWriterOptions } // NewFrameStreamSockOutput creates a FrameStreamSockOutput manaaging a // connection to the given address. func NewFrameStreamSockOutput(address net.Addr) (*FrameStreamSockOutput, error) { return &FrameStreamSockOutput{ + address: address, outputChannel: make(chan []byte, outputChannelSize), - address: address, wait: make(chan bool), - retry: 10 * time.Second, - flushTimeout: 5 * time.Second, - dialer: &net.Dialer{ - Timeout: 30 * time.Second, + wopt: SocketWriterOptions{ + FlushTimeout: 5 * time.Second, + RetryInterval: 10 * time.Second, + Dialer: &net.Dialer{ + Timeout: 30 * time.Second, + }, + Logger: &nullLogger{}, }, }, nil } @@ -55,7 +52,7 @@ // read timeout for handshake responses on the FrameStreamSockOutput's // connection. The default timeout is zero, for no timeout. func (o *FrameStreamSockOutput) SetTimeout(timeout time.Duration) { - o.timeout = timeout + o.wopt.Timeout = timeout } // SetFlushTimeout sets the maximum time data will be kept in the output @@ -63,14 +60,14 @@ // // The default flush timeout is five seconds. func (o *FrameStreamSockOutput) SetFlushTimeout(timeout time.Duration) { - o.flushTimeout = timeout + o.wopt.FlushTimeout = timeout } // SetRetryInterval specifies how long the FrameStreamSockOutput will wait // before re-establishing a failed connection. The default retry interval // is 10 seconds. func (o *FrameStreamSockOutput) SetRetryInterval(retry time.Duration) { - o.retry = retry + o.wopt.RetryInterval = retry } // SetDialer replaces the default net.Dialer for re-establishing the @@ -81,7 +78,13 @@ // FrameStreamSockOutput uses a default dialer with a 30 second // timeout. func (o *FrameStreamSockOutput) SetDialer(dialer *net.Dialer) { - o.dialer = dialer + o.wopt.Dialer = dialer +} + +// SetLogger configures FrameStreamSockOutput to log through the given +// Logger. +func (o *FrameStreamSockOutput) SetLogger(logger Logger) { + o.wopt.Logger = logger } // GetOutputChannel returns the channel on which the @@ -92,132 +95,23 @@ return o.outputChannel } -// A timedConn resets an associated timer on each Write to the underlying -// connection, and is used to implement the output's flush timeout. -type timedConn struct { - net.Conn - timer *time.Timer - timeout time.Duration - - // idle is true if the timer has fired and we have consumed - // the time from its channel. We use this to prevent deadlocking - // when resetting or stopping an already fired timer. - idle bool -} - -// SetIdle informs the timedConn that the associated timer is idle, i.e. -// it has fired and has not been reset. -func (t *timedConn) SetIdle() { - t.idle = true -} - -// Stop stops the underlying timer, consuming any time value if the timer -// had fired before Stop was called. -func (t *timedConn) StopTimer() { - if !t.timer.Stop() && !t.idle { - <-t.timer.C - } - t.idle = true -} - -func (t *timedConn) Write(b []byte) (int, error) { - t.StopTimer() - t.timer.Reset(t.timeout) - t.idle = false - return t.Conn.Write(b) -} - -func (t *timedConn) Close() error { - t.StopTimer() - return t.Conn.Close() -} - // RunOutputLoop reads data from the output channel and sends it over // a connections to the FrameStreamSockOutput's address, establishing // the connection as needed. // // RunOutputLoop satisifes the dnstap Output interface. func (o *FrameStreamSockOutput) RunOutputLoop() { - var enc *framestream.Encoder - var err error + w := NewSocketWriter(o.address, &o.wopt) - // Start with the connection flush timer in a stopped state. - // It will be reset by the first Write call on a new connection. - conn := &timedConn{ - timer: time.NewTimer(0), - timeout: o.flushTimeout, + for b := range o.outputChannel { + // w is of type *SocketWriter, whose Write implementation + // handles all errors by retrying the connection. + w.WriteFrame(b) } - conn.StopTimer() - defer func() { - if enc != nil { - enc.Flush() - enc.Close() - } - if conn != nil { - conn.Close() - } - close(o.wait) - }() - - for { - select { - case frame, ok := <-o.outputChannel: - if !ok { - return - } - - // the retry loop - for ;; time.Sleep(o.retry) { - if enc == nil { - // connect the socket - conn.Conn, err = o.dialer.Dial(o.address.Network(), o.address.String()) - if err != nil { - log.Printf("Dial() failed: %v", err) - continue // = retry - } - - // create the encoder - eopt := framestream.EncoderOptions{ - ContentType: FSContentType, - Bidirectional: true, - Timeout: o.timeout, - } - enc, err = framestream.NewEncoder(conn, &eopt) - if err != nil { - log.Printf("framestream.NewEncoder() failed: %v", err) - conn.Close() - enc = nil - continue // = retry - } - } - - // try writing - if _, err = enc.Write(frame); err != nil { - log.Printf("framestream.Encoder.Write() failed: %v", err) - enc.Close() - enc = nil - conn.Close() - continue // = retry - } - - break // success! - } - - case <-conn.timer.C: - conn.SetIdle() - if enc == nil { - continue - } - if err := enc.Flush(); err != nil { - log.Printf("framestream.Encoder.Flush() failed: %s", err) - enc.Close() - enc = nil - conn.Close() - time.Sleep(o.retry) - } - } - } + w.Close() + close(o.wait) + return } // Close shuts down the FrameStreamSockOutput's output channel and returns diff --git a/Reader.go b/Reader.go new file mode 100644 index 0000000..df45e5c --- /dev/null +++ b/Reader.go @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 by Farsight Security, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dnstap + +import ( + "io" + "time" + + framestream "github.com/farsightsec/golang-framestream" +) + +// A Reader is a source of dnstap frames. +type Reader interface { + ReadFrame([]byte) (int, error) +} + +// ReaderOptions specifies configuration for the Reader. +type ReaderOptions struct { + // If Bidirectional is true, the underlying io.Reader must also + // satisfy io.Writer, and the dnstap Reader will use the bidirectional + // Frame Streams protocol. + Bidirectional bool + // Timeout sets the timeout for reading the initial handshake and + // writing response control messages to the underlying Reader. Timeout + // is only effective if the underlying Reader is a net.Conn. + Timeout time.Duration +} + +// NewReader creates a Reader using the given io.Reader and options. +func NewReader(r io.Reader, opt *ReaderOptions) (Reader, error) { + if opt == nil { + opt = &ReaderOptions{} + } + return framestream.NewReader(r, + &framestream.ReaderOptions{ + ContentTypes: [][]byte{FSContentType}, + Timeout: opt.Timeout, + Bidirectional: opt.Bidirectional, + }) +} diff --git a/SocketWriter.go b/SocketWriter.go new file mode 100644 index 0000000..b4119c5 --- /dev/null +++ b/SocketWriter.go @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2019 by Farsight Security, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dnstap + +import ( + "net" + "sync" + "time" + + framestream "github.com/farsightsec/golang-framestream" +) + +// A SocketWriter writes data to a Frame Streams TCP or Unix domain socket, +// establishing or restarting the connection if needed. +type socketWriter struct { + w Writer + c net.Conn + addr net.Addr + opt SocketWriterOptions +} + +// SocketWriterOptions provides configuration options for a SocketWriter +type SocketWriterOptions struct { + // Timeout gives the time the SocketWriter will wait for reads and + // writes to complete. + Timeout time.Duration + // FlushTimeout is the maximum duration data will be buffered while + // being written to the socket. + FlushTimeout time.Duration + // RetryInterval is how long the SocketWriter will wait between + // connection attempts. + RetryInterval time.Duration + // Dialer is the dialer used to establish the connection. If nil, + // SocketWriter will use a default dialer with a 30 second timeout. + Dialer *net.Dialer + // Logger provides the logger for connection establishment, reconnection, + // and error events of the SocketWriter. + Logger Logger +} + +type flushWriter struct { + m sync.Mutex + w *framestream.Writer + d time.Duration + timer *time.Timer + timerActive bool + lastFlushed time.Time + stopped bool +} + +type flusherConn struct { + net.Conn + lastWritten *time.Time +} + +func (c *flusherConn) Write(p []byte) (int, error) { + n, err := c.Conn.Write(p) + *c.lastWritten = time.Now() + return n, err +} + +func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) { + var err error + fw := &flushWriter{timer: time.NewTimer(d), d: d} + if !fw.timer.Stop() { + <-fw.timer.C + } + + fc := &flusherConn{ + Conn: c, + lastWritten: &fw.lastFlushed, + } + + fw.w, err = framestream.NewWriter(fc, + &framestream.WriterOptions{ + ContentTypes: [][]byte{FSContentType}, + Bidirectional: true, + Timeout: d, + }) + if err != nil { + return nil, err + } + go fw.runFlusher() + return fw, nil +} + +func (fw *flushWriter) runFlusher() { + for range fw.timer.C { + fw.m.Lock() + if fw.stopped { + fw.m.Unlock() + return + } + last := fw.lastFlushed + elapsed := time.Since(last) + if elapsed < fw.d { + fw.timer.Reset(fw.d - elapsed) + fw.m.Unlock() + continue + } + fw.w.Flush() + fw.timerActive = false + fw.m.Unlock() + } +} + +func (fw *flushWriter) WriteFrame(p []byte) (int, error) { + fw.m.Lock() + n, err := fw.w.WriteFrame(p) + if !fw.timerActive { + fw.timer.Reset(fw.d) + fw.timerActive = true + } + fw.m.Unlock() + return n, err +} + +func (fw *flushWriter) Close() error { + fw.m.Lock() + fw.stopped = true + fw.timer.Reset(0) + err := fw.w.Close() + fw.m.Unlock() + return err +} + +// NewSocketWriter creates a SocketWriter which writes data to a connection +// to the given addr. The SocketWriter maintains and re-establishes the +// connection to this address as needed. +func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer { + if opt == nil { + opt = &SocketWriterOptions{} + } + + if opt.Logger == nil { + opt.Logger = &nullLogger{} + } + return &socketWriter{addr: addr, opt: *opt} +} + +func (sw *socketWriter) openWriter() error { + var err error + sw.c, err = sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String()) + if err != nil { + return err + } + + wopt := WriterOptions{ + Bidirectional: true, + Timeout: sw.opt.Timeout, + } + + if sw.opt.FlushTimeout == 0 { + sw.w, err = NewWriter(sw.c, &wopt) + } else { + sw.w, err = newFlushWriter(sw.c, sw.opt.FlushTimeout) + } + if err != nil { + sw.c.Close() + return err + } + return nil +} + +// Close shuts down the SocketWriter, closing any open connection. +func (sw *socketWriter) Close() error { + var err error + if sw.w != nil { + err = sw.w.Close() + if err == nil { + return sw.c.Close() + } + sw.c.Close() + return err + } + if sw.c != nil { + return sw.c.Close() + } + return nil +} + +// Write writes the data in p as a Dnstap frame to a connection to the +// SocketWriter's address. Write may block indefinitely while the SocketWriter +// attempts to establish or re-establish the connection and FrameStream session. +func (sw *socketWriter) WriteFrame(p []byte) (int, error) { + for ; ; time.Sleep(sw.opt.RetryInterval) { + if sw.w == nil { + if err := sw.openWriter(); err != nil { + sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, err) + continue + } + } + + n, err := sw.w.WriteFrame(p) + if err != nil { + sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, err) + sw.Close() + continue + } + + return n, nil + } +} diff --git a/TextOutput.go b/TextOutput.go index edaa11a..9e613e5 100644 --- a/TextOutput.go +++ b/TextOutput.go @@ -19,7 +19,6 @@ import ( "bufio" "io" - "log" "os" "github.com/golang/protobuf/proto" @@ -34,6 +33,7 @@ outputChannel chan []byte wait chan bool writer *bufio.Writer + log Logger } // NewTextOutput creates a TextOutput writing dnstap data to the given io.Writer @@ -67,6 +67,11 @@ return NewTextOutput(writer, format), nil } +// SetLogger configures a logger for error events in the TextOutput +func (o *TextOutput) SetLogger(logger Logger) { + o.log = logger +} + // GetOutputChannel returns the channel on which the TextOutput accepts dnstap data. // // GetOutputChannel satisfies the dnstap Output interface. @@ -83,16 +88,16 @@ dt := &Dnstap{} for frame := range o.outputChannel { if err := proto.Unmarshal(frame, dt); err != nil { - log.Fatalf("dnstap.TextOutput: proto.Unmarshal() failed: %s\n", err) + o.log.Printf("dnstap.TextOutput: proto.Unmarshal() failed: %s, returning", err) break } buf, ok := o.format(dt) if !ok { - log.Fatalf("dnstap.TextOutput: text format function failed\n") + o.log.Printf("dnstap.TextOutput: text format function failed, returning") break } if _, err := o.writer.Write(buf); err != nil { - log.Fatalf("dnstap.TextOutput: write failed: %s\n", err) + o.log.Printf("dnstap.TextOutput: write error: %v, returning", err) break } o.writer.Flush() diff --git a/Writer.go b/Writer.go new file mode 100644 index 0000000..f2103b4 --- /dev/null +++ b/Writer.go @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2019 by Farsight Security, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dnstap + +import ( + "io" + "time" + + framestream "github.com/farsightsec/golang-framestream" +) + +// A Writer writes dnstap frames to its destination. +type Writer interface { + WriteFrame([]byte) (int, error) + Close() error +} + +// WriterOptions specifies configuration for the Writer +type WriterOptions struct { + // If Bidirectional is true, the underlying io.Writer must also + // satisfy io.Reader, and the dnstap Writer will use the bidirectional + // Frame Streams protocol. + Bidirectional bool + // Timeout sets the write timeout for data and control messages and the + // read timeout for handshake responses on the underlying Writer. Timeout + // is only effective if the underlying Writer is a net.Conn. + Timeout time.Duration +} + +// NewWriter creates a Writer using the given io.Writer and options. +func NewWriter(w io.Writer, opt *WriterOptions) (Writer, error) { + if opt == nil { + opt = &WriterOptions{} + } + return framestream.NewWriter(w, + &framestream.WriterOptions{ + ContentTypes: [][]byte{FSContentType}, + Timeout: opt.Timeout, + Bidirectional: opt.Bidirectional, + }) +} diff --git a/dnstap/fileoutput.go b/dnstap/fileoutput.go index 5145adc..a0dfef4 100644 --- a/dnstap/fileoutput.go +++ b/dnstap/fileoutput.go @@ -48,21 +48,33 @@ } func openOutputFile(filename string, formatter dnstap.TextFormatFunc, doAppend bool) (o dnstap.Output, err error) { + var fso *dnstap.FrameStreamOutput + var to *dnstap.TextOutput if formatter == nil { if filename == "-" || filename == "" { - o = dnstap.NewTextOutput(os.Stdout, dnstap.TextFormat) - return + to = dnstap.NewTextOutput(os.Stdout, dnstap.TextFormat) + to.SetLogger(logger) + return to, nil } - o, err = dnstap.NewFrameStreamOutputFromFilename(filename) + fso, err = dnstap.NewFrameStreamOutputFromFilename(filename) + if err == nil { + fso.SetLogger(logger) + return fso, nil + } } else { if filename == "-" || filename == "" { if doAppend { return nil, errors.New("cannot append to stdout (-)") } - o = dnstap.NewTextOutput(os.Stdout, formatter) - return + to = dnstap.NewTextOutput(os.Stdout, formatter) + to.SetLogger(logger) + return to, nil } - o, err = dnstap.NewTextOutputFromFilename(filename, formatter, doAppend) + to, err = dnstap.NewTextOutputFromFilename(filename, formatter, doAppend) + if err == nil { + to.SetLogger(logger) + } + return to, nil } return } diff --git a/dnstap/main.go b/dnstap/main.go index 1a8ec5f..613fabf 100644 --- a/dnstap/main.go +++ b/dnstap/main.go @@ -68,6 +68,8 @@ `) } +var logger = log.New(os.Stderr, "", log.LstdFlags) + func main() { var tcpOutputs, unixOutputs stringList var fileInputs, tcpInputs, unixInputs stringList @@ -140,6 +142,7 @@ fmt.Fprintf(os.Stderr, "dnstap: Failed to open input file %s: %v\n", fname, err) os.Exit(1) } + i.SetLogger(logger) fmt.Fprintf(os.Stderr, "dnstap: opened input file %s\n", fname) iwg.Add(1) go runInput(i, output, &iwg) @@ -151,6 +154,7 @@ os.Exit(1) } i.SetTimeout(*flagTimeout) + i.SetLogger(logger) fmt.Fprintf(os.Stderr, "dnstap: opened input socket %s\n", path) iwg.Add(1) go runInput(i, output, &iwg) @@ -163,6 +167,7 @@ } i := dnstap.NewFrameStreamSockInput(l) i.SetTimeout(*flagTimeout) + i.SetLogger(logger) iwg.Add(1) go runInput(i, output, &iwg) } @@ -198,6 +203,7 @@ return err } o.SetTimeout(*flagTimeout) + o.SetLogger(logger) go o.RunOutputLoop() mo.Add(o) } diff --git a/dnstap/mirroroutput.go b/dnstap/mirroroutput.go index 2bf0b2f..90ebffb 100644 --- a/dnstap/mirroroutput.go +++ b/dnstap/mirroroutput.go @@ -15,7 +15,6 @@ */ package main - import ( dnstap "github.com/dnstap/golang-dnstap" diff --git a/dnstap.go b/dnstap.go index 3058ecc..e964ef6 100644 --- a/dnstap.go +++ b/dnstap.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 by Farsight Security, Inc. + * Copyright (c) 2014,2019 by Farsight Security, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +29,7 @@ Wait() } -// An Output is a desintaion for dnstap data. It accepts data on the channel +// An Output is a destination for dnstap data. It accepts data on the channel // returned from the GetOutputChannel method. The RunOutputLoop() method // processes data received on this channel, and returns after the Close() // method is called. @@ -38,3 +38,17 @@ RunOutputLoop() Close() } + +// A Logger prints a formatted log message to the destination of the +// implementation's choice. A Logger may be provided for some Input and +// Output implementations for visibility into their ReadInto() and +// RunOutputLoop() loops. +// +// The result of log.New() satisfies the Logger interface. +type Logger interface { + Printf(format string, v ...interface{}) +} + +type nullLogger struct{} + +func (n nullLogger) Printf(format string, v ...interface{}) {} diff --git a/go.mod b/go.mod index 0f9ac13..8447007 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/dnstap/golang-dnstap require ( - github.com/farsightsec/golang-framestream v0.2.0 + github.com/farsightsec/golang-framestream v0.3.0 github.com/golang/protobuf v1.4.2 github.com/miekg/dns v1.1.31 ) diff --git a/go.sum b/go.sum index fe48761..9e78548 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/farsightsec/golang-framestream v0.2.0 h1:tW5KuZqkNIoiBc6LMsMLg+h6kkGyIB4/RR1z7Tqv6wE= -github.com/farsightsec/golang-framestream v0.2.0/go.mod h1:eNde4IQyEiA5br02AouhEHCu3p3UzrCdFR4LuQHklMI= +github.com/farsightsec/golang-framestream v0.3.0 h1:/spFQHucTle/ZIPkYqrfshQqPe2VQEzesH243TjIwqA= +github.com/farsightsec/golang-framestream v0.3.0/go.mod h1:eNde4IQyEiA5br02AouhEHCu3p3UzrCdFR4LuQHklMI= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= diff --git a/sock_test.go b/sock_test.go index f650d10..5e301c8 100644 --- a/sock_test.go +++ b/sock_test.go @@ -3,9 +3,17 @@ import ( "fmt" "net" + "os" "testing" "time" ) + +type testLogger struct{ *testing.T } + +func (t *testLogger) Printf(format string, v ...interface{}) { + t.Helper() + t.Logf(format, v...) +} func dialAndSend(t *testing.T, network, address string) *FrameStreamSockOutput { var addr net.Addr @@ -31,6 +39,7 @@ out.SetTimeout(time.Second) out.SetFlushTimeout(100 * time.Millisecond) out.SetRetryInterval(time.Second) + out.SetLogger(&testLogger{t}) go out.RunOutputLoop() <-time.After(500 * time.Millisecond) @@ -53,6 +62,9 @@ t.Fatal(err) } + defer os.Remove("dnstap.sock") + + in.SetLogger(&testLogger{t}) out := make(chan []byte) go in.ReadInto(out) @@ -83,6 +95,7 @@ } in := NewFrameStreamSockInput(l) + in.SetLogger(&testLogger{t}) out := make(chan []byte) go in.ReadInto(out) readOne(t, out) @@ -130,8 +143,10 @@ i := 1 b.StartTimer() - for _ = range outch { i++ } - if i != b.N { + for _ = range outch { + i++ + } + if i != b.N { b.Error("invalid frame count") } close(readDone) @@ -175,7 +190,9 @@ go func() { <-outch b.StartTimer() - for i := 1; i < b.N; i++ { <-outch } // NB: read never fails + for i := 1; i < b.N; i++ { + <-outch + } // NB: read never fails close(readDone) }()