Codebase list golang-github-dnstap-golang-dnstap / upstream/0.3.0
New upstream version 0.3.0 Sascha Steinbiss 3 years ago
17 changed file(s) with 617 addition(s) and 208 deletion(s). Raw diff Collapse all Expand all
0 /*
1 * Copyright (c) 2019 by Farsight Security, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package dnstap
17
18 import (
19 framestream "github.com/farsightsec/golang-framestream"
20 "github.com/golang/protobuf/proto"
21 )
22
23 // A Decoder reads and parses Dnstap messages from an io.Reader
24 type Decoder struct {
25 buf []byte
26 r Reader
27 }
28
29 // NewDecoder creates a Decoder using the given dnstap Reader, accepting
30 // dnstap data frames up to maxSize in size.
31 func NewDecoder(r Reader, maxSize int) *Decoder {
32 return &Decoder{
33 buf: make([]byte, maxSize),
34 r: r,
35 }
36 }
37
38 // Decode reads and parses a Dnstap message from the Decoder's Reader.
39 // Decode silently discards data frames larger than the Decoder's configured
40 // maxSize.
41 func (d *Decoder) Decode(m *Dnstap) error {
42 for {
43 n, err := d.r.ReadFrame(d.buf)
44
45 switch err {
46 case framestream.ErrDataFrameTooLarge:
47 continue
48 case nil:
49 break
50 default:
51 return err
52 }
53
54 return proto.Unmarshal(d.buf[:n], m)
55 }
56 }
0 /*
1 * Copyright (c) 2019 by Farsight Security, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package dnstap
17
18 import (
19 "github.com/golang/protobuf/proto"
20 )
21
22 // An Encoder serializes and writes Dnstap messages to an underlying
23 // dnstap Writer
24 type Encoder struct {
25 w Writer
26 }
27
28 // NewEncoder creates an Encoder using the given dnstap Writer
29 func NewEncoder(w Writer) *Encoder {
30 return &Encoder{w}
31 }
32
33 // Encode serializes and writes the Dnstap message m to the encoder's
34 // Writer.
35 func (e *Encoder) Encode(m *Dnstap) error {
36 b, err := proto.Marshal(m)
37 if err != nil {
38 return err
39 }
40
41 _, err = e.w.WriteFrame(b)
42 return err
43 }
00 /*
1 * Copyright (c) 2013-2014 by Farsight Security, Inc.
1 * Copyright (c) 2013-2019 by Farsight Security, Inc.
22 *
33 * Licensed under the Apache License, Version 2.0 (the "License");
44 * you may not use this file except in compliance with the License.
1717
1818 import (
1919 "io"
20 "log"
2120 "os"
2221 "time"
23
24 "github.com/farsightsec/golang-framestream"
2522 )
2623
2724 // MaxPayloadSize sets the upper limit on input Dnstap payload sizes. If an Input
3936
4037 // A FrameStreamInput reads dnstap data from an io.ReadWriter.
4138 type FrameStreamInput struct {
42 wait chan bool
43 decoder *framestream.Decoder
44 timeout time.Duration
39 wait chan bool
40 reader Reader
41 log Logger
4542 }
4643
4744 // NewFrameStreamInput creates a FrameStreamInput reading data from the given
5552 // given io.ReadWriter with a timeout applied to reading and (for bidirectional
5653 // inputs) writing control messages.
5754 func NewFrameStreamInputTimeout(r io.ReadWriter, bi bool, timeout time.Duration) (input *FrameStreamInput, err error) {
58 input = new(FrameStreamInput)
59 decoderOptions := framestream.DecoderOptions{
60 MaxPayloadSize: MaxPayloadSize,
61 ContentType: FSContentType,
62 Bidirectional: bi,
63 Timeout: timeout,
55 reader, err := NewReader(r, &ReaderOptions{
56 Bidirectional: bi,
57 Timeout: timeout,
58 })
59
60 if err != nil {
61 return nil, err
6462 }
65 input.decoder, err = framestream.NewDecoder(r, &decoderOptions)
66 if err != nil {
67 return
68 }
69 input.wait = make(chan bool)
70 return
63
64 return &FrameStreamInput{
65 wait: make(chan bool),
66 reader: reader,
67 log: nullLogger{},
68 }, nil
7169 }
7270
7371 // NewFrameStreamInputFromFilename creates a FrameStreamInput reading from
7775 if err != nil {
7876 return nil, err
7977 }
80 input, err = NewFrameStreamInput(file, false)
81 return
78 return NewFrameStreamInput(file, false)
79 }
80
81 // SetLogger configures a logger for FrameStreamInput read error reporting.
82 func (input *FrameStreamInput) SetLogger(logger Logger) {
83 input.log = logger
8284 }
8385
8486 // ReadInto reads data from the FrameStreamInput into the output channel.
8587 //
8688 // ReadInto satisfies the dnstap Input interface.
8789 func (input *FrameStreamInput) ReadInto(output chan []byte) {
90 buf := make([]byte, MaxPayloadSize)
8891 for {
89 buf, err := input.decoder.Decode()
90 if err != nil {
91 if err != io.EOF {
92 log.Printf("framestream.Decoder.Decode() failed: %s\n", err)
93 }
94 break
92 n, err := input.reader.ReadFrame(buf)
93 if err == nil {
94 newbuf := make([]byte, n)
95 copy(newbuf, buf)
96 output <- newbuf
97 continue
9598 }
96 newbuf := make([]byte, len(buf))
97 copy(newbuf, buf)
98 output <- newbuf
99
100 if err != io.EOF {
101 input.log.Printf("FrameStreamInput: Read error: %v", err)
102 }
103
104 break
99105 }
100106 close(input.wait)
101107 }
00 /*
1 * Copyright (c) 2014 by Farsight Security, Inc.
1 * Copyright (c) 2014,2019 by Farsight Security, Inc.
22 *
33 * Licensed under the Apache License, Version 2.0 (the "License");
44 * you may not use this file except in compliance with the License.
1717
1818 import (
1919 "io"
20 "log"
2120 "os"
22
23 "github.com/farsightsec/golang-framestream"
2421 )
2522
2623 // FrameStreamOutput implements a dnstap Output to an io.Writer.
2724 type FrameStreamOutput struct {
2825 outputChannel chan []byte
2926 wait chan bool
30 enc *framestream.Encoder
27 w Writer
28 log Logger
3129 }
3230
3331 // NewFrameStreamOutput creates a FrameStreamOutput writing dnstap data to
3432 // the given io.Writer.
3533 func NewFrameStreamOutput(w io.Writer) (o *FrameStreamOutput, err error) {
36 o = new(FrameStreamOutput)
37 o.outputChannel = make(chan []byte, outputChannelSize)
38 o.enc, err = framestream.NewEncoder(w, &framestream.EncoderOptions{ContentType: FSContentType})
34 ow, err := NewWriter(w, nil)
3935 if err != nil {
40 return
36 return nil, err
4137 }
42 o.wait = make(chan bool)
43 return
38 return &FrameStreamOutput{
39 outputChannel: make(chan []byte, outputChannelSize),
40 wait: make(chan bool),
41 w: ow,
42 log: nullLogger{},
43 }, nil
4444 }
4545
46 // NewFrameStreamOutputFromFilename creates a file with the namee fname,
46 // NewFrameStreamOutputFromFilename creates a file with the name fname,
4747 // truncates it if it exists, and returns a FrameStreamOutput writing to
4848 // the newly created or truncated file.
4949 func NewFrameStreamOutputFromFilename(fname string) (o *FrameStreamOutput, err error) {
5757 return NewFrameStreamOutput(w)
5858 }
5959
60 // SetLogger sets an alternate logger for the FrameStreamOutput. The default
61 // is no logging.
62 func (o *FrameStreamOutput) SetLogger(logger Logger) {
63 o.log = logger
64 }
65
6066 // GetOutputChannel returns the channel on which the FrameStreamOutput accepts
6167 // data.
6268 //
6874 // RunOutputLoop processes data received on the channel returned by
6975 // GetOutputChannel, returning after the CLose method is called.
7076 // If there is an error writing to the Output's writer, RunOutputLoop()
71 // logs a fatal error exits the program.
77 // returns, logging an error if a logger is configured with SetLogger()
7278 //
7379 // RunOutputLoop satisfies the dnstap Output interface.
7480 func (o *FrameStreamOutput) RunOutputLoop() {
7581 for frame := range o.outputChannel {
76 if _, err := o.enc.Write(frame); err != nil {
77 log.Fatalf("framestream.Encoder.Write() failed: %s\n", err)
78 break
82 if _, err := o.w.WriteFrame(frame); err != nil {
83 o.log.Printf("FrameStreamOutput: Write error: %v, returning", err)
84 close(o.wait)
85 return
7986 }
8087 }
8188 close(o.wait)
8895 func (o *FrameStreamOutput) Close() {
8996 close(o.outputChannel)
9097 <-o.wait
91 o.enc.Flush()
92 o.enc.Close()
98 o.w.Close()
9399 }
00 /*
1 * Copyright (c) 2013-2014 by Farsight Security, Inc.
1 * Copyright (c) 2013-2019 by Farsight Security, Inc.
22 *
33 * Licensed under the Apache License, Version 2.0 (the "License");
44 * you may not use this file except in compliance with the License.
1616 package dnstap
1717
1818 import (
19 "log"
19 "fmt"
2020 "net"
2121 "os"
2222 "time"
2828 wait chan bool
2929 listener net.Listener
3030 timeout time.Duration
31 log Logger
3132 }
3233
3334 // NewFrameStreamSockInput creates a FrameStreamSockInput collecting dnstap
3536 func NewFrameStreamSockInput(listener net.Listener) (input *FrameStreamSockInput) {
3637 input = new(FrameStreamSockInput)
3738 input.listener = listener
39 input.log = &nullLogger{}
3840 return
3941 }
4042
4244 // response control messages to clients of the FrameStreamSockInput's listener.
4345 //
4446 // The timeout is effective only for connections accepted after the call to
45 // FrameStreamSockInput.
47 // SetTimeout.
4648 func (input *FrameStreamSockInput) SetTimeout(timeout time.Duration) {
4749 input.timeout = timeout
50 }
51
52 // SetLogger configures a logger for the FrameStreamSockInput.
53 func (input *FrameStreamSockInput) SetLogger(logger Logger) {
54 input.log = logger
4855 }
4956
5057 // NewFrameStreamSockInputFromPath creates a unix domain socket at the
6875 //
6976 // ReadInto satisfies the dnstap Input interface.
7077 func (input *FrameStreamSockInput) ReadInto(output chan []byte) {
78 var n uint64
7179 for {
7280 conn, err := input.listener.Accept()
7381 if err != nil {
74 log.Printf("net.Listener.Accept() failed: %s\n", err)
82 input.log.Printf("%s: accept failed: %v\n",
83 input.listener.Addr(),
84 err)
7585 continue
86 }
87 n++
88 origin := ""
89 switch conn.RemoteAddr().Network() {
90 case "tcp", "tcp4", "tcp6":
91 origin = fmt.Sprintf(" from %s", conn.RemoteAddr())
7692 }
7793 i, err := NewFrameStreamInputTimeout(conn, true, input.timeout)
7894 if err != nil {
79 log.Printf("dnstap.NewFrameStreamInput() failed: %s\n", err)
95 input.log.Printf("%s: connection %d: open input%s failed: %v",
96 conn.LocalAddr(), n, origin, err)
8097 continue
8198 }
82 log.Printf("dnstap.FrameStreamSockInput: accepted a socket connection\n")
83 go i.ReadInto(output)
99 input.log.Printf("%s: accepted connection %d%s",
100 conn.LocalAddr(), n, origin)
101 i.SetLogger(input.log)
102 go func(cn uint64) {
103 i.ReadInto(output)
104 input.log.Printf("%s: closed connection %d%s",
105 conn.LocalAddr(), cn, origin)
106 }(n)
84107 }
85108 }
86109
1616 package dnstap
1717
1818 import (
19 "log"
2019 "net"
2120 "time"
22
23 "github.com/farsightsec/golang-framestream"
2421 )
2522
2623 // A FrameStreamSockOutput manages a socket connection and sends dnstap
2724 // data over a framestream connection on that socket.
2825 type FrameStreamSockOutput struct {
26 address net.Addr
2927 outputChannel chan []byte
30 address net.Addr
3128 wait chan bool
32 dialer *net.Dialer
33 timeout time.Duration
34 retry time.Duration
35 flushTimeout time.Duration
29 wopt SocketWriterOptions
3630 }
3731
3832 // NewFrameStreamSockOutput creates a FrameStreamSockOutput manaaging a
3933 // connection to the given address.
4034 func NewFrameStreamSockOutput(address net.Addr) (*FrameStreamSockOutput, error) {
4135 return &FrameStreamSockOutput{
36 address: address,
4237 outputChannel: make(chan []byte, outputChannelSize),
43 address: address,
4438 wait: make(chan bool),
45 retry: 10 * time.Second,
46 flushTimeout: 5 * time.Second,
47 dialer: &net.Dialer{
48 Timeout: 30 * time.Second,
39 wopt: SocketWriterOptions{
40 FlushTimeout: 5 * time.Second,
41 RetryInterval: 10 * time.Second,
42 Dialer: &net.Dialer{
43 Timeout: 30 * time.Second,
44 },
45 Logger: &nullLogger{},
4946 },
5047 }, nil
5148 }
5451 // read timeout for handshake responses on the FrameStreamSockOutput's
5552 // connection. The default timeout is zero, for no timeout.
5653 func (o *FrameStreamSockOutput) SetTimeout(timeout time.Duration) {
57 o.timeout = timeout
54 o.wopt.Timeout = timeout
5855 }
5956
6057 // SetFlushTimeout sets the maximum time data will be kept in the output
6259 //
6360 // The default flush timeout is five seconds.
6461 func (o *FrameStreamSockOutput) SetFlushTimeout(timeout time.Duration) {
65 o.flushTimeout = timeout
62 o.wopt.FlushTimeout = timeout
6663 }
6764
6865 // SetRetryInterval specifies how long the FrameStreamSockOutput will wait
6966 // before re-establishing a failed connection. The default retry interval
7067 // is 10 seconds.
7168 func (o *FrameStreamSockOutput) SetRetryInterval(retry time.Duration) {
72 o.retry = retry
69 o.wopt.RetryInterval = retry
7370 }
7471
7572 // SetDialer replaces the default net.Dialer for re-establishing the
8077 // FrameStreamSockOutput uses a default dialer with a 30 second
8178 // timeout.
8279 func (o *FrameStreamSockOutput) SetDialer(dialer *net.Dialer) {
83 o.dialer = dialer
80 o.wopt.Dialer = dialer
81 }
82
83 // SetLogger configures FrameStreamSockOutput to log through the given
84 // Logger.
85 func (o *FrameStreamSockOutput) SetLogger(logger Logger) {
86 o.wopt.Logger = logger
8487 }
8588
8689 // GetOutputChannel returns the channel on which the
9194 return o.outputChannel
9295 }
9396
94 // A timedConn resets an associated timer on each Write to the underlying
95 // connection, and is used to implement the output's flush timeout.
96 type timedConn struct {
97 net.Conn
98 timer *time.Timer
99 timeout time.Duration
100
101 // idle is true if the timer has fired and we have consumed
102 // the time from its channel. We use this to prevent deadlocking
103 // when resetting or stopping an already fired timer.
104 idle bool
105 }
106
107 // SetIdle informs the timedConn that the associated timer is idle, i.e.
108 // it has fired and has not been reset.
109 func (t *timedConn) SetIdle() {
110 t.idle = true
111 }
112
113 // Stop stops the underlying timer, consuming any time value if the timer
114 // had fired before Stop was called.
115 func (t *timedConn) StopTimer() {
116 if !t.timer.Stop() && !t.idle {
117 <-t.timer.C
118 }
119 t.idle = true
120 }
121
122 func (t *timedConn) Write(b []byte) (int, error) {
123 t.StopTimer()
124 t.timer.Reset(t.timeout)
125 t.idle = false
126 return t.Conn.Write(b)
127 }
128
129 func (t *timedConn) Close() error {
130 t.StopTimer()
131 return t.Conn.Close()
132 }
133
13497 // RunOutputLoop reads data from the output channel and sends it over
13598 // a connections to the FrameStreamSockOutput's address, establishing
13699 // the connection as needed.
137100 //
138101 // RunOutputLoop satisifes the dnstap Output interface.
139102 func (o *FrameStreamSockOutput) RunOutputLoop() {
140 var enc *framestream.Encoder
141 var err error
103 w := NewSocketWriter(o.address, &o.wopt)
142104
143 // Start with the connection flush timer in a stopped state.
144 // It will be reset by the first Write call on a new connection.
145 conn := &timedConn{
146 timer: time.NewTimer(0),
147 timeout: o.flushTimeout,
105 for b := range o.outputChannel {
106 // w is of type *SocketWriter, whose Write implementation
107 // handles all errors by retrying the connection.
108 w.WriteFrame(b)
148109 }
149 conn.StopTimer()
150110
151 defer func() {
152 if enc != nil {
153 enc.Flush()
154 enc.Close()
155 }
156 if conn != nil {
157 conn.Close()
158 }
159 close(o.wait)
160 }()
161
162 for {
163 select {
164 case frame, ok := <-o.outputChannel:
165 if !ok {
166 return
167 }
168
169 // the retry loop
170 for ;; time.Sleep(o.retry) {
171 if enc == nil {
172 // connect the socket
173 conn.Conn, err = o.dialer.Dial(o.address.Network(), o.address.String())
174 if err != nil {
175 log.Printf("Dial() failed: %v", err)
176 continue // = retry
177 }
178
179 // create the encoder
180 eopt := framestream.EncoderOptions{
181 ContentType: FSContentType,
182 Bidirectional: true,
183 Timeout: o.timeout,
184 }
185 enc, err = framestream.NewEncoder(conn, &eopt)
186 if err != nil {
187 log.Printf("framestream.NewEncoder() failed: %v", err)
188 conn.Close()
189 enc = nil
190 continue // = retry
191 }
192 }
193
194 // try writing
195 if _, err = enc.Write(frame); err != nil {
196 log.Printf("framestream.Encoder.Write() failed: %v", err)
197 enc.Close()
198 enc = nil
199 conn.Close()
200 continue // = retry
201 }
202
203 break // success!
204 }
205
206 case <-conn.timer.C:
207 conn.SetIdle()
208 if enc == nil {
209 continue
210 }
211 if err := enc.Flush(); err != nil {
212 log.Printf("framestream.Encoder.Flush() failed: %s", err)
213 enc.Close()
214 enc = nil
215 conn.Close()
216 time.Sleep(o.retry)
217 }
218 }
219 }
111 w.Close()
112 close(o.wait)
113 return
220114 }
221115
222116 // Close shuts down the FrameStreamSockOutput's output channel and returns
0 /*
1 * Copyright (c) 2019 by Farsight Security, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package dnstap
17
18 import (
19 "io"
20 "time"
21
22 framestream "github.com/farsightsec/golang-framestream"
23 )
24
25 // A Reader is a source of dnstap frames.
26 type Reader interface {
27 ReadFrame([]byte) (int, error)
28 }
29
30 // ReaderOptions specifies configuration for the Reader.
31 type ReaderOptions struct {
32 // If Bidirectional is true, the underlying io.Reader must also
33 // satisfy io.Writer, and the dnstap Reader will use the bidirectional
34 // Frame Streams protocol.
35 Bidirectional bool
36 // Timeout sets the timeout for reading the initial handshake and
37 // writing response control messages to the underlying Reader. Timeout
38 // is only effective if the underlying Reader is a net.Conn.
39 Timeout time.Duration
40 }
41
42 // NewReader creates a Reader using the given io.Reader and options.
43 func NewReader(r io.Reader, opt *ReaderOptions) (Reader, error) {
44 if opt == nil {
45 opt = &ReaderOptions{}
46 }
47 return framestream.NewReader(r,
48 &framestream.ReaderOptions{
49 ContentTypes: [][]byte{FSContentType},
50 Timeout: opt.Timeout,
51 Bidirectional: opt.Bidirectional,
52 })
53 }
0 /*
1 * Copyright (c) 2019 by Farsight Security, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package dnstap
17
18 import (
19 "net"
20 "sync"
21 "time"
22
23 framestream "github.com/farsightsec/golang-framestream"
24 )
25
26 // A SocketWriter writes data to a Frame Streams TCP or Unix domain socket,
27 // establishing or restarting the connection if needed.
28 type socketWriter struct {
29 w Writer
30 c net.Conn
31 addr net.Addr
32 opt SocketWriterOptions
33 }
34
35 // SocketWriterOptions provides configuration options for a SocketWriter
36 type SocketWriterOptions struct {
37 // Timeout gives the time the SocketWriter will wait for reads and
38 // writes to complete.
39 Timeout time.Duration
40 // FlushTimeout is the maximum duration data will be buffered while
41 // being written to the socket.
42 FlushTimeout time.Duration
43 // RetryInterval is how long the SocketWriter will wait between
44 // connection attempts.
45 RetryInterval time.Duration
46 // Dialer is the dialer used to establish the connection. If nil,
47 // SocketWriter will use a default dialer with a 30 second timeout.
48 Dialer *net.Dialer
49 // Logger provides the logger for connection establishment, reconnection,
50 // and error events of the SocketWriter.
51 Logger Logger
52 }
53
54 type flushWriter struct {
55 m sync.Mutex
56 w *framestream.Writer
57 d time.Duration
58 timer *time.Timer
59 timerActive bool
60 lastFlushed time.Time
61 stopped bool
62 }
63
64 type flusherConn struct {
65 net.Conn
66 lastWritten *time.Time
67 }
68
69 func (c *flusherConn) Write(p []byte) (int, error) {
70 n, err := c.Conn.Write(p)
71 *c.lastWritten = time.Now()
72 return n, err
73 }
74
75 func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) {
76 var err error
77 fw := &flushWriter{timer: time.NewTimer(d), d: d}
78 if !fw.timer.Stop() {
79 <-fw.timer.C
80 }
81
82 fc := &flusherConn{
83 Conn: c,
84 lastWritten: &fw.lastFlushed,
85 }
86
87 fw.w, err = framestream.NewWriter(fc,
88 &framestream.WriterOptions{
89 ContentTypes: [][]byte{FSContentType},
90 Bidirectional: true,
91 Timeout: d,
92 })
93 if err != nil {
94 return nil, err
95 }
96 go fw.runFlusher()
97 return fw, nil
98 }
99
100 func (fw *flushWriter) runFlusher() {
101 for range fw.timer.C {
102 fw.m.Lock()
103 if fw.stopped {
104 fw.m.Unlock()
105 return
106 }
107 last := fw.lastFlushed
108 elapsed := time.Since(last)
109 if elapsed < fw.d {
110 fw.timer.Reset(fw.d - elapsed)
111 fw.m.Unlock()
112 continue
113 }
114 fw.w.Flush()
115 fw.timerActive = false
116 fw.m.Unlock()
117 }
118 }
119
120 func (fw *flushWriter) WriteFrame(p []byte) (int, error) {
121 fw.m.Lock()
122 n, err := fw.w.WriteFrame(p)
123 if !fw.timerActive {
124 fw.timer.Reset(fw.d)
125 fw.timerActive = true
126 }
127 fw.m.Unlock()
128 return n, err
129 }
130
131 func (fw *flushWriter) Close() error {
132 fw.m.Lock()
133 fw.stopped = true
134 fw.timer.Reset(0)
135 err := fw.w.Close()
136 fw.m.Unlock()
137 return err
138 }
139
140 // NewSocketWriter creates a SocketWriter which writes data to a connection
141 // to the given addr. The SocketWriter maintains and re-establishes the
142 // connection to this address as needed.
143 func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer {
144 if opt == nil {
145 opt = &SocketWriterOptions{}
146 }
147
148 if opt.Logger == nil {
149 opt.Logger = &nullLogger{}
150 }
151 return &socketWriter{addr: addr, opt: *opt}
152 }
153
154 func (sw *socketWriter) openWriter() error {
155 var err error
156 sw.c, err = sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String())
157 if err != nil {
158 return err
159 }
160
161 wopt := WriterOptions{
162 Bidirectional: true,
163 Timeout: sw.opt.Timeout,
164 }
165
166 if sw.opt.FlushTimeout == 0 {
167 sw.w, err = NewWriter(sw.c, &wopt)
168 } else {
169 sw.w, err = newFlushWriter(sw.c, sw.opt.FlushTimeout)
170 }
171 if err != nil {
172 sw.c.Close()
173 return err
174 }
175 return nil
176 }
177
178 // Close shuts down the SocketWriter, closing any open connection.
179 func (sw *socketWriter) Close() error {
180 var err error
181 if sw.w != nil {
182 err = sw.w.Close()
183 if err == nil {
184 return sw.c.Close()
185 }
186 sw.c.Close()
187 return err
188 }
189 if sw.c != nil {
190 return sw.c.Close()
191 }
192 return nil
193 }
194
195 // Write writes the data in p as a Dnstap frame to a connection to the
196 // SocketWriter's address. Write may block indefinitely while the SocketWriter
197 // attempts to establish or re-establish the connection and FrameStream session.
198 func (sw *socketWriter) WriteFrame(p []byte) (int, error) {
199 for ; ; time.Sleep(sw.opt.RetryInterval) {
200 if sw.w == nil {
201 if err := sw.openWriter(); err != nil {
202 sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, err)
203 continue
204 }
205 }
206
207 n, err := sw.w.WriteFrame(p)
208 if err != nil {
209 sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, err)
210 sw.Close()
211 continue
212 }
213
214 return n, nil
215 }
216 }
1818 import (
1919 "bufio"
2020 "io"
21 "log"
2221 "os"
2322
2423 "github.com/golang/protobuf/proto"
3332 outputChannel chan []byte
3433 wait chan bool
3534 writer *bufio.Writer
35 log Logger
3636 }
3737
3838 // NewTextOutput creates a TextOutput writing dnstap data to the given io.Writer
6666 return NewTextOutput(writer, format), nil
6767 }
6868
69 // SetLogger configures a logger for error events in the TextOutput
70 func (o *TextOutput) SetLogger(logger Logger) {
71 o.log = logger
72 }
73
6974 // GetOutputChannel returns the channel on which the TextOutput accepts dnstap data.
7075 //
7176 // GetOutputChannel satisfies the dnstap Output interface.
8287 dt := &Dnstap{}
8388 for frame := range o.outputChannel {
8489 if err := proto.Unmarshal(frame, dt); err != nil {
85 log.Fatalf("dnstap.TextOutput: proto.Unmarshal() failed: %s\n", err)
90 o.log.Printf("dnstap.TextOutput: proto.Unmarshal() failed: %s, returning", err)
8691 break
8792 }
8893 buf, ok := o.format(dt)
8994 if !ok {
90 log.Fatalf("dnstap.TextOutput: text format function failed\n")
95 o.log.Printf("dnstap.TextOutput: text format function failed, returning")
9196 break
9297 }
9398 if _, err := o.writer.Write(buf); err != nil {
94 log.Fatalf("dnstap.TextOutput: write failed: %s\n", err)
99 o.log.Printf("dnstap.TextOutput: write error: %v, returning", err)
95100 break
96101 }
97102 o.writer.Flush()
0 /*
1 * Copyright (c) 2019 by Farsight Security, Inc.
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 package dnstap
17
18 import (
19 "io"
20 "time"
21
22 framestream "github.com/farsightsec/golang-framestream"
23 )
24
25 // A Writer writes dnstap frames to its destination.
26 type Writer interface {
27 WriteFrame([]byte) (int, error)
28 Close() error
29 }
30
31 // WriterOptions specifies configuration for the Writer
32 type WriterOptions struct {
33 // If Bidirectional is true, the underlying io.Writer must also
34 // satisfy io.Reader, and the dnstap Writer will use the bidirectional
35 // Frame Streams protocol.
36 Bidirectional bool
37 // Timeout sets the write timeout for data and control messages and the
38 // read timeout for handshake responses on the underlying Writer. Timeout
39 // is only effective if the underlying Writer is a net.Conn.
40 Timeout time.Duration
41 }
42
43 // NewWriter creates a Writer using the given io.Writer and options.
44 func NewWriter(w io.Writer, opt *WriterOptions) (Writer, error) {
45 if opt == nil {
46 opt = &WriterOptions{}
47 }
48 return framestream.NewWriter(w,
49 &framestream.WriterOptions{
50 ContentTypes: [][]byte{FSContentType},
51 Timeout: opt.Timeout,
52 Bidirectional: opt.Bidirectional,
53 })
54 }
4747 }
4848
4949 func openOutputFile(filename string, formatter dnstap.TextFormatFunc, doAppend bool) (o dnstap.Output, err error) {
50 var fso *dnstap.FrameStreamOutput
51 var to *dnstap.TextOutput
5052 if formatter == nil {
5153 if filename == "-" || filename == "" {
52 o = dnstap.NewTextOutput(os.Stdout, dnstap.TextFormat)
53 return
54 to = dnstap.NewTextOutput(os.Stdout, dnstap.TextFormat)
55 to.SetLogger(logger)
56 return to, nil
5457 }
55 o, err = dnstap.NewFrameStreamOutputFromFilename(filename)
58 fso, err = dnstap.NewFrameStreamOutputFromFilename(filename)
59 if err == nil {
60 fso.SetLogger(logger)
61 return fso, nil
62 }
5663 } else {
5764 if filename == "-" || filename == "" {
5865 if doAppend {
5966 return nil, errors.New("cannot append to stdout (-)")
6067 }
61 o = dnstap.NewTextOutput(os.Stdout, formatter)
62 return
68 to = dnstap.NewTextOutput(os.Stdout, formatter)
69 to.SetLogger(logger)
70 return to, nil
6371 }
64 o, err = dnstap.NewTextOutputFromFilename(filename, formatter, doAppend)
72 to, err = dnstap.NewTextOutputFromFilename(filename, formatter, doAppend)
73 if err == nil {
74 to.SetLogger(logger)
75 }
76 return to, nil
6577 }
6678 return
6779 }
6767 `)
6868 }
6969
70 var logger = log.New(os.Stderr, "", log.LstdFlags)
71
7072 func main() {
7173 var tcpOutputs, unixOutputs stringList
7274 var fileInputs, tcpInputs, unixInputs stringList
139141 fmt.Fprintf(os.Stderr, "dnstap: Failed to open input file %s: %v\n", fname, err)
140142 os.Exit(1)
141143 }
144 i.SetLogger(logger)
142145 fmt.Fprintf(os.Stderr, "dnstap: opened input file %s\n", fname)
143146 iwg.Add(1)
144147 go runInput(i, output, &iwg)
150153 os.Exit(1)
151154 }
152155 i.SetTimeout(*flagTimeout)
156 i.SetLogger(logger)
153157 fmt.Fprintf(os.Stderr, "dnstap: opened input socket %s\n", path)
154158 iwg.Add(1)
155159 go runInput(i, output, &iwg)
162166 }
163167 i := dnstap.NewFrameStreamSockInput(l)
164168 i.SetTimeout(*flagTimeout)
169 i.SetLogger(logger)
165170 iwg.Add(1)
166171 go runInput(i, output, &iwg)
167172 }
197202 return err
198203 }
199204 o.SetTimeout(*flagTimeout)
205 o.SetLogger(logger)
200206 go o.RunOutputLoop()
201207 mo.Add(o)
202208 }
1414 */
1515
1616 package main
17
1817
1918 import (
2019 dnstap "github.com/dnstap/golang-dnstap"
00 /*
1 * Copyright (c) 2014 by Farsight Security, Inc.
1 * Copyright (c) 2014,2019 by Farsight Security, Inc.
22 *
33 * Licensed under the Apache License, Version 2.0 (the "License");
44 * you may not use this file except in compliance with the License.
2828 Wait()
2929 }
3030
31 // An Output is a desintaion for dnstap data. It accepts data on the channel
31 // An Output is a destination for dnstap data. It accepts data on the channel
3232 // returned from the GetOutputChannel method. The RunOutputLoop() method
3333 // processes data received on this channel, and returns after the Close()
3434 // method is called.
3737 RunOutputLoop()
3838 Close()
3939 }
40
41 // A Logger prints a formatted log message to the destination of the
42 // implementation's choice. A Logger may be provided for some Input and
43 // Output implementations for visibility into their ReadInto() and
44 // RunOutputLoop() loops.
45 //
46 // The result of log.New() satisfies the Logger interface.
47 type Logger interface {
48 Printf(format string, v ...interface{})
49 }
50
51 type nullLogger struct{}
52
53 func (n nullLogger) Printf(format string, v ...interface{}) {}
00 module github.com/dnstap/golang-dnstap
11
22 require (
3 github.com/farsightsec/golang-framestream v0.2.0
3 github.com/farsightsec/golang-framestream v0.3.0
44 github.com/golang/protobuf v1.4.2
55 github.com/miekg/dns v1.1.31
66 )
0 github.com/farsightsec/golang-framestream v0.2.0 h1:tW5KuZqkNIoiBc6LMsMLg+h6kkGyIB4/RR1z7Tqv6wE=
1 github.com/farsightsec/golang-framestream v0.2.0/go.mod h1:eNde4IQyEiA5br02AouhEHCu3p3UzrCdFR4LuQHklMI=
0 github.com/farsightsec/golang-framestream v0.3.0 h1:/spFQHucTle/ZIPkYqrfshQqPe2VQEzesH243TjIwqA=
1 github.com/farsightsec/golang-framestream v0.3.0/go.mod h1:eNde4IQyEiA5br02AouhEHCu3p3UzrCdFR4LuQHklMI=
22 github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
33 github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
44 github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
22 import (
33 "fmt"
44 "net"
5 "os"
56 "testing"
67 "time"
78 )
9
10 type testLogger struct{ *testing.T }
11
12 func (t *testLogger) Printf(format string, v ...interface{}) {
13 t.Helper()
14 t.Logf(format, v...)
15 }
816
917 func dialAndSend(t *testing.T, network, address string) *FrameStreamSockOutput {
1018 var addr net.Addr
3038 out.SetTimeout(time.Second)
3139 out.SetFlushTimeout(100 * time.Millisecond)
3240 out.SetRetryInterval(time.Second)
41 out.SetLogger(&testLogger{t})
3342
3443 go out.RunOutputLoop()
3544 <-time.After(500 * time.Millisecond)
5261 t.Fatal(err)
5362 }
5463
64 defer os.Remove("dnstap.sock")
65
66 in.SetLogger(&testLogger{t})
5567 out := make(chan []byte)
5668 go in.ReadInto(out)
5769
8294 }
8395
8496 in := NewFrameStreamSockInput(l)
97 in.SetLogger(&testLogger{t})
8598 out := make(chan []byte)
8699 go in.ReadInto(out)
87100 readOne(t, out)
129142 i := 1
130143
131144 b.StartTimer()
132 for _ = range outch { i++ }
133 if i != b.N {
145 for _ = range outch {
146 i++
147 }
148 if i != b.N {
134149 b.Error("invalid frame count")
135150 }
136151 close(readDone)
174189 go func() {
175190 <-outch
176191 b.StartTimer()
177 for i := 1; i < b.N; i++ { <-outch } // NB: read never fails
192 for i := 1; i < b.N; i++ {
193 <-outch
194 } // NB: read never fails
178195 close(readDone)
179196 }()
180197