Update upstream source from tag 'upstream/0.3.0'
Update to upstream version '0.3.0'
with Debian dir 2dc38ae21d52325ad36bfdc735fc8b361f488b8b
Sascha Steinbiss
2 years ago
0 | /* | |
1 | * Copyright (c) 2019 by Farsight Security, Inc. | |
2 | * | |
3 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | * you may not use this file except in compliance with the License. | |
5 | * You may obtain a copy of the License at | |
6 | * | |
7 | * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | * | |
9 | * Unless required by applicable law or agreed to in writing, software | |
10 | * distributed under the License is distributed on an "AS IS" BASIS, | |
11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | * See the License for the specific language governing permissions and | |
13 | * limitations under the License. | |
14 | */ | |
15 | ||
16 | package dnstap | |
17 | ||
18 | import ( | |
19 | framestream "github.com/farsightsec/golang-framestream" | |
20 | "github.com/golang/protobuf/proto" | |
21 | ) | |
22 | ||
23 | // A Decoder reads and parses Dnstap messages from an io.Reader | |
24 | type Decoder struct { | |
25 | buf []byte | |
26 | r Reader | |
27 | } | |
28 | ||
29 | // NewDecoder creates a Decoder using the given dnstap Reader, accepting | |
30 | // dnstap data frames up to maxSize in size. | |
31 | func NewDecoder(r Reader, maxSize int) *Decoder { | |
32 | return &Decoder{ | |
33 | buf: make([]byte, maxSize), | |
34 | r: r, | |
35 | } | |
36 | } | |
37 | ||
38 | // Decode reads and parses a Dnstap message from the Decoder's Reader. | |
39 | // Decode silently discards data frames larger than the Decoder's configured | |
40 | // maxSize. | |
41 | func (d *Decoder) Decode(m *Dnstap) error { | |
42 | for { | |
43 | n, err := d.r.ReadFrame(d.buf) | |
44 | ||
45 | switch err { | |
46 | case framestream.ErrDataFrameTooLarge: | |
47 | continue | |
48 | case nil: | |
49 | break | |
50 | default: | |
51 | return err | |
52 | } | |
53 | ||
54 | return proto.Unmarshal(d.buf[:n], m) | |
55 | } | |
56 | } |
0 | /* | |
1 | * Copyright (c) 2019 by Farsight Security, Inc. | |
2 | * | |
3 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | * you may not use this file except in compliance with the License. | |
5 | * You may obtain a copy of the License at | |
6 | * | |
7 | * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | * | |
9 | * Unless required by applicable law or agreed to in writing, software | |
10 | * distributed under the License is distributed on an "AS IS" BASIS, | |
11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | * See the License for the specific language governing permissions and | |
13 | * limitations under the License. | |
14 | */ | |
15 | ||
16 | package dnstap | |
17 | ||
18 | import ( | |
19 | "github.com/golang/protobuf/proto" | |
20 | ) | |
21 | ||
22 | // An Encoder serializes and writes Dnstap messages to an underlying | |
23 | // dnstap Writer | |
24 | type Encoder struct { | |
25 | w Writer | |
26 | } | |
27 | ||
28 | // NewEncoder creates an Encoder using the given dnstap Writer | |
29 | func NewEncoder(w Writer) *Encoder { | |
30 | return &Encoder{w} | |
31 | } | |
32 | ||
33 | // Encode serializes and writes the Dnstap message m to the encoder's | |
34 | // Writer. | |
35 | func (e *Encoder) Encode(m *Dnstap) error { | |
36 | b, err := proto.Marshal(m) | |
37 | if err != nil { | |
38 | return err | |
39 | } | |
40 | ||
41 | _, err = e.w.WriteFrame(b) | |
42 | return err | |
43 | } |
0 | 0 | /* |
1 | * Copyright (c) 2013-2014 by Farsight Security, Inc. | |
1 | * Copyright (c) 2013-2019 by Farsight Security, Inc. | |
2 | 2 | * |
3 | 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 4 | * you may not use this file except in compliance with the License. |
17 | 17 | |
18 | 18 | import ( |
19 | 19 | "io" |
20 | "log" | |
21 | 20 | "os" |
22 | 21 | "time" |
23 | ||
24 | "github.com/farsightsec/golang-framestream" | |
25 | 22 | ) |
26 | 23 | |
27 | 24 | // MaxPayloadSize sets the upper limit on input Dnstap payload sizes. If an Input |
39 | 36 | |
40 | 37 | // A FrameStreamInput reads dnstap data from an io.ReadWriter. |
41 | 38 | type FrameStreamInput struct { |
42 | wait chan bool | |
43 | decoder *framestream.Decoder | |
44 | timeout time.Duration | |
39 | wait chan bool | |
40 | reader Reader | |
41 | log Logger | |
45 | 42 | } |
46 | 43 | |
47 | 44 | // NewFrameStreamInput creates a FrameStreamInput reading data from the given |
55 | 52 | // given io.ReadWriter with a timeout applied to reading and (for bidirectional |
56 | 53 | // inputs) writing control messages. |
57 | 54 | func NewFrameStreamInputTimeout(r io.ReadWriter, bi bool, timeout time.Duration) (input *FrameStreamInput, err error) { |
58 | input = new(FrameStreamInput) | |
59 | decoderOptions := framestream.DecoderOptions{ | |
60 | MaxPayloadSize: MaxPayloadSize, | |
61 | ContentType: FSContentType, | |
62 | Bidirectional: bi, | |
63 | Timeout: timeout, | |
55 | reader, err := NewReader(r, &ReaderOptions{ | |
56 | Bidirectional: bi, | |
57 | Timeout: timeout, | |
58 | }) | |
59 | ||
60 | if err != nil { | |
61 | return nil, err | |
64 | 62 | } |
65 | input.decoder, err = framestream.NewDecoder(r, &decoderOptions) | |
66 | if err != nil { | |
67 | return | |
68 | } | |
69 | input.wait = make(chan bool) | |
70 | return | |
63 | ||
64 | return &FrameStreamInput{ | |
65 | wait: make(chan bool), | |
66 | reader: reader, | |
67 | log: nullLogger{}, | |
68 | }, nil | |
71 | 69 | } |
72 | 70 | |
73 | 71 | // NewFrameStreamInputFromFilename creates a FrameStreamInput reading from |
77 | 75 | if err != nil { |
78 | 76 | return nil, err |
79 | 77 | } |
80 | input, err = NewFrameStreamInput(file, false) | |
81 | return | |
78 | return NewFrameStreamInput(file, false) | |
79 | } | |
80 | ||
81 | // SetLogger configures a logger for FrameStreamInput read error reporting. | |
82 | func (input *FrameStreamInput) SetLogger(logger Logger) { | |
83 | input.log = logger | |
82 | 84 | } |
83 | 85 | |
84 | 86 | // ReadInto reads data from the FrameStreamInput into the output channel. |
85 | 87 | // |
86 | 88 | // ReadInto satisfies the dnstap Input interface. |
87 | 89 | func (input *FrameStreamInput) ReadInto(output chan []byte) { |
90 | buf := make([]byte, MaxPayloadSize) | |
88 | 91 | for { |
89 | buf, err := input.decoder.Decode() | |
90 | if err != nil { | |
91 | if err != io.EOF { | |
92 | log.Printf("framestream.Decoder.Decode() failed: %s\n", err) | |
93 | } | |
94 | break | |
92 | n, err := input.reader.ReadFrame(buf) | |
93 | if err == nil { | |
94 | newbuf := make([]byte, n) | |
95 | copy(newbuf, buf) | |
96 | output <- newbuf | |
97 | continue | |
95 | 98 | } |
96 | newbuf := make([]byte, len(buf)) | |
97 | copy(newbuf, buf) | |
98 | output <- newbuf | |
99 | ||
100 | if err != io.EOF { | |
101 | input.log.Printf("FrameStreamInput: Read error: %v", err) | |
102 | } | |
103 | ||
104 | break | |
99 | 105 | } |
100 | 106 | close(input.wait) |
101 | 107 | } |
0 | 0 | /* |
1 | * Copyright (c) 2014 by Farsight Security, Inc. | |
1 | * Copyright (c) 2014,2019 by Farsight Security, Inc. | |
2 | 2 | * |
3 | 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 4 | * you may not use this file except in compliance with the License. |
17 | 17 | |
18 | 18 | import ( |
19 | 19 | "io" |
20 | "log" | |
21 | 20 | "os" |
22 | ||
23 | "github.com/farsightsec/golang-framestream" | |
24 | 21 | ) |
25 | 22 | |
26 | 23 | // FrameStreamOutput implements a dnstap Output to an io.Writer. |
27 | 24 | type FrameStreamOutput struct { |
28 | 25 | outputChannel chan []byte |
29 | 26 | wait chan bool |
30 | enc *framestream.Encoder | |
27 | w Writer | |
28 | log Logger | |
31 | 29 | } |
32 | 30 | |
33 | 31 | // NewFrameStreamOutput creates a FrameStreamOutput writing dnstap data to |
34 | 32 | // the given io.Writer. |
35 | 33 | func NewFrameStreamOutput(w io.Writer) (o *FrameStreamOutput, err error) { |
36 | o = new(FrameStreamOutput) | |
37 | o.outputChannel = make(chan []byte, outputChannelSize) | |
38 | o.enc, err = framestream.NewEncoder(w, &framestream.EncoderOptions{ContentType: FSContentType}) | |
34 | ow, err := NewWriter(w, nil) | |
39 | 35 | if err != nil { |
40 | return | |
36 | return nil, err | |
41 | 37 | } |
42 | o.wait = make(chan bool) | |
43 | return | |
38 | return &FrameStreamOutput{ | |
39 | outputChannel: make(chan []byte, outputChannelSize), | |
40 | wait: make(chan bool), | |
41 | w: ow, | |
42 | log: nullLogger{}, | |
43 | }, nil | |
44 | 44 | } |
45 | 45 | |
46 | // NewFrameStreamOutputFromFilename creates a file with the namee fname, | |
46 | // NewFrameStreamOutputFromFilename creates a file with the name fname, | |
47 | 47 | // truncates it if it exists, and returns a FrameStreamOutput writing to |
48 | 48 | // the newly created or truncated file. |
49 | 49 | func NewFrameStreamOutputFromFilename(fname string) (o *FrameStreamOutput, err error) { |
57 | 57 | return NewFrameStreamOutput(w) |
58 | 58 | } |
59 | 59 | |
60 | // SetLogger sets an alternate logger for the FrameStreamOutput. The default | |
61 | // is no logging. | |
62 | func (o *FrameStreamOutput) SetLogger(logger Logger) { | |
63 | o.log = logger | |
64 | } | |
65 | ||
60 | 66 | // GetOutputChannel returns the channel on which the FrameStreamOutput accepts |
61 | 67 | // data. |
62 | 68 | // |
68 | 74 | // RunOutputLoop processes data received on the channel returned by |
69 | 75 | // GetOutputChannel, returning after the CLose method is called. |
70 | 76 | // If there is an error writing to the Output's writer, RunOutputLoop() |
71 | // logs a fatal error exits the program. | |
77 | // returns, logging an error if a logger is configured with SetLogger() | |
72 | 78 | // |
73 | 79 | // RunOutputLoop satisfies the dnstap Output interface. |
74 | 80 | func (o *FrameStreamOutput) RunOutputLoop() { |
75 | 81 | for frame := range o.outputChannel { |
76 | if _, err := o.enc.Write(frame); err != nil { | |
77 | log.Fatalf("framestream.Encoder.Write() failed: %s\n", err) | |
78 | break | |
82 | if _, err := o.w.WriteFrame(frame); err != nil { | |
83 | o.log.Printf("FrameStreamOutput: Write error: %v, returning", err) | |
84 | close(o.wait) | |
85 | return | |
79 | 86 | } |
80 | 87 | } |
81 | 88 | close(o.wait) |
88 | 95 | func (o *FrameStreamOutput) Close() { |
89 | 96 | close(o.outputChannel) |
90 | 97 | <-o.wait |
91 | o.enc.Flush() | |
92 | o.enc.Close() | |
98 | o.w.Close() | |
93 | 99 | } |
0 | 0 | /* |
1 | * Copyright (c) 2013-2014 by Farsight Security, Inc. | |
1 | * Copyright (c) 2013-2019 by Farsight Security, Inc. | |
2 | 2 | * |
3 | 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 4 | * you may not use this file except in compliance with the License. |
16 | 16 | package dnstap |
17 | 17 | |
18 | 18 | import ( |
19 | "log" | |
19 | "fmt" | |
20 | 20 | "net" |
21 | 21 | "os" |
22 | 22 | "time" |
28 | 28 | wait chan bool |
29 | 29 | listener net.Listener |
30 | 30 | timeout time.Duration |
31 | log Logger | |
31 | 32 | } |
32 | 33 | |
33 | 34 | // NewFrameStreamSockInput creates a FrameStreamSockInput collecting dnstap |
35 | 36 | func NewFrameStreamSockInput(listener net.Listener) (input *FrameStreamSockInput) { |
36 | 37 | input = new(FrameStreamSockInput) |
37 | 38 | input.listener = listener |
39 | input.log = &nullLogger{} | |
38 | 40 | return |
39 | 41 | } |
40 | 42 | |
42 | 44 | // response control messages to clients of the FrameStreamSockInput's listener. |
43 | 45 | // |
44 | 46 | // The timeout is effective only for connections accepted after the call to |
45 | // FrameStreamSockInput. | |
47 | // SetTimeout. | |
46 | 48 | func (input *FrameStreamSockInput) SetTimeout(timeout time.Duration) { |
47 | 49 | input.timeout = timeout |
50 | } | |
51 | ||
52 | // SetLogger configures a logger for the FrameStreamSockInput. | |
53 | func (input *FrameStreamSockInput) SetLogger(logger Logger) { | |
54 | input.log = logger | |
48 | 55 | } |
49 | 56 | |
50 | 57 | // NewFrameStreamSockInputFromPath creates a unix domain socket at the |
68 | 75 | // |
69 | 76 | // ReadInto satisfies the dnstap Input interface. |
70 | 77 | func (input *FrameStreamSockInput) ReadInto(output chan []byte) { |
78 | var n uint64 | |
71 | 79 | for { |
72 | 80 | conn, err := input.listener.Accept() |
73 | 81 | if err != nil { |
74 | log.Printf("net.Listener.Accept() failed: %s\n", err) | |
82 | input.log.Printf("%s: accept failed: %v\n", | |
83 | input.listener.Addr(), | |
84 | err) | |
75 | 85 | continue |
86 | } | |
87 | n++ | |
88 | origin := "" | |
89 | switch conn.RemoteAddr().Network() { | |
90 | case "tcp", "tcp4", "tcp6": | |
91 | origin = fmt.Sprintf(" from %s", conn.RemoteAddr()) | |
76 | 92 | } |
77 | 93 | i, err := NewFrameStreamInputTimeout(conn, true, input.timeout) |
78 | 94 | if err != nil { |
79 | log.Printf("dnstap.NewFrameStreamInput() failed: %s\n", err) | |
95 | input.log.Printf("%s: connection %d: open input%s failed: %v", | |
96 | conn.LocalAddr(), n, origin, err) | |
80 | 97 | continue |
81 | 98 | } |
82 | log.Printf("dnstap.FrameStreamSockInput: accepted a socket connection\n") | |
83 | go i.ReadInto(output) | |
99 | input.log.Printf("%s: accepted connection %d%s", | |
100 | conn.LocalAddr(), n, origin) | |
101 | i.SetLogger(input.log) | |
102 | go func(cn uint64) { | |
103 | i.ReadInto(output) | |
104 | input.log.Printf("%s: closed connection %d%s", | |
105 | conn.LocalAddr(), cn, origin) | |
106 | }(n) | |
84 | 107 | } |
85 | 108 | } |
86 | 109 |
16 | 16 | package dnstap |
17 | 17 | |
18 | 18 | import ( |
19 | "log" | |
20 | 19 | "net" |
21 | 20 | "time" |
22 | ||
23 | "github.com/farsightsec/golang-framestream" | |
24 | 21 | ) |
25 | 22 | |
26 | 23 | // A FrameStreamSockOutput manages a socket connection and sends dnstap |
27 | 24 | // data over a framestream connection on that socket. |
28 | 25 | type FrameStreamSockOutput struct { |
26 | address net.Addr | |
29 | 27 | outputChannel chan []byte |
30 | address net.Addr | |
31 | 28 | wait chan bool |
32 | dialer *net.Dialer | |
33 | timeout time.Duration | |
34 | retry time.Duration | |
35 | flushTimeout time.Duration | |
29 | wopt SocketWriterOptions | |
36 | 30 | } |
37 | 31 | |
38 | 32 | // NewFrameStreamSockOutput creates a FrameStreamSockOutput manaaging a |
39 | 33 | // connection to the given address. |
40 | 34 | func NewFrameStreamSockOutput(address net.Addr) (*FrameStreamSockOutput, error) { |
41 | 35 | return &FrameStreamSockOutput{ |
36 | address: address, | |
42 | 37 | outputChannel: make(chan []byte, outputChannelSize), |
43 | address: address, | |
44 | 38 | wait: make(chan bool), |
45 | retry: 10 * time.Second, | |
46 | flushTimeout: 5 * time.Second, | |
47 | dialer: &net.Dialer{ | |
48 | Timeout: 30 * time.Second, | |
39 | wopt: SocketWriterOptions{ | |
40 | FlushTimeout: 5 * time.Second, | |
41 | RetryInterval: 10 * time.Second, | |
42 | Dialer: &net.Dialer{ | |
43 | Timeout: 30 * time.Second, | |
44 | }, | |
45 | Logger: &nullLogger{}, | |
49 | 46 | }, |
50 | 47 | }, nil |
51 | 48 | } |
54 | 51 | // read timeout for handshake responses on the FrameStreamSockOutput's |
55 | 52 | // connection. The default timeout is zero, for no timeout. |
56 | 53 | func (o *FrameStreamSockOutput) SetTimeout(timeout time.Duration) { |
57 | o.timeout = timeout | |
54 | o.wopt.Timeout = timeout | |
58 | 55 | } |
59 | 56 | |
60 | 57 | // SetFlushTimeout sets the maximum time data will be kept in the output |
62 | 59 | // |
63 | 60 | // The default flush timeout is five seconds. |
64 | 61 | func (o *FrameStreamSockOutput) SetFlushTimeout(timeout time.Duration) { |
65 | o.flushTimeout = timeout | |
62 | o.wopt.FlushTimeout = timeout | |
66 | 63 | } |
67 | 64 | |
68 | 65 | // SetRetryInterval specifies how long the FrameStreamSockOutput will wait |
69 | 66 | // before re-establishing a failed connection. The default retry interval |
70 | 67 | // is 10 seconds. |
71 | 68 | func (o *FrameStreamSockOutput) SetRetryInterval(retry time.Duration) { |
72 | o.retry = retry | |
69 | o.wopt.RetryInterval = retry | |
73 | 70 | } |
74 | 71 | |
75 | 72 | // SetDialer replaces the default net.Dialer for re-establishing the |
80 | 77 | // FrameStreamSockOutput uses a default dialer with a 30 second |
81 | 78 | // timeout. |
82 | 79 | func (o *FrameStreamSockOutput) SetDialer(dialer *net.Dialer) { |
83 | o.dialer = dialer | |
80 | o.wopt.Dialer = dialer | |
81 | } | |
82 | ||
83 | // SetLogger configures FrameStreamSockOutput to log through the given | |
84 | // Logger. | |
85 | func (o *FrameStreamSockOutput) SetLogger(logger Logger) { | |
86 | o.wopt.Logger = logger | |
84 | 87 | } |
85 | 88 | |
86 | 89 | // GetOutputChannel returns the channel on which the |
91 | 94 | return o.outputChannel |
92 | 95 | } |
93 | 96 | |
94 | // A timedConn resets an associated timer on each Write to the underlying | |
95 | // connection, and is used to implement the output's flush timeout. | |
96 | type timedConn struct { | |
97 | net.Conn | |
98 | timer *time.Timer | |
99 | timeout time.Duration | |
100 | ||
101 | // idle is true if the timer has fired and we have consumed | |
102 | // the time from its channel. We use this to prevent deadlocking | |
103 | // when resetting or stopping an already fired timer. | |
104 | idle bool | |
105 | } | |
106 | ||
107 | // SetIdle informs the timedConn that the associated timer is idle, i.e. | |
108 | // it has fired and has not been reset. | |
109 | func (t *timedConn) SetIdle() { | |
110 | t.idle = true | |
111 | } | |
112 | ||
113 | // Stop stops the underlying timer, consuming any time value if the timer | |
114 | // had fired before Stop was called. | |
115 | func (t *timedConn) StopTimer() { | |
116 | if !t.timer.Stop() && !t.idle { | |
117 | <-t.timer.C | |
118 | } | |
119 | t.idle = true | |
120 | } | |
121 | ||
122 | func (t *timedConn) Write(b []byte) (int, error) { | |
123 | t.StopTimer() | |
124 | t.timer.Reset(t.timeout) | |
125 | t.idle = false | |
126 | return t.Conn.Write(b) | |
127 | } | |
128 | ||
129 | func (t *timedConn) Close() error { | |
130 | t.StopTimer() | |
131 | return t.Conn.Close() | |
132 | } | |
133 | ||
134 | 97 | // RunOutputLoop reads data from the output channel and sends it over |
135 | 98 | // a connections to the FrameStreamSockOutput's address, establishing |
136 | 99 | // the connection as needed. |
137 | 100 | // |
138 | 101 | // RunOutputLoop satisifes the dnstap Output interface. |
139 | 102 | func (o *FrameStreamSockOutput) RunOutputLoop() { |
140 | var enc *framestream.Encoder | |
141 | var err error | |
103 | w := NewSocketWriter(o.address, &o.wopt) | |
142 | 104 | |
143 | // Start with the connection flush timer in a stopped state. | |
144 | // It will be reset by the first Write call on a new connection. | |
145 | conn := &timedConn{ | |
146 | timer: time.NewTimer(0), | |
147 | timeout: o.flushTimeout, | |
105 | for b := range o.outputChannel { | |
106 | // w is of type *SocketWriter, whose Write implementation | |
107 | // handles all errors by retrying the connection. | |
108 | w.WriteFrame(b) | |
148 | 109 | } |
149 | conn.StopTimer() | |
150 | 110 | |
151 | defer func() { | |
152 | if enc != nil { | |
153 | enc.Flush() | |
154 | enc.Close() | |
155 | } | |
156 | if conn != nil { | |
157 | conn.Close() | |
158 | } | |
159 | close(o.wait) | |
160 | }() | |
161 | ||
162 | for { | |
163 | select { | |
164 | case frame, ok := <-o.outputChannel: | |
165 | if !ok { | |
166 | return | |
167 | } | |
168 | ||
169 | // the retry loop | |
170 | for ;; time.Sleep(o.retry) { | |
171 | if enc == nil { | |
172 | // connect the socket | |
173 | conn.Conn, err = o.dialer.Dial(o.address.Network(), o.address.String()) | |
174 | if err != nil { | |
175 | log.Printf("Dial() failed: %v", err) | |
176 | continue // = retry | |
177 | } | |
178 | ||
179 | // create the encoder | |
180 | eopt := framestream.EncoderOptions{ | |
181 | ContentType: FSContentType, | |
182 | Bidirectional: true, | |
183 | Timeout: o.timeout, | |
184 | } | |
185 | enc, err = framestream.NewEncoder(conn, &eopt) | |
186 | if err != nil { | |
187 | log.Printf("framestream.NewEncoder() failed: %v", err) | |
188 | conn.Close() | |
189 | enc = nil | |
190 | continue // = retry | |
191 | } | |
192 | } | |
193 | ||
194 | // try writing | |
195 | if _, err = enc.Write(frame); err != nil { | |
196 | log.Printf("framestream.Encoder.Write() failed: %v", err) | |
197 | enc.Close() | |
198 | enc = nil | |
199 | conn.Close() | |
200 | continue // = retry | |
201 | } | |
202 | ||
203 | break // success! | |
204 | } | |
205 | ||
206 | case <-conn.timer.C: | |
207 | conn.SetIdle() | |
208 | if enc == nil { | |
209 | continue | |
210 | } | |
211 | if err := enc.Flush(); err != nil { | |
212 | log.Printf("framestream.Encoder.Flush() failed: %s", err) | |
213 | enc.Close() | |
214 | enc = nil | |
215 | conn.Close() | |
216 | time.Sleep(o.retry) | |
217 | } | |
218 | } | |
219 | } | |
111 | w.Close() | |
112 | close(o.wait) | |
113 | return | |
220 | 114 | } |
221 | 115 | |
222 | 116 | // Close shuts down the FrameStreamSockOutput's output channel and returns |
0 | /* | |
1 | * Copyright (c) 2019 by Farsight Security, Inc. | |
2 | * | |
3 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | * you may not use this file except in compliance with the License. | |
5 | * You may obtain a copy of the License at | |
6 | * | |
7 | * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | * | |
9 | * Unless required by applicable law or agreed to in writing, software | |
10 | * distributed under the License is distributed on an "AS IS" BASIS, | |
11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | * See the License for the specific language governing permissions and | |
13 | * limitations under the License. | |
14 | */ | |
15 | ||
16 | package dnstap | |
17 | ||
18 | import ( | |
19 | "io" | |
20 | "time" | |
21 | ||
22 | framestream "github.com/farsightsec/golang-framestream" | |
23 | ) | |
24 | ||
25 | // A Reader is a source of dnstap frames. | |
26 | type Reader interface { | |
27 | ReadFrame([]byte) (int, error) | |
28 | } | |
29 | ||
30 | // ReaderOptions specifies configuration for the Reader. | |
31 | type ReaderOptions struct { | |
32 | // If Bidirectional is true, the underlying io.Reader must also | |
33 | // satisfy io.Writer, and the dnstap Reader will use the bidirectional | |
34 | // Frame Streams protocol. | |
35 | Bidirectional bool | |
36 | // Timeout sets the timeout for reading the initial handshake and | |
37 | // writing response control messages to the underlying Reader. Timeout | |
38 | // is only effective if the underlying Reader is a net.Conn. | |
39 | Timeout time.Duration | |
40 | } | |
41 | ||
42 | // NewReader creates a Reader using the given io.Reader and options. | |
43 | func NewReader(r io.Reader, opt *ReaderOptions) (Reader, error) { | |
44 | if opt == nil { | |
45 | opt = &ReaderOptions{} | |
46 | } | |
47 | return framestream.NewReader(r, | |
48 | &framestream.ReaderOptions{ | |
49 | ContentTypes: [][]byte{FSContentType}, | |
50 | Timeout: opt.Timeout, | |
51 | Bidirectional: opt.Bidirectional, | |
52 | }) | |
53 | } |
0 | /* | |
1 | * Copyright (c) 2019 by Farsight Security, Inc. | |
2 | * | |
3 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | * you may not use this file except in compliance with the License. | |
5 | * You may obtain a copy of the License at | |
6 | * | |
7 | * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | * | |
9 | * Unless required by applicable law or agreed to in writing, software | |
10 | * distributed under the License is distributed on an "AS IS" BASIS, | |
11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | * See the License for the specific language governing permissions and | |
13 | * limitations under the License. | |
14 | */ | |
15 | ||
16 | package dnstap | |
17 | ||
18 | import ( | |
19 | "net" | |
20 | "sync" | |
21 | "time" | |
22 | ||
23 | framestream "github.com/farsightsec/golang-framestream" | |
24 | ) | |
25 | ||
26 | // A SocketWriter writes data to a Frame Streams TCP or Unix domain socket, | |
27 | // establishing or restarting the connection if needed. | |
28 | type socketWriter struct { | |
29 | w Writer | |
30 | c net.Conn | |
31 | addr net.Addr | |
32 | opt SocketWriterOptions | |
33 | } | |
34 | ||
35 | // SocketWriterOptions provides configuration options for a SocketWriter | |
36 | type SocketWriterOptions struct { | |
37 | // Timeout gives the time the SocketWriter will wait for reads and | |
38 | // writes to complete. | |
39 | Timeout time.Duration | |
40 | // FlushTimeout is the maximum duration data will be buffered while | |
41 | // being written to the socket. | |
42 | FlushTimeout time.Duration | |
43 | // RetryInterval is how long the SocketWriter will wait between | |
44 | // connection attempts. | |
45 | RetryInterval time.Duration | |
46 | // Dialer is the dialer used to establish the connection. If nil, | |
47 | // SocketWriter will use a default dialer with a 30 second timeout. | |
48 | Dialer *net.Dialer | |
49 | // Logger provides the logger for connection establishment, reconnection, | |
50 | // and error events of the SocketWriter. | |
51 | Logger Logger | |
52 | } | |
53 | ||
54 | type flushWriter struct { | |
55 | m sync.Mutex | |
56 | w *framestream.Writer | |
57 | d time.Duration | |
58 | timer *time.Timer | |
59 | timerActive bool | |
60 | lastFlushed time.Time | |
61 | stopped bool | |
62 | } | |
63 | ||
64 | type flusherConn struct { | |
65 | net.Conn | |
66 | lastWritten *time.Time | |
67 | } | |
68 | ||
69 | func (c *flusherConn) Write(p []byte) (int, error) { | |
70 | n, err := c.Conn.Write(p) | |
71 | *c.lastWritten = time.Now() | |
72 | return n, err | |
73 | } | |
74 | ||
75 | func newFlushWriter(c net.Conn, d time.Duration) (*flushWriter, error) { | |
76 | var err error | |
77 | fw := &flushWriter{timer: time.NewTimer(d), d: d} | |
78 | if !fw.timer.Stop() { | |
79 | <-fw.timer.C | |
80 | } | |
81 | ||
82 | fc := &flusherConn{ | |
83 | Conn: c, | |
84 | lastWritten: &fw.lastFlushed, | |
85 | } | |
86 | ||
87 | fw.w, err = framestream.NewWriter(fc, | |
88 | &framestream.WriterOptions{ | |
89 | ContentTypes: [][]byte{FSContentType}, | |
90 | Bidirectional: true, | |
91 | Timeout: d, | |
92 | }) | |
93 | if err != nil { | |
94 | return nil, err | |
95 | } | |
96 | go fw.runFlusher() | |
97 | return fw, nil | |
98 | } | |
99 | ||
100 | func (fw *flushWriter) runFlusher() { | |
101 | for range fw.timer.C { | |
102 | fw.m.Lock() | |
103 | if fw.stopped { | |
104 | fw.m.Unlock() | |
105 | return | |
106 | } | |
107 | last := fw.lastFlushed | |
108 | elapsed := time.Since(last) | |
109 | if elapsed < fw.d { | |
110 | fw.timer.Reset(fw.d - elapsed) | |
111 | fw.m.Unlock() | |
112 | continue | |
113 | } | |
114 | fw.w.Flush() | |
115 | fw.timerActive = false | |
116 | fw.m.Unlock() | |
117 | } | |
118 | } | |
119 | ||
120 | func (fw *flushWriter) WriteFrame(p []byte) (int, error) { | |
121 | fw.m.Lock() | |
122 | n, err := fw.w.WriteFrame(p) | |
123 | if !fw.timerActive { | |
124 | fw.timer.Reset(fw.d) | |
125 | fw.timerActive = true | |
126 | } | |
127 | fw.m.Unlock() | |
128 | return n, err | |
129 | } | |
130 | ||
131 | func (fw *flushWriter) Close() error { | |
132 | fw.m.Lock() | |
133 | fw.stopped = true | |
134 | fw.timer.Reset(0) | |
135 | err := fw.w.Close() | |
136 | fw.m.Unlock() | |
137 | return err | |
138 | } | |
139 | ||
140 | // NewSocketWriter creates a SocketWriter which writes data to a connection | |
141 | // to the given addr. The SocketWriter maintains and re-establishes the | |
142 | // connection to this address as needed. | |
143 | func NewSocketWriter(addr net.Addr, opt *SocketWriterOptions) Writer { | |
144 | if opt == nil { | |
145 | opt = &SocketWriterOptions{} | |
146 | } | |
147 | ||
148 | if opt.Logger == nil { | |
149 | opt.Logger = &nullLogger{} | |
150 | } | |
151 | return &socketWriter{addr: addr, opt: *opt} | |
152 | } | |
153 | ||
154 | func (sw *socketWriter) openWriter() error { | |
155 | var err error | |
156 | sw.c, err = sw.opt.Dialer.Dial(sw.addr.Network(), sw.addr.String()) | |
157 | if err != nil { | |
158 | return err | |
159 | } | |
160 | ||
161 | wopt := WriterOptions{ | |
162 | Bidirectional: true, | |
163 | Timeout: sw.opt.Timeout, | |
164 | } | |
165 | ||
166 | if sw.opt.FlushTimeout == 0 { | |
167 | sw.w, err = NewWriter(sw.c, &wopt) | |
168 | } else { | |
169 | sw.w, err = newFlushWriter(sw.c, sw.opt.FlushTimeout) | |
170 | } | |
171 | if err != nil { | |
172 | sw.c.Close() | |
173 | return err | |
174 | } | |
175 | return nil | |
176 | } | |
177 | ||
178 | // Close shuts down the SocketWriter, closing any open connection. | |
179 | func (sw *socketWriter) Close() error { | |
180 | var err error | |
181 | if sw.w != nil { | |
182 | err = sw.w.Close() | |
183 | if err == nil { | |
184 | return sw.c.Close() | |
185 | } | |
186 | sw.c.Close() | |
187 | return err | |
188 | } | |
189 | if sw.c != nil { | |
190 | return sw.c.Close() | |
191 | } | |
192 | return nil | |
193 | } | |
194 | ||
195 | // Write writes the data in p as a Dnstap frame to a connection to the | |
196 | // SocketWriter's address. Write may block indefinitely while the SocketWriter | |
197 | // attempts to establish or re-establish the connection and FrameStream session. | |
198 | func (sw *socketWriter) WriteFrame(p []byte) (int, error) { | |
199 | for ; ; time.Sleep(sw.opt.RetryInterval) { | |
200 | if sw.w == nil { | |
201 | if err := sw.openWriter(); err != nil { | |
202 | sw.opt.Logger.Printf("%s: open failed: %v", sw.addr, err) | |
203 | continue | |
204 | } | |
205 | } | |
206 | ||
207 | n, err := sw.w.WriteFrame(p) | |
208 | if err != nil { | |
209 | sw.opt.Logger.Printf("%s: write failed: %v", sw.addr, err) | |
210 | sw.Close() | |
211 | continue | |
212 | } | |
213 | ||
214 | return n, nil | |
215 | } | |
216 | } |
18 | 18 | import ( |
19 | 19 | "bufio" |
20 | 20 | "io" |
21 | "log" | |
22 | 21 | "os" |
23 | 22 | |
24 | 23 | "github.com/golang/protobuf/proto" |
33 | 32 | outputChannel chan []byte |
34 | 33 | wait chan bool |
35 | 34 | writer *bufio.Writer |
35 | log Logger | |
36 | 36 | } |
37 | 37 | |
38 | 38 | // NewTextOutput creates a TextOutput writing dnstap data to the given io.Writer |
66 | 66 | return NewTextOutput(writer, format), nil |
67 | 67 | } |
68 | 68 | |
69 | // SetLogger configures a logger for error events in the TextOutput | |
70 | func (o *TextOutput) SetLogger(logger Logger) { | |
71 | o.log = logger | |
72 | } | |
73 | ||
69 | 74 | // GetOutputChannel returns the channel on which the TextOutput accepts dnstap data. |
70 | 75 | // |
71 | 76 | // GetOutputChannel satisfies the dnstap Output interface. |
82 | 87 | dt := &Dnstap{} |
83 | 88 | for frame := range o.outputChannel { |
84 | 89 | if err := proto.Unmarshal(frame, dt); err != nil { |
85 | log.Fatalf("dnstap.TextOutput: proto.Unmarshal() failed: %s\n", err) | |
90 | o.log.Printf("dnstap.TextOutput: proto.Unmarshal() failed: %s, returning", err) | |
86 | 91 | break |
87 | 92 | } |
88 | 93 | buf, ok := o.format(dt) |
89 | 94 | if !ok { |
90 | log.Fatalf("dnstap.TextOutput: text format function failed\n") | |
95 | o.log.Printf("dnstap.TextOutput: text format function failed, returning") | |
91 | 96 | break |
92 | 97 | } |
93 | 98 | if _, err := o.writer.Write(buf); err != nil { |
94 | log.Fatalf("dnstap.TextOutput: write failed: %s\n", err) | |
99 | o.log.Printf("dnstap.TextOutput: write error: %v, returning", err) | |
95 | 100 | break |
96 | 101 | } |
97 | 102 | o.writer.Flush() |
0 | /* | |
1 | * Copyright (c) 2019 by Farsight Security, Inc. | |
2 | * | |
3 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | * you may not use this file except in compliance with the License. | |
5 | * You may obtain a copy of the License at | |
6 | * | |
7 | * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | * | |
9 | * Unless required by applicable law or agreed to in writing, software | |
10 | * distributed under the License is distributed on an "AS IS" BASIS, | |
11 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | * See the License for the specific language governing permissions and | |
13 | * limitations under the License. | |
14 | */ | |
15 | ||
16 | package dnstap | |
17 | ||
18 | import ( | |
19 | "io" | |
20 | "time" | |
21 | ||
22 | framestream "github.com/farsightsec/golang-framestream" | |
23 | ) | |
24 | ||
25 | // A Writer writes dnstap frames to its destination. | |
26 | type Writer interface { | |
27 | WriteFrame([]byte) (int, error) | |
28 | Close() error | |
29 | } | |
30 | ||
31 | // WriterOptions specifies configuration for the Writer | |
32 | type WriterOptions struct { | |
33 | // If Bidirectional is true, the underlying io.Writer must also | |
34 | // satisfy io.Reader, and the dnstap Writer will use the bidirectional | |
35 | // Frame Streams protocol. | |
36 | Bidirectional bool | |
37 | // Timeout sets the write timeout for data and control messages and the | |
38 | // read timeout for handshake responses on the underlying Writer. Timeout | |
39 | // is only effective if the underlying Writer is a net.Conn. | |
40 | Timeout time.Duration | |
41 | } | |
42 | ||
43 | // NewWriter creates a Writer using the given io.Writer and options. | |
44 | func NewWriter(w io.Writer, opt *WriterOptions) (Writer, error) { | |
45 | if opt == nil { | |
46 | opt = &WriterOptions{} | |
47 | } | |
48 | return framestream.NewWriter(w, | |
49 | &framestream.WriterOptions{ | |
50 | ContentTypes: [][]byte{FSContentType}, | |
51 | Timeout: opt.Timeout, | |
52 | Bidirectional: opt.Bidirectional, | |
53 | }) | |
54 | } |
47 | 47 | } |
48 | 48 | |
49 | 49 | func openOutputFile(filename string, formatter dnstap.TextFormatFunc, doAppend bool) (o dnstap.Output, err error) { |
50 | var fso *dnstap.FrameStreamOutput | |
51 | var to *dnstap.TextOutput | |
50 | 52 | if formatter == nil { |
51 | 53 | if filename == "-" || filename == "" { |
52 | o = dnstap.NewTextOutput(os.Stdout, dnstap.TextFormat) | |
53 | return | |
54 | to = dnstap.NewTextOutput(os.Stdout, dnstap.TextFormat) | |
55 | to.SetLogger(logger) | |
56 | return to, nil | |
54 | 57 | } |
55 | o, err = dnstap.NewFrameStreamOutputFromFilename(filename) | |
58 | fso, err = dnstap.NewFrameStreamOutputFromFilename(filename) | |
59 | if err == nil { | |
60 | fso.SetLogger(logger) | |
61 | return fso, nil | |
62 | } | |
56 | 63 | } else { |
57 | 64 | if filename == "-" || filename == "" { |
58 | 65 | if doAppend { |
59 | 66 | return nil, errors.New("cannot append to stdout (-)") |
60 | 67 | } |
61 | o = dnstap.NewTextOutput(os.Stdout, formatter) | |
62 | return | |
68 | to = dnstap.NewTextOutput(os.Stdout, formatter) | |
69 | to.SetLogger(logger) | |
70 | return to, nil | |
63 | 71 | } |
64 | o, err = dnstap.NewTextOutputFromFilename(filename, formatter, doAppend) | |
72 | to, err = dnstap.NewTextOutputFromFilename(filename, formatter, doAppend) | |
73 | if err == nil { | |
74 | to.SetLogger(logger) | |
75 | } | |
76 | return to, nil | |
65 | 77 | } |
66 | 78 | return |
67 | 79 | } |
67 | 67 | `) |
68 | 68 | } |
69 | 69 | |
70 | var logger = log.New(os.Stderr, "", log.LstdFlags) | |
71 | ||
70 | 72 | func main() { |
71 | 73 | var tcpOutputs, unixOutputs stringList |
72 | 74 | var fileInputs, tcpInputs, unixInputs stringList |
139 | 141 | fmt.Fprintf(os.Stderr, "dnstap: Failed to open input file %s: %v\n", fname, err) |
140 | 142 | os.Exit(1) |
141 | 143 | } |
144 | i.SetLogger(logger) | |
142 | 145 | fmt.Fprintf(os.Stderr, "dnstap: opened input file %s\n", fname) |
143 | 146 | iwg.Add(1) |
144 | 147 | go runInput(i, output, &iwg) |
150 | 153 | os.Exit(1) |
151 | 154 | } |
152 | 155 | i.SetTimeout(*flagTimeout) |
156 | i.SetLogger(logger) | |
153 | 157 | fmt.Fprintf(os.Stderr, "dnstap: opened input socket %s\n", path) |
154 | 158 | iwg.Add(1) |
155 | 159 | go runInput(i, output, &iwg) |
162 | 166 | } |
163 | 167 | i := dnstap.NewFrameStreamSockInput(l) |
164 | 168 | i.SetTimeout(*flagTimeout) |
169 | i.SetLogger(logger) | |
165 | 170 | iwg.Add(1) |
166 | 171 | go runInput(i, output, &iwg) |
167 | 172 | } |
197 | 202 | return err |
198 | 203 | } |
199 | 204 | o.SetTimeout(*flagTimeout) |
205 | o.SetLogger(logger) | |
200 | 206 | go o.RunOutputLoop() |
201 | 207 | mo.Add(o) |
202 | 208 | } |
0 | 0 | /* |
1 | * Copyright (c) 2014 by Farsight Security, Inc. | |
1 | * Copyright (c) 2014,2019 by Farsight Security, Inc. | |
2 | 2 | * |
3 | 3 | * Licensed under the Apache License, Version 2.0 (the "License"); |
4 | 4 | * you may not use this file except in compliance with the License. |
28 | 28 | Wait() |
29 | 29 | } |
30 | 30 | |
31 | // An Output is a desintaion for dnstap data. It accepts data on the channel | |
31 | // An Output is a destination for dnstap data. It accepts data on the channel | |
32 | 32 | // returned from the GetOutputChannel method. The RunOutputLoop() method |
33 | 33 | // processes data received on this channel, and returns after the Close() |
34 | 34 | // method is called. |
37 | 37 | RunOutputLoop() |
38 | 38 | Close() |
39 | 39 | } |
40 | ||
41 | // A Logger prints a formatted log message to the destination of the | |
42 | // implementation's choice. A Logger may be provided for some Input and | |
43 | // Output implementations for visibility into their ReadInto() and | |
44 | // RunOutputLoop() loops. | |
45 | // | |
46 | // The result of log.New() satisfies the Logger interface. | |
47 | type Logger interface { | |
48 | Printf(format string, v ...interface{}) | |
49 | } | |
50 | ||
51 | type nullLogger struct{} | |
52 | ||
53 | func (n nullLogger) Printf(format string, v ...interface{}) {} |
0 | 0 | module github.com/dnstap/golang-dnstap |
1 | 1 | |
2 | 2 | require ( |
3 | github.com/farsightsec/golang-framestream v0.2.0 | |
3 | github.com/farsightsec/golang-framestream v0.3.0 | |
4 | 4 | github.com/golang/protobuf v1.4.2 |
5 | 5 | github.com/miekg/dns v1.1.31 |
6 | 6 | ) |
0 | github.com/farsightsec/golang-framestream v0.2.0 h1:tW5KuZqkNIoiBc6LMsMLg+h6kkGyIB4/RR1z7Tqv6wE= | |
1 | github.com/farsightsec/golang-framestream v0.2.0/go.mod h1:eNde4IQyEiA5br02AouhEHCu3p3UzrCdFR4LuQHklMI= | |
0 | github.com/farsightsec/golang-framestream v0.3.0 h1:/spFQHucTle/ZIPkYqrfshQqPe2VQEzesH243TjIwqA= | |
1 | github.com/farsightsec/golang-framestream v0.3.0/go.mod h1:eNde4IQyEiA5br02AouhEHCu3p3UzrCdFR4LuQHklMI= | |
2 | 2 | github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= |
3 | 3 | github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= |
4 | 4 | github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= |
2 | 2 | import ( |
3 | 3 | "fmt" |
4 | 4 | "net" |
5 | "os" | |
5 | 6 | "testing" |
6 | 7 | "time" |
7 | 8 | ) |
9 | ||
10 | type testLogger struct{ *testing.T } | |
11 | ||
12 | func (t *testLogger) Printf(format string, v ...interface{}) { | |
13 | t.Helper() | |
14 | t.Logf(format, v...) | |
15 | } | |
8 | 16 | |
9 | 17 | func dialAndSend(t *testing.T, network, address string) *FrameStreamSockOutput { |
10 | 18 | var addr net.Addr |
30 | 38 | out.SetTimeout(time.Second) |
31 | 39 | out.SetFlushTimeout(100 * time.Millisecond) |
32 | 40 | out.SetRetryInterval(time.Second) |
41 | out.SetLogger(&testLogger{t}) | |
33 | 42 | |
34 | 43 | go out.RunOutputLoop() |
35 | 44 | <-time.After(500 * time.Millisecond) |
52 | 61 | t.Fatal(err) |
53 | 62 | } |
54 | 63 | |
64 | defer os.Remove("dnstap.sock") | |
65 | ||
66 | in.SetLogger(&testLogger{t}) | |
55 | 67 | out := make(chan []byte) |
56 | 68 | go in.ReadInto(out) |
57 | 69 | |
82 | 94 | } |
83 | 95 | |
84 | 96 | in := NewFrameStreamSockInput(l) |
97 | in.SetLogger(&testLogger{t}) | |
85 | 98 | out := make(chan []byte) |
86 | 99 | go in.ReadInto(out) |
87 | 100 | readOne(t, out) |
129 | 142 | i := 1 |
130 | 143 | |
131 | 144 | b.StartTimer() |
132 | for _ = range outch { i++ } | |
133 | if i != b.N { | |
145 | for _ = range outch { | |
146 | i++ | |
147 | } | |
148 | if i != b.N { | |
134 | 149 | b.Error("invalid frame count") |
135 | 150 | } |
136 | 151 | close(readDone) |
174 | 189 | go func() { |
175 | 190 | <-outch |
176 | 191 | b.StartTimer() |
177 | for i := 1; i < b.N; i++ { <-outch } // NB: read never fails | |
192 | for i := 1; i < b.N; i++ { | |
193 | <-outch | |
194 | } // NB: read never fails | |
178 | 195 | close(readDone) |
179 | 196 | }() |
180 | 197 |