Codebase list golang-github-fluent-fluent-logger-golang / ce139ad
Import upstream version 1.9.0 Debian Janitor 2 years ago
8 changed file(s) with 1038 addition(s) and 311 deletion(s). Raw diff Collapse all Expand all
0 name: ci
1
2 on:
3 pull_request:
4 branches: '*'
5 push:
6 branches:
7 - master
8 - main
9 - 'release-*'
10
11 jobs:
12 test:
13 strategy:
14 matrix:
15 os: [ubuntu, macos, windows]
16 golang: ['1.13', '1.16', '1.17']
17 # currently, we cannot run non-x86_64 machines on Github Actions cloud env.
18 runs-on: ${{ matrix.os }}-latest
19 name: CI golang ${{ matrix.golang }} on ${{ matrix.os }}
20 steps:
21 - uses: actions/checkout@v2
22 - uses: actions/setup-go@v2
23 with:
24 go-version: ${{ matrix.golang }}
25 - name: Change GO11MODULES
26 run: go env -w GO111MODULE=auto
27 - name: Install requirements
28 run: |
29 go get github.com/bmizerany/assert
30 go get github.com/philhofer/fwd
31 go get github.com/tinylib/msgp
32 - name: Test
33 run: go test -v ./fluent
00 .DS_Store
11 *~
22 .project
3 .settings
3 .settings
4 .idea
+0
-14
.travis.yml less more
0 language: go
1 go:
2 - 1.9
3 - 1.11
4 - 1.12
5 - tip
6 install:
7 - go get github.com/bmizerany/assert
8 - go get github.com/philhofer/fwd
9 - go get github.com/tinylib/msgp
10 sudo: false
11 matrix:
12 allow_failures:
13 - go: tip
00 # CHANGELOG
1
2 ## 1.9.0
3
4 New features
5
6 * Add a new option `AsyncReconnectInterval` for periodic connection
7 refresh. #111
8
9 Contributions to this release
10
11 * Conor Evans
12
13 ## 1.8.0
14
15 New features
16
17 * Support logging over secure connections using TLS. #107
18
19 Changes
20
21 * Change `Fluent.Post()` to return an error after connection close. #105
22
23 Contributions to this release
24
25 * Love Sharma
26 * Fujimoto Seiji
27
28 ## 1.7.0
29 * Update connection management to stop logger during connection failures
30
31 ## 1.6.3
32 * Fix not to panic when accessing unexported struct fields
33
34 ## 1.6.2
35 * Add `AsyncResultCallback` to allow users to handle errors when using asynchronous message sending.
36
37 ## 1.6.1
38 * Add another fix for `Close` called twice in Async
39
40 ## 1.6.0
41 * Add support for `ppc64le`
42 * Fix unexpected behaviors&panic around `Close`
143
244 ## 1.5.0
345 * Add `ForceStopAsyncSend` to stop asynchronous message transferring immediately when `close()` called
00 fluent-logger-golang
11 ====
22
3 [![Build Status](https://travis-ci.org/fluent/fluent-logger-golang.png?branch=master)](https://travis-ci.org/fluent/fluent-logger-golang)
3 [![Build Status](https://github.com/fluent/fluent-logger-golang/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/fluent/fluent-logger-golang/actions)
44 [![GoDoc](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent?status.svg)](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent)
55
66 ## A structured event logger for Fluentd (Golang)
5757 f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
5858 ```
5959
60 ### FluentNetwork
61
62 Specify the network protocol. The supported values are:
63
64 * "tcp" (use `FluentHost` and `FluentPort`)
65 * "tls" (use`FluentHost` and `FluentPort`)
66 * "unix" (use `FluentSocketPath`)
67
68 The default is "tcp".
69
70 ### FluentHost
71
72 Specify a hostname or IP address as a string for the destination of the "tcp" protocol.
73 The default is "127.0.0.1".
74
75 ### FluentPort
76
77 Specify the TCP port of the destination. The default is 24224.
78
79 ### FluentSocketPath
80
81 Specify the unix socket path when `FluentNetwork` is "unix".
82
83 ### Timeout
84
85 Set the timeout value of `time.Duration` to connect to the destination.
86 The default is 3 seconds.
87
6088 ### WriteTimeout
6189
62 Sets the timeout for Write call of logger.Post.
90 Sets the timeout value of `time.Duration` for Write call of `logger.Post`.
6391 Since the default is zero value, Write will not time out.
92
93 ### BufferLimit
94
95 Sets the number of events buffered on the memory. Records will be stored in memory up to this number. If the buffer is full, the call to record logs will fail.
96 The default is 8192.
97
98 ### RetryWait
99
100 Set the duration of the initial wait for the first retry, in milliseconds. The actual retry wait will be `r * 1.5^(N-1)` (r: this value, N: the number of retries).
101 The default is 500.
102
103 ### MaxRetry
104
105 Sets the maximum number of retries. If the number of retries become larger than this value, the write/send operation will fail.
106 The default is 13.
107
108 ### MaxRetryWait
109
110 The maximum duration of wait between retries, in milliseconds. If the calculated retry wait is larger than this value, the actual retry wait will be this value.
111 The default is 60,000 (60 seconds).
112
113 ### TagPrefix
114
115 Sets the prefix string of the tag. Prefix will be appended with a dot `.`, like `ppp.tag` (ppp: the value of this parameter, tag: the tag string specified in a call).
116 The default is blank.
64117
65118 ### Async
66119
72125 When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning)
73126 The default is false.
74127
128 ### AsyncResultCallback
129
130 When Async is enabled, if this is callback is provided, it will be called on every write to Fluentd. The callback function
131 takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the
132 delivery of the message was unsuccessful.
133
134 ### AsyncReconnectInterval
135 When async is enabled, this option defines the interval (ms) at which the connection
136 to the fluentd-address is re-established. This option is useful if the address
137 may resolve to one or more IP addresses, e.g. a Consul service address.
138
139 ### SubSecondPrecision
140
141 Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.
142 The default is false.
143
144 ### MarshalAsJson
145
146 Enable Json data marshaling to send messages using Json format (instead of the standard MessagePack). It is supported by Fluentd `in_forward` plugin.
147 The default is false.
148
75149 ### RequestAck
76150
77151 Sets whether to request acknowledgment from Fluentd to increase the reliability
78152 of the connection. The default is false.
153
154 ### TlsInsecureSkipVerify
155
156 Skip verifying the server certificate. Useful for development and testing. The default is false.
79157
80158 ## FAQ
81159
85163
86164 This logger doesn't support those features. Patches are welcome!
87165
166 ### Is it allowed to call `Fluent.Post()` after connection close?
167
168 Before v1.8.0, the Fluent logger silently reopened connections whenever
169 `Fluent.Post()` was called.
170
171 ```go
172 logger, _ := fluent.New(fluent.Config{})
173 logger.Post(tag, data)
174 logger.Close()
175 logger.Post(tag, data) /* reopen connection */
176 ```
177
178 However, this behavior was confusing, in particular when multiple goroutines
179 were involved. Starting v1.8.0, the logger no longer accepts `Fluent.Post()`
180 after `Fluent.Close()`, and instead returns a "Logger already closed" error.
181
88182 ## Tests
89183 ```
184
90185 go test
91186 ```
00 package fluent
11
22 import (
3 "context"
4 "crypto/tls"
35 "encoding/json"
46 "errors"
57 "fmt"
68 "math"
9 "math/rand"
710 "net"
811 "os"
912 "reflect"
1417 "bytes"
1518 "encoding/base64"
1619 "encoding/binary"
20
1721 "github.com/tinylib/msgp/msgp"
18 "math/rand"
1922 )
2023
2124 const (
3336 // Default sub-second precision value to false since it is only compatible
3437 // with fluentd versions v0.14 and above.
3538 defaultSubSecondPrecision = false
39
40 // Default value whether to skip checking insecure certs on TLS connections.
41 defaultTlsInsecureSkipVerify = false
3642 )
3743
44 // randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced
45 // during tests with a deterministic function.
46 var randomGenerator = rand.Uint64
47
3848 type Config struct {
39 FluentPort int `json:"fluent_port"`
40 FluentHost string `json:"fluent_host"`
41 FluentNetwork string `json:"fluent_network"`
42 FluentSocketPath string `json:"fluent_socket_path"`
43 Timeout time.Duration `json:"timeout"`
44 WriteTimeout time.Duration `json:"write_timeout"`
45 BufferLimit int `json:"buffer_limit"`
46 RetryWait int `json:"retry_wait"`
47 MaxRetry int `json:"max_retry"`
48 MaxRetryWait int `json:"max_retry_wait"`
49 TagPrefix string `json:"tag_prefix"`
50 Async bool `json:"async"`
51 ForceStopAsyncSend bool `json:"force_stop_async_send"`
49 FluentPort int `json:"fluent_port"`
50 FluentHost string `json:"fluent_host"`
51 FluentNetwork string `json:"fluent_network"`
52 FluentSocketPath string `json:"fluent_socket_path"`
53 Timeout time.Duration `json:"timeout"`
54 WriteTimeout time.Duration `json:"write_timeout"`
55 BufferLimit int `json:"buffer_limit"`
56 RetryWait int `json:"retry_wait"`
57 MaxRetry int `json:"max_retry"`
58 MaxRetryWait int `json:"max_retry_wait"`
59 TagPrefix string `json:"tag_prefix"`
60 Async bool `json:"async"`
61 ForceStopAsyncSend bool `json:"force_stop_async_send"`
62 AsyncResultCallback func(data []byte, err error)
5263 // Deprecated: Use Async instead
5364 AsyncConnect bool `json:"async_connect"`
5465 MarshalAsJSON bool `json:"marshal_as_json"`
66
67 // AsyncReconnectInterval defines the interval (ms) at which the connection
68 // to the fluentd-address is re-established. This option is useful if the address
69 // may resolve to one or more IP addresses, e.g. a Consul service address.
70 AsyncReconnectInterval int `json:"async_reconnect_interval"`
5571
5672 // Sub-second precision timestamps are only possible for those using fluentd
5773 // v0.14+ and serializing their messages with msgpack.
6177 // respond with an acknowledgement. This option improves the reliability
6278 // of the message transmission.
6379 RequestAck bool `json:"request_ack"`
80
81 // Flag to skip verifying insecure certs on TLS connections
82 TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"`
6483 }
6584
6685 type ErrUnknownNetwork struct {
83102 type Fluent struct {
84103 Config
85104
86 stopRunning chan bool
87 pending chan *msgToSend
88 wg sync.WaitGroup
89
90 muconn sync.Mutex
105 dialer dialer
106 // stopRunning is used in async mode to signal to run() it should abort.
107 stopRunning chan struct{}
108 // cancelDialings is used by Close() to stop any in-progress dialing.
109 cancelDialings context.CancelFunc
110 pending chan *msgToSend
111 pendingMutex sync.RWMutex
112 closed bool
113 wg sync.WaitGroup
114
115 // time at which the most recent connection to fluentd-address was established.
116 latestReconnectTime time.Time
117
118 muconn sync.RWMutex
91119 conn net.Conn
92120 }
93121
122 type dialer interface {
123 DialContext(ctx context.Context, network, address string) (net.Conn, error)
124 }
125
94126 // New creates a new Logger.
95 func New(config Config) (f *Fluent, err error) {
127 func New(config Config) (*Fluent, error) {
128 if config.Timeout == 0 {
129 config.Timeout = defaultTimeout
130 }
131 return newWithDialer(config, &net.Dialer{
132 Timeout: config.Timeout,
133 })
134 }
135
136 func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
96137 if config.FluentNetwork == "" {
97138 config.FluentNetwork = defaultNetwork
98139 }
105146 if config.FluentSocketPath == "" {
106147 config.FluentSocketPath = defaultSocketPath
107148 }
108 if config.Timeout == 0 {
109 config.Timeout = defaultTimeout
110 }
111149 if config.WriteTimeout == 0 {
112150 config.WriteTimeout = defaultWriteTimeout
113151 }
122160 }
123161 if config.MaxRetryWait == 0 {
124162 config.MaxRetryWait = defaultMaxRetryWait
163 }
164 if !config.TlsInsecureSkipVerify {
165 config.TlsInsecureSkipVerify = defaultTlsInsecureSkipVerify
125166 }
126167 if config.AsyncConnect {
127168 fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead")
128169 config.Async = config.Async || config.AsyncConnect
129170 }
171
130172 if config.Async {
173 ctx, cancel := context.WithCancel(context.Background())
174
131175 f = &Fluent{
132 Config: config,
133 pending: make(chan *msgToSend, config.BufferLimit),
134 }
176 Config: config,
177 dialer: d,
178 stopRunning: make(chan struct{}),
179 cancelDialings: cancel,
180 pending: make(chan *msgToSend, config.BufferLimit),
181 pendingMutex: sync.RWMutex{},
182 muconn: sync.RWMutex{},
183 }
184
135185 f.wg.Add(1)
136 go f.run()
186 go f.run(ctx)
137187 } else {
138 f = &Fluent{Config: config}
139 err = f.connect()
188 f = &Fluent{
189 Config: config,
190 dialer: d,
191 muconn: sync.RWMutex{},
192 }
193 err = f.connect(context.Background())
140194 }
141195 return
142196 }
189243 fields := msgtype.NumField()
190244 for i := 0; i < fields; i++ {
191245 field := msgtype.Field(i)
246 value := msg.FieldByIndex(field.Index)
247 // ignore unexported fields
248 if !value.CanInterface() {
249 continue
250 }
192251 name := field.Name
193252 if n1 := field.Tag.Get("msg"); n1 != "" {
194253 name = n1
195254 } else if n2 := field.Tag.Get("codec"); n2 != "" {
196255 name = n2
197256 }
198 kv[name] = msg.FieldByIndex(field.Index).Interface()
257 kv[name] = value.Interface()
199258 }
200259 return f.EncodeAndPostData(tag, tm, kv)
201260 }
232291 if f.Config.Async {
233292 return f.appendBuffer(msg)
234293 }
294
235295 // Synchronous write
236 return f.write(msg)
296 if f.closed {
297 return fmt.Errorf("fluent#postRawData: Logger already closed")
298 }
299 return f.writeWithRetry(context.Background(), msg)
237300 }
238301
239302 // For sending forward protocol adopted JSON
267330 enc.Close()
268331 return "", err
269332 }
270 if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil {
333 if err := binary.Write(enc, binary.LittleEndian, randomGenerator()); err != nil {
271334 enc.Close()
272335 return "", err
273336 }
303366 return
304367 }
305368
306 // Close closes the connection, waiting for pending logs to be sent
369 // Close closes the connection, waiting for pending logs to be sent. If the client is
370 // running in async mode, the run() goroutine exits before Close() returns.
307371 func (f *Fluent) Close() (err error) {
308372 if f.Config.Async {
373 f.pendingMutex.Lock()
374 if f.closed {
375 f.pendingMutex.Unlock()
376 return nil
377 }
378 f.closed = true
379 f.pendingMutex.Unlock()
380
309381 if f.Config.ForceStopAsyncSend {
310 f.stopRunning <- true
311382 close(f.stopRunning)
312 }
383 f.cancelDialings()
384 }
385
313386 close(f.pending)
387 // If ForceStopAsyncSend is false, all logs in the channel have to be sent
388 // before closing the connection. At this point closed is true so no more
389 // logs are written to the channel and f.pending has been closed, so run()
390 // goroutine will exit as soon as all logs in the channel are sent.
391 if !f.Config.ForceStopAsyncSend {
392 f.wg.Wait()
393 }
394 }
395
396 f.muconn.Lock()
397 f.close()
398 f.closed = true
399 f.muconn.Unlock()
400
401 // If ForceStopAsyncSend is true, we shall close the connection before waiting for
402 // run() goroutine to exit to be sure we aren't waiting on ack message that might
403 // never come (eg. because fluentd server is down). However we want to be sure the
404 // run() goroutine stops before returning from Close().
405 if f.Config.ForceStopAsyncSend {
314406 f.wg.Wait()
315407 }
316 f.close(f.conn)
317408 return
318409 }
319410
320411 // appendBuffer appends data to buffer with lock.
321412 func (f *Fluent) appendBuffer(msg *msgToSend) error {
413 f.pendingMutex.RLock()
414 defer f.pendingMutex.RUnlock()
415 if f.closed {
416 return fmt.Errorf("fluent#appendBuffer: Logger already closed")
417 }
322418 select {
323419 case f.pending <- msg:
324420 default:
327423 return nil
328424 }
329425
330 // close closes the connection.
331 func (f *Fluent) close(c net.Conn) {
332 f.muconn.Lock()
333 if f.conn != nil && f.conn == c {
426 // close closes the connection. Callers should take care of locking muconn first.
427 func (f *Fluent) close() {
428 if f.conn != nil {
334429 f.conn.Close()
335430 f.conn = nil
336431 }
337 f.muconn.Unlock()
338 }
339
340 // connect establishes a new connection using the specified transport.
341 func (f *Fluent) connect() (err error) {
342
432 }
433
434 // connect establishes a new connection using the specified transport. Caller should
435 // take care of locking muconn first.
436 func (f *Fluent) connect(ctx context.Context) (err error) {
343437 switch f.Config.FluentNetwork {
344438 case "tcp":
345 f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
439 f.conn, err = f.dialer.DialContext(ctx,
440 f.Config.FluentNetwork,
441 f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
442 case "tls":
443 tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify}
444 f.conn, err = tls.DialWithDialer(
445 &net.Dialer{Timeout: f.Config.Timeout},
446 "tcp",
447 f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), tlsConfig,
448 )
346449 case "unix":
347 f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
450 f.conn, err = f.dialer.DialContext(ctx,
451 f.Config.FluentNetwork,
452 f.Config.FluentSocketPath)
348453 default:
349454 err = NewErrUnknownNetwork(f.Config.FluentNetwork)
350455 }
456
457 if err == nil {
458 f.latestReconnectTime = time.Now()
459 }
460
351461 return err
352462 }
353463
354 func (f *Fluent) run() {
355 drainEvents := false
356 var emitEventDrainMsg sync.Once
464 var errIsClosing = errors.New("fluent logger is closing")
465
466 // Caller should take care of locking muconn first.
467 func (f *Fluent) connectWithRetry(ctx context.Context) error {
468 // A Time channel is used instead of time.Sleep() to avoid blocking this
469 // goroutine during way too much time (because of the exponential back-off
470 // retry).
471 // time.NewTimer() is used instead of time.After() to avoid leaking the
472 // timer channel (cf. https://pkg.go.dev/time#After).
473 timeout := time.NewTimer(time.Duration(0))
474 defer func() {
475 // timeout.Stop() is called in a function literal instead of being
476 // defered directly as it's re-assigned below when the retry loop spins.
477 timeout.Stop()
478 }()
479
480 for i := 0; i < f.Config.MaxRetry; i++ {
481 select {
482 case <-timeout.C:
483 err := f.connect(ctx)
484 if err == nil {
485 return nil
486 }
487
488 if _, ok := err.(*ErrUnknownNetwork); ok {
489 return err
490 }
491 if err == context.Canceled {
492 return errIsClosing
493 }
494
495 waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
496 if waitTime > f.Config.MaxRetryWait {
497 waitTime = f.Config.MaxRetryWait
498 }
499
500 timeout = time.NewTimer(time.Duration(waitTime) * time.Millisecond)
501 case <-ctx.Done():
502 return errIsClosing
503 }
504 }
505
506 return fmt.Errorf("could not connect to fluentd after %d retries", f.Config.MaxRetry)
507 }
508
509 // run is the goroutine used to unqueue and write logs in async mode. That
510 // goroutine is meant to run during the whole life of the Fluent logger.
511 func (f *Fluent) run(ctx context.Context) {
357512 for {
358513 select {
359514 case entry, ok := <-f.pending:
515 // f.stopRunning is closed before f.pending only when ForceStopAsyncSend
516 // is enabled. Otherwise, f.pending is closed when Close() is called.
360517 if !ok {
361518 f.wg.Done()
362519 return
363520 }
364 if drainEvents {
365 emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
366 continue
367 }
368 err := f.write(entry)
369 if err != nil {
521
522 if f.AsyncReconnectInterval > 0 {
523 if time.Since(f.latestReconnectTime) > time.Duration(f.AsyncReconnectInterval)*time.Millisecond {
524 f.muconn.Lock()
525 f.close()
526 f.connectWithRetry(ctx)
527 f.muconn.Unlock()
528 }
529 }
530
531 err := f.writeWithRetry(ctx, entry)
532 if err != nil && err != errIsClosing {
370533 fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
371534 }
372 }
373 select {
374 case stopRunning, ok := <-f.stopRunning:
375 if stopRunning || !ok {
376 drainEvents = true
377 }
378 default:
535 if f.AsyncResultCallback != nil {
536 var data []byte
537 if entry != nil {
538 data = entry.data
539 }
540 f.AsyncResultCallback(data, err)
541 }
542 case <-f.stopRunning:
543 fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339))
544
545 f.wg.Done()
546 return
379547 }
380548 }
381549 }
384552 return int(math.Pow(x, y))
385553 }
386554
387 func (f *Fluent) write(msg *msgToSend) error {
388 var c net.Conn
555 func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
389556 for i := 0; i < f.Config.MaxRetry; i++ {
390 c = f.conn
391 // Connect if needed
392 if c == nil {
393 f.muconn.Lock()
394 if f.conn == nil {
395 err := f.connect()
396 if err != nil {
397 f.muconn.Unlock()
398
399 if _, ok := err.(*ErrUnknownNetwork); ok {
400 // do not retry on unknown network error
401 break
402 }
403 waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
404 if waitTime > f.Config.MaxRetryWait {
405 waitTime = f.Config.MaxRetryWait
406 }
407 time.Sleep(time.Duration(waitTime) * time.Millisecond)
408 continue
409 }
410 }
411 c = f.conn
412 f.muconn.Unlock()
413 }
414
415 // We're connected, write msg
557 if retry, err := f.write(ctx, msg); !retry {
558 return err
559 }
560 }
561
562 return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
563 }
564
565 // write writes the provided msg to fluentd server. Its first return values is
566 // a bool indicating whether the write should be retried.
567 // This method relies on function literals to execute muconn.Unlock or
568 // muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in
569 // the case of panic recovering.
570 func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
571 closer := func() {
572 f.muconn.Lock()
573 defer f.muconn.Unlock()
574
575 f.close()
576 }
577
578 if err := func() (err error) {
579 f.muconn.Lock()
580 defer f.muconn.Unlock()
581
582 if f.conn == nil {
583 err = f.connectWithRetry(ctx)
584 }
585
586 return err
587 }(); err != nil {
588 // Here, we don't want to retry the write since connectWithRetry already
589 // retries Config.MaxRetry times to connect.
590 return false, fmt.Errorf("fluent#write: %v", err)
591 }
592
593 if err := func() (err error) {
594 f.muconn.RLock()
595 defer f.muconn.RUnlock()
596
597 if f.conn == nil {
598 return fmt.Errorf("connection has been closed before writing to it")
599 }
600
416601 t := f.Config.WriteTimeout
417602 if time.Duration(0) < t {
418 c.SetWriteDeadline(time.Now().Add(t))
603 f.conn.SetWriteDeadline(time.Now().Add(t))
419604 } else {
420 c.SetWriteDeadline(time.Time{})
421 }
422 _, err := c.Write(msg.data)
423 if err != nil {
424 f.close(c)
605 f.conn.SetWriteDeadline(time.Time{})
606 }
607
608 _, err = f.conn.Write(msg.data)
609 return err
610 }(); err != nil {
611 closer()
612 return true, fmt.Errorf("fluent#write: %v", err)
613 }
614
615 // Acknowledgment check
616 if msg.ack != "" {
617 resp := &AckResp{}
618 var err error
619 if f.Config.MarshalAsJSON {
620 dec := json.NewDecoder(f.conn)
621 err = dec.Decode(resp)
425622 } else {
426 // Acknowledgment check
427 if msg.ack != "" {
428 resp := &AckResp{}
429 if f.Config.MarshalAsJSON {
430 dec := json.NewDecoder(c)
431 err = dec.Decode(resp)
432 } else {
433 r := msgp.NewReader(c)
434 err = resp.DecodeMsg(r)
435 }
436 if err != nil || resp.Ack != msg.ack {
437 f.close(c)
438 continue
439 }
440 }
441 return err
442 }
443 }
444
445 return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
446 }
623 r := msgp.NewReader(f.conn)
624 err = resp.DecodeMsg(r)
625 }
626
627 if err != nil || resp.Ack != msg.ack {
628 fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack)
629
630 closer()
631 return true, err
632 }
633 }
634
635 return false, nil
636 }
00 package fluent
11
22 import (
3 "bytes"
4 "context"
35 "encoding/json"
6 "errors"
7 "fmt"
48 "io/ioutil"
59 "net"
610 "reflect"
913 "time"
1014
1115 "github.com/bmizerany/assert"
16 "github.com/tinylib/msgp/msgp"
1217 )
1318
14 const (
15 RECV_BUF_LEN = 1024
16 )
17
18 // Conn is net.Conn with the parameters to be verified in the test
19 func init() {
20 // randomGenerator points to rand.Uint64 by default. Unfortunately, even when it's
21 // seeded, it produces different values from time to time and thus is not fully
22 // deterministic. This prevents writing stable tests for RequestAck config option.
23 // Thus we need to change it to ensure the hashes are stable during tests.
24 randomGenerator = func() uint64 {
25 return 1
26 }
27 }
28
29 func newTestDialer() *testDialer {
30 return &testDialer{
31 dialCh: make(chan *Conn),
32 }
33 }
34
35 // testDialer is a stub for net.Dialer. It implements the Dial() method used by
36 // the logger to connect to Fluentd. It uses a *Conn channel to let the tests
37 // synchronize with calls to Dial() and let them define what each call to Dial()
38 // should return. This is especially useful for testing edge cases like
39 // transient connection failures.
40 // To help write test cases with succeeding or failing connection dialing, testDialer
41 // provides waitForNextDialing(). Any call to Dial() from the logger should be matched
42 // with a call to waitForNextDialing() in the test cases.
43 //
44 // For instance, to test an async logger that have to dial 4 times before succeeding,
45 // the test should look like this:
46 //
47 // d := newTestDialer() // Create a new stubbed dialer
48 // cfg := Config{
49 // Async: true,
50 // // ...
51 // }
52 // f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
53 // f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
54 //
55 // d.waitForNextDialing(false, false) // 1st dialing attempt fails
56 // d.waitForNextDialing(false, false) // 2nd attempt fails too
57 // d.waitForNextDialing(false, false) // 3rd attempt fails too
58 // d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
59 //
60 // Note that in the above example, the logger operates in async mode. As such,
61 // a call to Post, PostWithTime or EncodeAndPostData is needed *before* calling
62 // waitForNextDialing(), as in async mode the logger initializes its connection
63 // lazily, in a separate goroutine.
64 // This also means non-async loggers can't be tested exactly the same way, as the
65 // dialing isn't done lazily but during the logger initialization. To test such
66 // case, you have to put the calls to newWithDialer() and to EncodeAndPostData()
67 // into their own goroutine. An example:
68 //
69 // d := newTestDialer() // Create a new stubbed dialer
70 // cfg := Config{
71 // Async: false,
72 // // ...
73 // }
74 // go func() {
75 // f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
76 // f.Close()
77 // }()
78 //
79 // d.waitForNextDialing(false, false) // 1st dialing attempt fails
80 // d.waitForNextDialing(false, false) // 2nd attempt fails too
81 // d.waitForNextDialing(false, false) // 3rd attempt fails too
82 // d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
83 //
84 // Moreover, waitForNextDialing() returns a *Conn which extends net.Conn to provide testing
85 // facilities. For instance, you can call waitForNextWrite() on these connections, to
86 // specify how the next Conn.Write() call behaves (e.g. accept or reject it, or make a
87 // specific ack checksum available) and to assert what is sent to Fluentd (when the write
88 // is accepted). Again, any call to Write() on the logger side have to be matched with
89 // a call to waitForNextWrite() in the test cases.
90 //
91 // Here's a full example:
92 //
93 // d := newTestDialer()
94 // cfg := Config{Async: true}
95 //
96 // f := newWithDialer(cfg, d)
97 // f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
98 //
99 // conn := d.waitForNextDialing(true, false) // Accept the dialing
100 // conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message
101 //
102 // conn := d.waitForNextDialing(true, false)
103 // assertReceived(t, // t is *testing.T
104 // conn.waitForNextWrite(true, ""),
105 // "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
106 //
107 // f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"})
108 // assertReceived(t, // t is *testing.T
109 // conn.waitForNextWrite(true, ""),
110 // "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]")
111 //
112 // In this example, the 1st connection dialing succeeds but the 1st attempt to write the
113 // message is discarded. As the logger discards the connection whenever a message
114 // couldn't be written, it tries to re-dial and thus we need to accept the dialing again.
115 // Then the write is retried and accepted. When a second message is written, the write is
116 // accepted straightaway. Moreover, the messages written to the connections are asserted
117 // using assertReceived() to make sure the logger encodes the messages properly.
118 //
119 // Again, the example above is using async mode thus, calls to f and conn are running in
120 // the same goroutine. However in sync mode, all calls to f.EncodeAndPostData() as well
121 // as the logger initialization shall be placed in a separate goroutine or the code
122 // allowing the dialing and writing attempts (eg. waitForNextDialing() & waitForNextWrite())
123 // would never be reached.
124 type testDialer struct {
125 dialCh chan *Conn
126 }
127
128 // DialContext is the stubbed method called by the logger to establish the connection to
129 // Fluentd. It is paired with waitForNextDialing().
130 func (d *testDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
131 // It waits for a *Conn to be pushed into dialCh using waitForNextDialing(). When the
132 // *Conn is nil, the Dial is deemed to fail.
133 select {
134 case conn := <-d.dialCh:
135 if conn == nil {
136 return nil, errors.New("failed to dial")
137 }
138 return conn, nil
139 case <-ctx.Done():
140 return nil, errors.New("failed to dial")
141 }
142 }
143
144 // waitForNextDialing is the method used by test cases below to indicate whether the next
145 // dialing attempt made by the logger should succeed or not. See examples provided on
146 // testDialer docs.
147 func (d *testDialer) waitForNextDialing(accept bool, delayReads bool) *Conn {
148 var conn *Conn
149 if accept {
150 conn = &Conn{
151 nextWriteAttemptCh: make(chan nextWrite),
152 writtenCh: make(chan []byte),
153 }
154
155 if delayReads {
156 conn.delayNextReadCh = make(chan struct{})
157 }
158 }
159
160 d.dialCh <- conn
161 return conn
162 }
163
164 // assertReceived is used below by test cases to assert the content written to a *Conn
165 // matches an expected string. This is generally used in conjunction with
166 // Conn.waitForNextWrite().
167 func assertReceived(t *testing.T, rcv []byte, expected string) {
168 if string(rcv) != expected {
169 t.Fatalf("got %s, expect %s", string(rcv), expected)
170 }
171 }
172
173 // Conn extends net.Conn to add channels used to synchronise across goroutines, eg.
174 // between the goroutine doing the dialing (through newWithDialer in sync mode, or the
175 // first message logging in async mode) and the testing goroutine (making calls to
176 // Conn.waitForNextWrite()).
177 // This should be of low importance if you're not trying to understand/change how
178 // waitFor...() methods work. See examples provided in testDialer docs for higher
179 // level details.
19180 type Conn struct {
20181 net.Conn
21182 buf []byte
22183 writeDeadline time.Time
23 }
24
184 // nextWriteAttemptCh is used by waitForNextWrite() to let Write() know if the next write
185 // attempt should succeed or fail.
186 nextWriteAttemptCh chan nextWrite
187 // writtenCh is used by Write() to signal to waitForNextWrite() when a write
188 // happened.
189 writtenCh chan []byte
190 // delayNextReadCh is used to delay next conn.Read() attempt when testing ack resp.
191 delayNextReadCh chan struct{}
192 }
193
194 // nextWrite is the struct passed by Conn.waitForNextWrite() to Conn.Write() through
195 // Conn.nextWriteAttemptCh to let Write() know if it should accept or discard the next write
196 // operation and what ack checksum should be made readable from the connection.
197 // This should be of low importance if you're not trying to understand/change how
198 // waitFor...() methods work. See examples provided in testDialer docs for higher
199 // level details.
200 type nextWrite struct {
201 accept bool
202 ack string
203 }
204
205 // waitForNextWrite is the method used to tell how the next write made by the logger
206 // should behave. It can either accept or discard the next write operation. Moreover
207 // an ack checksum can be passed such that the next Write operation will make it
208 // readable from the connection, as the logger will try to read it to ack the Write
209 // operation. See examples provided in testDialer docs.
210 func (c *Conn) waitForNextWrite(accept bool, ack string) []byte {
211 c.nextWriteAttemptCh <- nextWrite{accept, ack}
212 if accept {
213 return <-c.writtenCh
214 }
215 return []byte{}
216 }
217
218 // Read is a stubbed version of net.Conn Read() that returns the ack checksum of the last
219 // Write operation.
25220 func (c *Conn) Read(b []byte) (int, error) {
221 if c.delayNextReadCh != nil {
222 select {
223 case _, ok := <-c.delayNextReadCh:
224 if !ok {
225 return 0, errors.New("connection has been closed")
226 }
227 default:
228 }
229 }
230
26231 copy(b, c.buf)
27232 return len(c.buf), nil
28233 }
29234
235 // Write is a stubbed version of net.Conn Write(). Its behavior is determined by the last
236 // call to waitForNextWrite(). See examples provided in testDialer docs.
30237 func (c *Conn) Write(b []byte) (int, error) {
31 c.buf = make([]byte, len(b))
32 copy(c.buf, b)
238 next, ok := nextWrite{true, ""}, true
239 if c.nextWriteAttemptCh != nil {
240 next, ok = <-c.nextWriteAttemptCh
241 }
242 if !next.accept || !ok {
243 return 0, errors.New("transient write failure")
244 }
245
246 // Write the acknowledgment to c.buf to make it available to subsequent
247 // call to Read().
248 c.buf = make([]byte, len(next.ack))
249 copy(c.buf, next.ack)
250
251 // Write the payload received to writtenCh to assert on it.
252 if c.writtenCh != nil {
253 c.writtenCh <- b
254 }
255
33256 return len(b), nil
34257 }
35258
39262 }
40263
41264 func (c *Conn) Close() error {
265 if c.delayNextReadCh != nil {
266 close(c.delayNextReadCh)
267 }
268
42269 return nil
43 }
44
45 func init() {
46 numProcs := runtime.NumCPU()
47 if numProcs < 2 {
48 numProcs = 2
49 }
50 runtime.GOMAXPROCS(numProcs)
51
52 listener, err := net.Listen("tcp", "0.0.0.0:6666")
53 if err != nil {
54 println("error listening:", err.Error())
55 }
56 go func() {
57 for {
58 conn, err := listener.Accept()
59 if err != nil {
60 println("Error accept:", err.Error())
61 return
62 }
63 go EchoFunc(conn)
64 }
65 }()
66 }
67
68 func EchoFunc(conn net.Conn) {
69 for {
70 buf := make([]byte, RECV_BUF_LEN)
71 n, err := conn.Read(buf)
72 if err != nil {
73 println("Error reading:", err.Error())
74 return
75 }
76 println("received ", n, " bytes of data =", string(buf))
77 }
78270 }
79271
80272 func Test_New_itShouldUseDefaultConfigValuesIfNoOtherProvided(t *testing.T) {
138330 assert.Equal(t, f.Config.MarshalAsJSON, true)
139331 }
140332
141 func Test_send_WritePendingToConn(t *testing.T) {
142 f, _ := New(Config{Async: true})
143
144 conn := &Conn{}
145 f.conn = conn
146
147 msg := "This is test writing."
148 bmsg := &msgToSend{data: []byte(msg)}
149 f.pending <- bmsg
150
151 err := f.write(bmsg)
152 if err != nil {
153 t.Error(err)
154 }
155
156 rcv := make([]byte, len(conn.buf))
157 _, _ = conn.Read(rcv)
158 if string(rcv) != msg {
159 t.Errorf("got %s, except %s", string(rcv), msg)
160 }
161
162 f.Close()
163 }
164
165333 func Test_MarshalAsMsgpack(t *testing.T) {
166334 f := &Fluent{Config: Config{}}
167
168 conn := &Conn{}
169 f.conn = conn
170335
171336 tag := "tag"
172337 var data = map[string]string{
220385
221386 func Test_MarshalAsJSON(t *testing.T) {
222387 f := &Fluent{Config: Config{MarshalAsJSON: true}}
223
224 conn := &Conn{}
225 f.conn = conn
226388
227389 var data = map[string]string{
228390 "foo": "bar",
273435 }
274436 }
275437
276 func TestAsyncConnect(t *testing.T) {
277 type result struct {
278 f *Fluent
279 err error
280 }
281 ch := make(chan result, 1)
438 func TestPostWithTime(t *testing.T) {
439 testcases := map[string]Config{
440 "with Async": {
441 Async: true,
442 MarshalAsJSON: true,
443 TagPrefix: "acme",
444 },
445 "without Async": {
446 Async: false,
447 MarshalAsJSON: true,
448 TagPrefix: "acme",
449 },
450 }
451
452 for tcname := range testcases {
453 t.Run(tcname, func(t *testing.T) {
454 tc := testcases[tcname]
455 t.Parallel()
456
457 d := newTestDialer()
458 var f *Fluent
459 defer func() {
460 if f != nil {
461 f.Close()
462 }
463 }()
464
465 go func() {
466 var err error
467 if f, err = newWithDialer(tc, d); err != nil {
468 t.Errorf("Unexpected error: %v", err)
469 }
470
471 _ = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
472 _ = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
473 _ = f.PostWithTime("tag_name", time.Unix(1634263200, 0),
474 struct {Welcome string `msg:"welcome"`; cannot string}{"to use", "see me"})
475 }()
476
477 conn := d.waitForNextDialing(true, false)
478 assertReceived(t,
479 conn.waitForNextWrite(true, ""),
480 "[\"acme.tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
481
482 assertReceived(t,
483 conn.waitForNextWrite(true, ""),
484 "[\"acme.tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]")
485 assertReceived(t,
486 conn.waitForNextWrite(true, ""),
487 "[\"acme.tag_name\",1634263200,{\"welcome\":\"to use\"},{}]")
488 })
489 }
490 }
491
492 func TestReconnectAndResendAfterTransientFailure(t *testing.T) {
493 testcases := map[string]Config{
494 "with Async": {
495 Async: true,
496 MarshalAsJSON: true,
497 },
498 "without Async": {
499 Async: false,
500 MarshalAsJSON: true,
501 },
502 }
503
504 for tcname := range testcases {
505 t.Run(tcname, func(t *testing.T) {
506 tc := testcases[tcname]
507 t.Parallel()
508
509 d := newTestDialer()
510 var f *Fluent
511 defer func() {
512 if f != nil {
513 f.Close()
514 }
515 }()
516
517 go func() {
518 var err error
519 if f, err = newWithDialer(tc, d); err != nil {
520 t.Errorf("Unexpected error: %v", err)
521 }
522
523 _ = f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
524 _ = f.EncodeAndPostData("tag_name", time.Unix(1482493050, 0), map[string]string{"fluentd": "is awesome"})
525 }()
526
527 // Accept the first connection dialing and write.
528 conn := d.waitForNextDialing(true, false)
529 assertReceived(t,
530 conn.waitForNextWrite(true, ""),
531 "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
532
533 // The next write will fail and the next connection dialing will be dropped
534 // to test if the logger is reconnecting as expected.
535 conn.waitForNextWrite(false, "")
536 d.waitForNextDialing(false, false)
537
538 // Next, we allow a new connection to be established and we allow the last message to be written.
539 conn = d.waitForNextDialing(true, false)
540 assertReceived(t,
541 conn.waitForNextWrite(true, ""),
542 "[\"tag_name\",1482493050,{\"fluentd\":\"is awesome\"},{}]")
543 })
544 }
545 }
546
547 func timeout(t *testing.T, duration time.Duration, fn func(), reason string) {
548 done := make(chan struct{})
282549 go func() {
283 config := Config{
284 FluentPort: 8888,
285 Async: true,
286 }
287 f, err := New(config)
288 ch <- result{f: f, err: err}
550 fn()
551 done <- struct{}{}
289552 }()
290553
291554 select {
292 case res := <-ch:
293 if res.err != nil {
294 t.Errorf("fluent.New() failed with %#v", res.err)
295 return
296 }
297 res.f.Close()
298 case <-time.After(time.Millisecond * 500):
299 t.Error("Async must not block")
300 }
301 }
302
303 func Test_PostWithTimeNotTimeOut(t *testing.T) {
304 f, err := New(Config{
305 FluentPort: 6666,
306 Async: false,
307 MarshalAsJSON: true, // easy to check equality
308 })
309 if err != nil {
310 t.Error(err)
555 case <-time.After(duration):
556 t.Fatalf("time out after %s: %s", duration.String(), reason)
557 case <-done:
311558 return
312559 }
313
314 var testData = []struct {
315 in map[string]string
316 out string
560 }
561
562 func TestCloseOnFailingAsyncConnect(t *testing.T) {
563 testcases := map[string]Config{
564 "with ForceStopAsyncSend and with RequestAck": {
565 Async: true,
566 ForceStopAsyncSend: true,
567 RequestAck: true,
568 },
569 "with ForceStopAsyncSend and without RequestAck": {
570 Async: true,
571 ForceStopAsyncSend: true,
572 RequestAck: false,
573 },
574 "without ForceStopAsyncSend and with RequestAck": {
575 Async: true,
576 ForceStopAsyncSend: false,
577 RequestAck: true,
578 },
579 "without ForceStopAsyncSend and without RequestAck": {
580 Async: true,
581 ForceStopAsyncSend: false,
582 RequestAck: false,
583 },
584 }
585
586 for tcname := range testcases {
587 t.Run(tcname, func(t *testing.T) {
588 tc := testcases[tcname]
589 t.Parallel()
590
591 d := newTestDialer()
592 f, err := newWithDialer(tc, d)
593 if err != nil {
594 t.Errorf("Unexpected error: %v", err)
595 }
596
597 timeout(t, 1*time.Second, func() { f.Close() }, "failed to close the logger")
598 })
599 }
600 }
601
602 func ackRespMsgp(t *testing.T, ack string) string {
603 msg := AckResp{ack}
604 buf := &bytes.Buffer{}
605 ackW := msgp.NewWriter(buf)
606 if err := msg.EncodeMsg(ackW); err != nil {
607 t.Fatalf("Unexpected error: %v", err)
608 }
609 ackW.Flush()
610 return buf.String()
611 }
612
613 func TestNoPanicOnAsyncClose(t *testing.T) {
614 testcases := []struct {
615 name string
616 config Config
617 shouldError bool
317618 }{
318619 {
319 map[string]string{"foo": "bar"},
320 "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]",
620 name: "Channel closed before write",
621 config: Config{
622 Async: true,
623 },
624 shouldError: true,
321625 },
322626 {
323 map[string]string{"fuga": "bar", "hoge": "fuga"},
324 "[\"tag_name\",1482493046,{\"fuga\":\"bar\",\"hoge\":\"fuga\"},{}]",
325 },
326 }
327 for _, tt := range testData {
328 conn := &Conn{}
329 f.conn = conn
330
331 err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in)
627 name: "Channel not closed at all",
628 config: Config{
629 Async: true,
630 },
631 shouldError: false,
632 },
633 }
634 for _, testcase := range testcases {
635 t.Run(testcase.name, func(t *testing.T) {
636 t.Parallel()
637 d := newTestDialer()
638 f, err := newWithDialer(testcase.config, d)
639 if err != nil {
640 t.Errorf("Unexpected error: %v", err)
641 }
642 if testcase.shouldError {
643 f.Close()
644 }
645 e := f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
646 if testcase.shouldError {
647 assert.Equal(t, fmt.Errorf("fluent#appendBuffer: Logger already closed"), e)
648 } else {
649 assert.Equal(t, nil, e)
650 }
651 })
652 }
653 }
654
655 func TestNoPanicOnAsyncMultipleClose(t *testing.T) {
656 config := Config{
657 Async: true,
658 }
659 d := newTestDialer()
660 f, err := newWithDialer(config, d)
661 if err != nil {
662 t.Errorf("Unexpected error: %v", err)
663 }
664 f.Close()
665 f.Close()
666 }
667
668 func TestCloseOnFailingAsyncReconnect(t *testing.T) {
669 testcases := map[string]Config{
670 "with RequestAck": {
671 Async: true,
672 ForceStopAsyncSend: true,
673 RequestAck: true,
674 },
675 "without RequestAck": {
676 Async: true,
677 ForceStopAsyncSend: true,
678 RequestAck: false,
679 },
680 }
681
682 for tcname := range testcases {
683 t.Run(tcname, func(t *testing.T) {
684 tc := testcases[tcname]
685 t.Parallel()
686
687 d := newTestDialer()
688 f, err := newWithDialer(tc, d)
689 if err != nil {
690 t.Errorf("Unexpected error: %v", err)
691 }
692
693 // Send a first message successfully.
694 _ = f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
695 conn := d.waitForNextDialing(true, false)
696 conn.waitForNextWrite(true, ackRespMsgp(t, "dgxdWAAAAAABAAAAAAAAAA=="))
697
698 // Then try to send one during a transient connection failure.
699 _ = f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"bar": "baz"})
700 conn.waitForNextWrite(false, "")
701
702 // And add some more logs to the log buffer.
703 _ = f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"acme": "corporation"})
704
705 // But close the logger before it got sent. This is expected to not block.
706 timeout(t, 60*time.Second, func() { f.Close() }, "failed to close the logger")
707 })
708 }
709 }
710
711 func TestCloseWhileWaitingForAckResponse(t *testing.T) {
712 t.Parallel()
713
714 d := newTestDialer()
715 f, err := newWithDialer(Config{
716 Async: true,
717 RequestAck: true,
718 ForceStopAsyncSend: true,
719 }, d)
720 if err != nil {
721 t.Errorf("Unexpected error: %v", err)
722 }
723
724 _ = f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
725 conn := d.waitForNextDialing(true, true)
726 conn.waitForNextWrite(true, ackRespMsgp(t, "dgxdWAAAAAABAAAAAAAAAA=="))
727
728 // Test if the logger can really by closed while the client waits for a ack message.
729 timeout(t, 30*time.Second, func() {
730 f.Close()
731 }, "failed to close the logger")
732 }
733
734 func TestSyncWriteAfterCloseFails(t *testing.T) {
735 d := newTestDialer()
736
737 go func() {
738 f, err := newWithDialer(Config{Async: false}, d)
332739 if err != nil {
333 t.Errorf("in=%s, err=%s", tt.in, err)
334 }
335
336 rcv := make([]byte, len(conn.buf))
337 _, _ = conn.Read(rcv)
338 if string(rcv) != tt.out {
339 t.Errorf("got %s, except %s", string(rcv), tt.out)
340 }
341
342 if !conn.writeDeadline.IsZero() {
343 t.Errorf("got %s, except 0", conn.writeDeadline)
344 }
345 }
346 }
347
348 func Test_PostMsgpMarshaler(t *testing.T) {
349 f, err := New(Config{
350 FluentPort: 6666,
351 Async: false,
352 MarshalAsJSON: true, // easy to check equality
353 })
354 if err != nil {
355 t.Error(err)
356 return
357 }
358
359 var testData = []struct {
360 in *TestMessage
361 out string
362 }{
363 {
364 &TestMessage{Foo: "bar"},
365 "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]",
366 },
367 }
368 for _, tt := range testData {
369 conn := &Conn{}
370 f.conn = conn
371
372 err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), tt.in)
740 t.Errorf("Unexpected error: %v", err)
741 }
742
743 err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
373744 if err != nil {
374 t.Errorf("in=%s, err=%s", tt.in, err)
375 }
376
377 rcv := make([]byte, len(conn.buf))
378 _, _ = conn.Read(rcv)
379 if string(rcv) != tt.out {
380 t.Errorf("got %s, except %s", string(rcv), tt.out)
381 }
382
383 if !conn.writeDeadline.IsZero() {
384 t.Errorf("got %s, except 0", conn.writeDeadline)
385 }
386 }
745 t.Errorf("Unexpected error: %v", err)
746 }
747
748 err = f.Close()
749 if err != nil {
750 t.Errorf("Unexpected error: %v", err)
751 }
752
753 // Now let's post some event after Fluent.Close().
754 err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"})
755
756 // The event submission must fail,
757 assert.NotEqual(t, err, nil);
758
759 // and also must keep Fluentd closed.
760 assert.NotEqual(t, f.closed, false);
761 }()
762
763 conn := d.waitForNextDialing(true, false)
764 conn.waitForNextWrite(true, "")
387765 }
388766
389767 func Benchmark_PostWithShortMessage(b *testing.B) {
390768 b.StopTimer()
391 f, err := New(Config{})
769 d := newTestDialer()
770 f, err := newWithDialer(Config{}, d)
392771 if err != nil {
393772 panic(err)
394773 }
00 package fluent
11
2 const Version = "1.4.0"
2 const Version = "1.9.0"