From d6f3add5c6080fb6ab2e8dc5c7734ea37edbe8e6 Mon Sep 17 00:00:00 2001 From: Cam Date: Tue, 25 May 2021 10:26:21 -0700 Subject: [PATCH 1/3] vendor: github.com/fluent/fluent-logger-golang 1.6.1 Updates the fluent logger library. Namely this fixes a couple places where the library could panic when closing and writing to channels. see https://github.com/fluent/fluent-logger-golang/pull/93 and https://github.com/fluent/fluent-logger-golang/pull/95 closes #40829 closes #32567 Signed-off-by: Cam (cherry picked from commit a6a98d6928622c70feb8bf58a8dd17c3948cdd9d) Signed-off-by: Wesley --- vendor.conf | 2 +- .../fluent/fluent-logger-golang/README.md | 78 +++++++- .../fluent-logger-golang/fluent/fluent.go | 175 +++++++++++++----- .../fluent-logger-golang/fluent/proto.go | 16 +- 4 files changed, 214 insertions(+), 57 deletions(-) diff --git a/vendor.conf b/vendor.conf index dbf3e9979a268..7df83121523ef 100644 --- a/vendor.conf +++ b/vendor.conf @@ -106,7 +106,7 @@ github.com/godbus/dbus/v5 37bf87eef99d69c4f1d3528bd66e github.com/Graylog2/go-gelf 1550ee647df0510058c9d67a45c56f18911d80b8 # v2 branch # fluent-logger-golang deps -github.com/fluent/fluent-logger-golang 7a6c9dcd7f14c2ed5d8c55c11b894e5455ee311b # v1.4.0 +github.com/fluent/fluent-logger-golang b9b7fb02ccfee8ba4e69aa87386820c2bf24fd11 # v1.6.1 github.com/philhofer/fwd bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0 github.com/tinylib/msgp af6442a0fcf6e2a1b824f70dd0c734f01e817751 # v1.1.0 diff --git a/vendor/github.com/fluent/fluent-logger-golang/README.md b/vendor/github.com/fluent/fluent-logger-golang/README.md index 5930602619082..5ef54450d8166 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/README.md +++ b/vendor/github.com/fluent/fluent-logger-golang/README.md @@ -2,6 +2,7 @@ fluent-logger-golang ==== [![Build Status](https://travis-ci.org/fluent/fluent-logger-golang.png?branch=master)](https://travis-ci.org/fluent/fluent-logger-golang) +[![GoDoc](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent?status.svg)](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent) ## A structured event logger for Fluentd (Golang) @@ -19,8 +20,6 @@ Install the package with `go get` and use `import` to include it in your project import "github.com/fluent/fluent-logger-golang/fluent" ``` -GoDoc: http://godoc.org/github.com/fluent/fluent-logger-golang/fluent - ## Example ```go @@ -29,7 +28,7 @@ package main import ( "github.com/fluent/fluent-logger-golang/fluent" "fmt" - "time" + //"time" ) func main() { @@ -59,21 +58,92 @@ func main() { f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"}) ``` +### FluentNetwork + +Specify the network protocol, as "tcp" (use `FluentHost` and `FluentPort`) or "unix" (use `FluentSocketPath`). +The default is "tcp". + +### FluentHost + +Specify a hostname or IP address as a string for the destination of the "tcp" protocol. +The default is "127.0.0.1". + +### FluentPort + +Specify the TCP port of the destination. The default is 24224. + +### FluentSocketPath + +Specify the unix socket path when `FluentNetwork` is "unix". + +### Timeout + +Set the timeout value of `time.Duration` to connect to the destination. +The default is 3 seconds. + ### WriteTimeout -Sets the timeout for Write call of logger.Post. +Sets the timeout value of `time.Duration` for Write call of `logger.Post`. Since the default is zero value, Write will not time out. +### BufferLimit + +Sets the number of events buffered on the memory. Records will be stored in memory up to this number. If the buffer is full, the call to record logs will fail. +The default is 8192. + +### RetryWait + +Set the duration of the initial wait for the first retry, in milliseconds. The actual retry wait will be `r * 1.5^(N-1)` (r: this value, N: the number of retries). +The default is 500. + +### MaxRetry + +Sets the maximum number of retries. If the number of retries become larger than this value, the write/send operation will fail. +The default is 13. + +### MaxRetryWait + +The maximum duration of wait between retries, in milliseconds. If the calculated retry wait is larger than this value, the actual retry wait will be this value. +The default is 60,000 (60 seconds). + +### TagPrefix + +Sets the prefix string of the tag. Prefix will be appended with a dot `.`, like `ppp.tag` (ppp: the value of this parameter, tag: the tag string specified in a call). +The default is blank. + ### Async Enable asynchronous I/O (connect and write) for sending events to Fluentd. The default is false. +### ForceStopAsyncSend + +When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning) +The default is false. + +### SubSecondPrecision + +Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later. +The default is false. + +### MarshalAsJson + +Enable Json data marshaling to send messages using Json format (instead of the standard MessagePack). It is supported by Fluentd `in_forward` plugin. +The default is false. + ### RequestAck Sets whether to request acknowledgment from Fluentd to increase the reliability of the connection. The default is false. +## FAQ + +### Does this logger support the features of Fluentd Forward Protocol v1? + +"the features" includes heartbeat messages (for TCP keepalive), TLS transport and shared key authentication. + +This logger doesn't support those features. Patches are welcome! + ## Tests ``` go test diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go index 5bbd52668ea27..9d5d8af4d27e5 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go @@ -15,8 +15,9 @@ import ( "bytes" "encoding/base64" "encoding/binary" - "github.com/tinylib/msgp/msgp" "math/rand" + + "github.com/tinylib/msgp/msgp" ) const ( @@ -37,18 +38,19 @@ const ( ) type Config struct { - FluentPort int `json:"fluent_port"` - FluentHost string `json:"fluent_host"` - FluentNetwork string `json:"fluent_network"` - FluentSocketPath string `json:"fluent_socket_path"` - Timeout time.Duration `json:"timeout"` - WriteTimeout time.Duration `json:"write_timeout"` - BufferLimit int `json:"buffer_limit"` - RetryWait int `json:"retry_wait"` - MaxRetry int `json:"max_retry"` - MaxRetryWait int `json:"max_retry_wait"` - TagPrefix string `json:"tag_prefix"` - Async bool `json:"async"` + FluentPort int `json:"fluent_port"` + FluentHost string `json:"fluent_host"` + FluentNetwork string `json:"fluent_network"` + FluentSocketPath string `json:"fluent_socket_path"` + Timeout time.Duration `json:"timeout"` + WriteTimeout time.Duration `json:"write_timeout"` + BufferLimit int `json:"buffer_limit"` + RetryWait int `json:"retry_wait"` + MaxRetry int `json:"max_retry"` + MaxRetryWait int `json:"max_retry_wait"` + TagPrefix string `json:"tag_prefix"` + Async bool `json:"async"` + ForceStopAsyncSend bool `json:"force_stop_async_send"` // Deprecated: Use Async instead AsyncConnect bool `json:"async_connect"` MarshalAsJSON bool `json:"marshal_as_json"` @@ -63,6 +65,18 @@ type Config struct { RequestAck bool `json:"request_ack"` } +type ErrUnknownNetwork struct { + network string +} + +func (e *ErrUnknownNetwork) Error() string { + return "unknown network " + e.network +} + +func NewErrUnknownNetwork(network string) error { + return &ErrUnknownNetwork{network} +} + type msgToSend struct { data []byte ack string @@ -71,15 +85,32 @@ type msgToSend struct { type Fluent struct { Config - pending chan *msgToSend - wg sync.WaitGroup + dialer dialer + stopRunning chan bool + pending chan *msgToSend + pendingMutex sync.RWMutex + chanClosed bool + wg sync.WaitGroup muconn sync.Mutex conn net.Conn } // New creates a new Logger. -func New(config Config) (f *Fluent, err error) { +func New(config Config) (*Fluent, error) { + if config.Timeout == 0 { + config.Timeout = defaultTimeout + } + return newWithDialer(config, &net.Dialer{ + Timeout: config.Timeout, + }) +} + +type dialer interface { + Dial(string, string) (net.Conn, error) +} + +func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.FluentNetwork == "" { config.FluentNetwork = defaultNetwork } @@ -92,9 +123,6 @@ func New(config Config) (f *Fluent, err error) { if config.FluentSocketPath == "" { config.FluentSocketPath = defaultSocketPath } - if config.Timeout == 0 { - config.Timeout = defaultTimeout - } if config.WriteTimeout == 0 { config.WriteTimeout = defaultWriteTimeout } @@ -114,15 +142,22 @@ func New(config Config) (f *Fluent, err error) { fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead") config.Async = config.Async || config.AsyncConnect } + if config.Async { f = &Fluent{ - Config: config, - pending: make(chan *msgToSend, config.BufferLimit), + Config: config, + dialer: d, + pending: make(chan *msgToSend, config.BufferLimit), + pendingMutex: sync.RWMutex{}, + stopRunning: make(chan bool, 1), } f.wg.Add(1) go f.run() } else { - f = &Fluent{Config: config} + f = &Fluent{ + Config: config, + dialer: d, + } err = f.connect() } return @@ -292,16 +327,32 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { + defer f.close(f.conn) if f.Config.Async { + f.pendingMutex.Lock() + if f.chanClosed { + f.pendingMutex.Unlock() + return nil + } + f.chanClosed = true + f.pendingMutex.Unlock() + if f.Config.ForceStopAsyncSend { + f.stopRunning <- true + close(f.stopRunning) + } close(f.pending) f.wg.Wait() } - f.close() - return + return nil } // appendBuffer appends data to buffer with lock. func (f *Fluent) appendBuffer(msg *msgToSend) error { + f.pendingMutex.RLock() + defer f.pendingMutex.RUnlock() + if f.chanClosed { + return fmt.Errorf("fluent#appendBuffer: Logger already closed") + } select { case f.pending <- msg: default: @@ -311,9 +362,9 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error { } // close closes the connection. -func (f *Fluent) close() { +func (f *Fluent) close(c net.Conn) { f.muconn.Lock() - if f.conn != nil { + if f.conn != nil && f.conn == c { f.conn.Close() f.conn = nil } @@ -322,19 +373,24 @@ func (f *Fluent) close() { // connect establishes a new connection using the specified transport. func (f *Fluent) connect() (err error) { - switch f.Config.FluentNetwork { case "tcp": - f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout) + f.conn, err = f.dialer.Dial( + f.Config.FluentNetwork, + f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) case "unix": - f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout) + f.conn, err = f.dialer.Dial( + f.Config.FluentNetwork, + f.Config.FluentSocketPath) default: - err = net.UnknownNetworkError(f.Config.FluentNetwork) + err = NewErrUnknownNetwork(f.Config.FluentNetwork) } return err } func (f *Fluent) run() { + drainEvents := false + var emitEventDrainMsg sync.Once for { select { case entry, ok := <-f.pending: @@ -342,11 +398,22 @@ func (f *Fluent) run() { f.wg.Done() return } + if drainEvents { + emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) }) + continue + } err := f.write(entry) if err != nil { fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) } } + select { + case stopRunning, ok := <-f.stopRunning: + if stopRunning || !ok { + drainEvents = true + } + default: + } } } @@ -355,48 +422,56 @@ func e(x, y float64) int { } func (f *Fluent) write(msg *msgToSend) error { - + var c net.Conn for i := 0; i < f.Config.MaxRetry; i++ { - + c = f.conn // Connect if needed - f.muconn.Lock() - if f.conn == nil { - err := f.connect() - if err != nil { - f.muconn.Unlock() - waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) - if waitTime > f.Config.MaxRetryWait { - waitTime = f.Config.MaxRetryWait + if c == nil { + f.muconn.Lock() + if f.conn == nil { + err := f.connect() + if err != nil { + f.muconn.Unlock() + + if _, ok := err.(*ErrUnknownNetwork); ok { + // do not retry on unknown network error + break + } + waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) + if waitTime > f.Config.MaxRetryWait { + waitTime = f.Config.MaxRetryWait + } + time.Sleep(time.Duration(waitTime) * time.Millisecond) + continue } - time.Sleep(time.Duration(waitTime) * time.Millisecond) - continue } + c = f.conn + f.muconn.Unlock() } - f.muconn.Unlock() // We're connected, write msg t := f.Config.WriteTimeout if time.Duration(0) < t { - f.conn.SetWriteDeadline(time.Now().Add(t)) + c.SetWriteDeadline(time.Now().Add(t)) } else { - f.conn.SetWriteDeadline(time.Time{}) + c.SetWriteDeadline(time.Time{}) } - _, err := f.conn.Write(msg.data) + _, err := c.Write(msg.data) if err != nil { - f.close() + f.close(c) } else { // Acknowledgment check if msg.ack != "" { resp := &AckResp{} if f.Config.MarshalAsJSON { - dec := json.NewDecoder(f.conn) + dec := json.NewDecoder(c) err = dec.Decode(resp) } else { - r := msgp.NewReader(f.conn) + r := msgp.NewReader(c) err = resp.DecodeMsg(r) } if err != nil || resp.Ack != msg.ack { - f.close() + f.close(c) continue } } diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go index 76fc860ee357d..95faa9624a77f 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/proto.go @@ -3,6 +3,7 @@ package fluent import ( + "fmt" "time" "github.com/tinylib/msgp/msgp" @@ -93,8 +94,19 @@ func (t *EventTime) MarshalBinaryTo(b []byte) error { return nil } -// UnmarshalBinary is not implemented since decoding messages is not supported -// by this library. +// Although decoding messages is not officially supported by this library, +// UnmarshalBinary is implemented for testing and general completeness. func (t *EventTime) UnmarshalBinary(b []byte) error { + if len(b) != length { + return fmt.Errorf("Invalid EventTime byte length: %d", len(b)) + } + + sec := (int32(b[0]) << 24) | (int32(b[1]) << 16) + sec = sec | (int32(b[2]) << 8) | int32(b[3]) + + nsec := (int32(b[4]) << 24) | (int32(b[5]) << 16) + nsec = nsec | (int32(b[6]) << 8) | int32(b[7]) + + *t = EventTime(time.Unix(int64(sec), int64(nsec))) return nil } From 81fc02b7e1ce3df73909e3dac7f8cc9f55add1cf Mon Sep 17 00:00:00 2001 From: Albin Kerouanton Date: Mon, 1 Nov 2021 10:29:20 +0100 Subject: [PATCH 2/3] vendor: github.com/fluent/fluent-logger-golang v1.8.0 Updates the fluent logger library to v1.8.0. Following PRs/commits were merged since last bump: * [Add callback for error handling when using async](https://github.com/fluent/fluent-logger-golang/pull/97) * [Fix panic when accessing unexported struct field](https://github.com/fluent/fluent-logger-golang/pull/99) * [Properly stop logger during (re)connect failure](https://github.com/fluent/fluent-logger-golang/pull/82) * [Support a TLS-enabled connection](https://github.com/fluent/fluent-logger-golang/commit/e5d6aa13b74ca74e01d37caa7230cfc2bc204b1f) See https://github.com/fluent/fluent-logger-golang/compare/v1.6.1..v1.8.0 Signed-off-by: Albin Kerouanton (cherry picked from commit e24d61b7efac787ff3d5176d994608937a057522) Signed-off-by: Wesley --- vendor.conf | 2 +- .../fluent/fluent-logger-golang/README.md | 36 +- .../fluent-logger-golang/fluent/fluent.go | 363 ++++++++++++------ 3 files changed, 283 insertions(+), 118 deletions(-) diff --git a/vendor.conf b/vendor.conf index 7df83121523ef..503c9d229d7a7 100644 --- a/vendor.conf +++ b/vendor.conf @@ -106,7 +106,7 @@ github.com/godbus/dbus/v5 37bf87eef99d69c4f1d3528bd66e github.com/Graylog2/go-gelf 1550ee647df0510058c9d67a45c56f18911d80b8 # v2 branch # fluent-logger-golang deps -github.com/fluent/fluent-logger-golang b9b7fb02ccfee8ba4e69aa87386820c2bf24fd11 # v1.6.1 +github.com/fluent/fluent-logger-golang 0b652e850a9140d0b1db6390d8925d0601e952db # v1.8.0 github.com/philhofer/fwd bb6d471dc95d4fe11e432687f8b70ff496cf3136 # v1.0.0 github.com/tinylib/msgp af6442a0fcf6e2a1b824f70dd0c734f01e817751 # v1.1.0 diff --git a/vendor/github.com/fluent/fluent-logger-golang/README.md b/vendor/github.com/fluent/fluent-logger-golang/README.md index 5ef54450d8166..554619a31cd0e 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/README.md +++ b/vendor/github.com/fluent/fluent-logger-golang/README.md @@ -1,7 +1,7 @@ fluent-logger-golang ==== -[![Build Status](https://travis-ci.org/fluent/fluent-logger-golang.png?branch=master)](https://travis-ci.org/fluent/fluent-logger-golang) +[![Build Status](https://github.com/fluent/fluent-logger-golang/actions/workflows/ci.yaml/badge.svg?branch=master)](https://github.com/fluent/fluent-logger-golang/actions) [![GoDoc](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent?status.svg)](https://godoc.org/github.com/fluent/fluent-logger-golang/fluent) ## A structured event logger for Fluentd (Golang) @@ -60,7 +60,12 @@ f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"}) ### FluentNetwork -Specify the network protocol, as "tcp" (use `FluentHost` and `FluentPort`) or "unix" (use `FluentSocketPath`). +Specify the network protocol. The supported values are: + + * "tcp" (use `FluentHost` and `FluentPort`) + * "tls" (use`FluentHost` and `FluentPort`) + * "unix" (use `FluentSocketPath`) + The default is "tcp". ### FluentHost @@ -121,6 +126,12 @@ The default is false. When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning) The default is false. +### AsyncResultCallback + +When Async is enabled, if this is callback is provided, it will be called on every write to Fluentd. The callback function +takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the +delivery of the message was unsuccessful. + ### SubSecondPrecision Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later. @@ -136,6 +147,10 @@ The default is false. Sets whether to request acknowledgment from Fluentd to increase the reliability of the connection. The default is false. +### TlsInsecureSkipVerify + +Skip verifying the server certificate. Useful for development and testing. The default is false. + ## FAQ ### Does this logger support the features of Fluentd Forward Protocol v1? @@ -144,7 +159,24 @@ of the connection. The default is false. This logger doesn't support those features. Patches are welcome! +### Is it allowed to call `Fluent.Post()` after connection close? + +Before v1.8.0, the Fluent logger silently reopened connections whenever +`Fluent.Post()` was called. + +```go +logger, _ := fluent.New(fluent.Config{}) +logger.Post(tag, data) +logger.Close() +logger.Post(tag, data) /* reopen connection */ +``` + +However, this behavior was confusing, in particular when multiple goroutines +were involved. Starting v1.8.0, the logger no longer accepts `Fluent.Post()` +after `Fluent.Close()`, and instead returns a "Logger already closed" error. + ## Tests ``` + go test ``` diff --git a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go index 9d5d8af4d27e5..c2e1b25955586 100644 --- a/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go +++ b/vendor/github.com/fluent/fluent-logger-golang/fluent/fluent.go @@ -1,10 +1,13 @@ package fluent import ( + "context" + "crypto/tls" "encoding/json" "errors" "fmt" "math" + "math/rand" "net" "os" "reflect" @@ -15,7 +18,6 @@ import ( "bytes" "encoding/base64" "encoding/binary" - "math/rand" "github.com/tinylib/msgp/msgp" ) @@ -35,22 +37,30 @@ const ( // Default sub-second precision value to false since it is only compatible // with fluentd versions v0.14 and above. defaultSubSecondPrecision = false + + // Default value whether to skip checking insecure certs on TLS connections. + defaultTlsInsecureSkipVerify = false ) +// randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced +// during tests with a deterministic function. +var randomGenerator = rand.Uint64 + type Config struct { - FluentPort int `json:"fluent_port"` - FluentHost string `json:"fluent_host"` - FluentNetwork string `json:"fluent_network"` - FluentSocketPath string `json:"fluent_socket_path"` - Timeout time.Duration `json:"timeout"` - WriteTimeout time.Duration `json:"write_timeout"` - BufferLimit int `json:"buffer_limit"` - RetryWait int `json:"retry_wait"` - MaxRetry int `json:"max_retry"` - MaxRetryWait int `json:"max_retry_wait"` - TagPrefix string `json:"tag_prefix"` - Async bool `json:"async"` - ForceStopAsyncSend bool `json:"force_stop_async_send"` + FluentPort int `json:"fluent_port"` + FluentHost string `json:"fluent_host"` + FluentNetwork string `json:"fluent_network"` + FluentSocketPath string `json:"fluent_socket_path"` + Timeout time.Duration `json:"timeout"` + WriteTimeout time.Duration `json:"write_timeout"` + BufferLimit int `json:"buffer_limit"` + RetryWait int `json:"retry_wait"` + MaxRetry int `json:"max_retry"` + MaxRetryWait int `json:"max_retry_wait"` + TagPrefix string `json:"tag_prefix"` + Async bool `json:"async"` + ForceStopAsyncSend bool `json:"force_stop_async_send"` + AsyncResultCallback func(data []byte, err error) // Deprecated: Use Async instead AsyncConnect bool `json:"async_connect"` MarshalAsJSON bool `json:"marshal_as_json"` @@ -63,6 +73,9 @@ type Config struct { // respond with an acknowledgement. This option improves the reliability // of the message transmission. RequestAck bool `json:"request_ack"` + + // Flag to skip verifying insecure certs on TLS connections + TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"` } type ErrUnknownNetwork struct { @@ -85,17 +98,24 @@ type msgToSend struct { type Fluent struct { Config - dialer dialer - stopRunning chan bool - pending chan *msgToSend - pendingMutex sync.RWMutex - chanClosed bool - wg sync.WaitGroup - - muconn sync.Mutex + dialer dialer + // stopRunning is used in async mode to signal to run() it should abort. + stopRunning chan struct{} + // cancelDialings is used by Close() to stop any in-progress dialing. + cancelDialings context.CancelFunc + pending chan *msgToSend + pendingMutex sync.RWMutex + closed bool + wg sync.WaitGroup + + muconn sync.RWMutex conn net.Conn } +type dialer interface { + DialContext(ctx context.Context, network, address string) (net.Conn, error) +} + // New creates a new Logger. func New(config Config) (*Fluent, error) { if config.Timeout == 0 { @@ -106,10 +126,6 @@ func New(config Config) (*Fluent, error) { }) } -type dialer interface { - Dial(string, string) (net.Conn, error) -} - func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.FluentNetwork == "" { config.FluentNetwork = defaultNetwork @@ -138,27 +154,36 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) { if config.MaxRetryWait == 0 { config.MaxRetryWait = defaultMaxRetryWait } + if !config.TlsInsecureSkipVerify { + config.TlsInsecureSkipVerify = defaultTlsInsecureSkipVerify + } if config.AsyncConnect { fmt.Fprintf(os.Stderr, "fluent#New: AsyncConnect is now deprecated, please use Async instead") config.Async = config.Async || config.AsyncConnect } if config.Async { + ctx, cancel := context.WithCancel(context.Background()) + f = &Fluent{ - Config: config, - dialer: d, - pending: make(chan *msgToSend, config.BufferLimit), - pendingMutex: sync.RWMutex{}, - stopRunning: make(chan bool, 1), + Config: config, + dialer: d, + stopRunning: make(chan struct{}), + cancelDialings: cancel, + pending: make(chan *msgToSend, config.BufferLimit), + pendingMutex: sync.RWMutex{}, + muconn: sync.RWMutex{}, } + f.wg.Add(1) - go f.run() + go f.run(ctx) } else { f = &Fluent{ Config: config, dialer: d, + muconn: sync.RWMutex{}, } - err = f.connect() + err = f.connect(context.Background()) } return } @@ -211,13 +236,18 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err fields := msgtype.NumField() for i := 0; i < fields; i++ { field := msgtype.Field(i) + value := msg.FieldByIndex(field.Index) + // ignore unexported fields + if !value.CanInterface() { + continue + } name := field.Name if n1 := field.Tag.Get("msg"); n1 != "" { name = n1 } else if n2 := field.Tag.Get("codec"); n2 != "" { name = n2 } - kv[name] = msg.FieldByIndex(field.Index).Interface() + kv[name] = value.Interface() } return f.EncodeAndPostData(tag, tm, kv) } @@ -254,8 +284,12 @@ func (f *Fluent) postRawData(msg *msgToSend) error { if f.Config.Async { return f.appendBuffer(msg) } + // Synchronous write - return f.write(msg) + if f.closed { + return fmt.Errorf("fluent#postRawData: Logger already closed") + } + return f.writeWithRetry(context.Background(), msg) } // For sending forward protocol adopted JSON @@ -289,7 +323,7 @@ func getUniqueID(timeUnix int64) (string, error) { enc.Close() return "", err } - if err := binary.Write(enc, binary.LittleEndian, rand.Uint64()); err != nil { + if err := binary.Write(enc, binary.LittleEndian, randomGenerator()); err != nil { enc.Close() return "", err } @@ -325,32 +359,53 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg return } -// Close closes the connection, waiting for pending logs to be sent +// Close closes the connection, waiting for pending logs to be sent. If the client is +// running in async mode, the run() goroutine exits before Close() returns. func (f *Fluent) Close() (err error) { - defer f.close(f.conn) if f.Config.Async { f.pendingMutex.Lock() - if f.chanClosed { + if f.closed { f.pendingMutex.Unlock() return nil } - f.chanClosed = true + f.closed = true f.pendingMutex.Unlock() + if f.Config.ForceStopAsyncSend { - f.stopRunning <- true close(f.stopRunning) + f.cancelDialings() } + close(f.pending) + // If ForceStopAsyncSend is false, all logs in the channel have to be sent + // before closing the connection. At this point closed is true so no more + // logs are written to the channel and f.pending has been closed, so run() + // goroutine will exit as soon as all logs in the channel are sent. + if !f.Config.ForceStopAsyncSend { + f.wg.Wait() + } + } + + f.muconn.Lock() + f.close() + f.closed = true + f.muconn.Unlock() + + // If ForceStopAsyncSend is true, we shall close the connection before waiting for + // run() goroutine to exit to be sure we aren't waiting on ack message that might + // never come (eg. because fluentd server is down). However we want to be sure the + // run() goroutine stops before returning from Close(). + if f.Config.ForceStopAsyncSend { f.wg.Wait() } - return nil + return } // appendBuffer appends data to buffer with lock. func (f *Fluent) appendBuffer(msg *msgToSend) error { f.pendingMutex.RLock() defer f.pendingMutex.RUnlock() - if f.chanClosed { + if f.closed { return fmt.Errorf("fluent#appendBuffer: Logger already closed") } select { @@ -361,58 +416,114 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error { return nil } -// close closes the connection. -func (f *Fluent) close(c net.Conn) { - f.muconn.Lock() - if f.conn != nil && f.conn == c { +// close closes the connection. Callers should take care of locking muconn first. +func (f *Fluent) close() { + if f.conn != nil { f.conn.Close() f.conn = nil } - f.muconn.Unlock() } -// connect establishes a new connection using the specified transport. -func (f *Fluent) connect() (err error) { +// connect establishes a new connection using the specified transport. Caller should +// take care of locking muconn first. +func (f *Fluent) connect(ctx context.Context) (err error) { switch f.Config.FluentNetwork { case "tcp": - f.conn, err = f.dialer.Dial( + f.conn, err = f.dialer.DialContext(ctx, f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort)) + case "tls": + tlsConfig := &tls.Config{InsecureSkipVerify: f.Config.TlsInsecureSkipVerify} + f.conn, err = tls.DialWithDialer( + &net.Dialer{Timeout: f.Config.Timeout}, + "tcp", + f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), tlsConfig, + ) case "unix": - f.conn, err = f.dialer.Dial( + f.conn, err = f.dialer.DialContext(ctx, f.Config.FluentNetwork, f.Config.FluentSocketPath) default: err = NewErrUnknownNetwork(f.Config.FluentNetwork) } + return err } -func (f *Fluent) run() { - drainEvents := false - var emitEventDrainMsg sync.Once +var errIsClosing = errors.New("fluent logger is closing") + +// Caller should take care of locking muconn first. +func (f *Fluent) connectWithRetry(ctx context.Context) error { + // A Time channel is used instead of time.Sleep() to avoid blocking this + // goroutine during way too much time (because of the exponential back-off + // retry). + // time.NewTimer() is used instead of time.After() to avoid leaking the + // timer channel (cf. https://pkg.go.dev/time#After). + timeout := time.NewTimer(time.Duration(0)) + defer func() { + // timeout.Stop() is called in a function literal instead of being + // defered directly as it's re-assigned below when the retry loop spins. + timeout.Stop() + }() + + for i := 0; i < f.Config.MaxRetry; i++ { + select { + case <-timeout.C: + err := f.connect(ctx) + if err == nil { + return nil + } + + if _, ok := err.(*ErrUnknownNetwork); ok { + return err + } + if err == context.Canceled { + return errIsClosing + } + + waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) + if waitTime > f.Config.MaxRetryWait { + waitTime = f.Config.MaxRetryWait + } + + timeout = time.NewTimer(time.Duration(waitTime) * time.Millisecond) + case <-ctx.Done(): + return errIsClosing + } + } + + return fmt.Errorf("could not connect to fluentd after %d retries", f.Config.MaxRetry) +} + +// run is the goroutine used to unqueue and write logs in async mode. That +// goroutine is meant to run during the whole life of the Fluent logger. +func (f *Fluent) run(ctx context.Context) { for { select { case entry, ok := <-f.pending: + // f.stopRunning is closed before f.pending only when ForceStopAsyncSend + // is enabled. Otherwise, f.pending is closed when Close() is called. if !ok { f.wg.Done() return } - if drainEvents { - emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) }) - continue - } - err := f.write(entry) - if err != nil { + + err := f.writeWithRetry(ctx, entry) + if err != nil && err != errIsClosing { fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) } - } - select { - case stopRunning, ok := <-f.stopRunning: - if stopRunning || !ok { - drainEvents = true + if f.AsyncResultCallback != nil { + var data []byte + if entry != nil { + data = entry.data + } + f.AsyncResultCallback(data, err) } - default: + case <-f.stopRunning: + fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) + + f.wg.Done() + return } } } @@ -421,63 +532,85 @@ func e(x, y float64) int { return int(math.Pow(x, y)) } -func (f *Fluent) write(msg *msgToSend) error { - var c net.Conn +func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error { for i := 0; i < f.Config.MaxRetry; i++ { - c = f.conn - // Connect if needed - if c == nil { - f.muconn.Lock() - if f.conn == nil { - err := f.connect() - if err != nil { - f.muconn.Unlock() - - if _, ok := err.(*ErrUnknownNetwork); ok { - // do not retry on unknown network error - break - } - waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1)) - if waitTime > f.Config.MaxRetryWait { - waitTime = f.Config.MaxRetryWait - } - time.Sleep(time.Duration(waitTime) * time.Millisecond) - continue - } - } - c = f.conn - f.muconn.Unlock() + if retry, err := f.write(ctx, msg); !retry { + return err + } + } + + return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry) +} + +// write writes the provided msg to fluentd server. Its first return values is +// a bool indicating whether the write should be retried. +// This method relies on function literals to execute muconn.Unlock or +// muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in +// the case of panic recovering. +func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) { + closer := func() { + f.muconn.Lock() + defer f.muconn.Unlock() + + f.close() + } + + if err := func() (err error) { + f.muconn.Lock() + defer f.muconn.Unlock() + + if f.conn == nil { + err = f.connectWithRetry(ctx) + } + + return err + }(); err != nil { + // Here, we don't want to retry the write since connectWithRetry already + // retries Config.MaxRetry times to connect. + return false, fmt.Errorf("fluent#write: %v", err) + } + + if err := func() (err error) { + f.muconn.RLock() + defer f.muconn.RUnlock() + + if f.conn == nil { + return fmt.Errorf("connection has been closed before writing to it") } - // We're connected, write msg t := f.Config.WriteTimeout if time.Duration(0) < t { - c.SetWriteDeadline(time.Now().Add(t)) + f.conn.SetWriteDeadline(time.Now().Add(t)) } else { - c.SetWriteDeadline(time.Time{}) + f.conn.SetWriteDeadline(time.Time{}) } - _, err := c.Write(msg.data) - if err != nil { - f.close(c) + + _, err = f.conn.Write(msg.data) + return err + }(); err != nil { + closer() + return true, fmt.Errorf("fluent#write: %v", err) + } + + // Acknowledgment check + if msg.ack != "" { + resp := &AckResp{} + var err error + if f.Config.MarshalAsJSON { + dec := json.NewDecoder(f.conn) + err = dec.Decode(resp) } else { - // Acknowledgment check - if msg.ack != "" { - resp := &AckResp{} - if f.Config.MarshalAsJSON { - dec := json.NewDecoder(c) - err = dec.Decode(resp) - } else { - r := msgp.NewReader(c) - err = resp.DecodeMsg(r) - } - if err != nil || resp.Ack != msg.ack { - f.close(c) - continue - } - } - return err + r := msgp.NewReader(f.conn) + err = resp.DecodeMsg(r) + } + + if err != nil || resp.Ack != msg.ack { + fmt.Fprintf(os.Stderr, "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection...", resp.Ack, msg.ack) + + closer() + return true, err } } - return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry) + return false, nil } From f9df098e7684fb42287860377dcd6931cca62a73 Mon Sep 17 00:00:00 2001 From: Albin Kerouanton Date: Mon, 1 Nov 2021 10:44:07 +0100 Subject: [PATCH 3/3] fluentd: Turn ForceStopAsyncSend true when async connect is used The flag ForceStopAsyncSend was added to fluent logger lib in v1.5.0 (at this time named AsyncStop) to tell fluentd to abort sending logs asynchronously as soon as possible, when its Close() method is called. However this flag was broken because of the way the lib was handling it (basically, the lib could be stucked in retry-connect loop without checking this flag). Since fluent logger lib v1.7.0, calling Close() (when ForceStopAsyncSend is true) will really stop all ongoing send/connect procedure, wherever it's stucked. Signed-off-by: Albin Kerouanton (cherry picked from commit bd61629b6b009b5fa50e31c180e86cede86ce9de) Signed-off-by: Wesley --- daemon/logger/fluentd/fluentd.go | 1 + 1 file changed, 1 insertion(+) diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 829b606c9f3f8..997c01c65257e 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -242,6 +242,7 @@ func parseConfig(cfg map[string]string) (fluent.Config, error) { AsyncConnect: asyncConnect, SubSecondPrecision: subSecondPrecision, RequestAck: requestAck, + ForceStopAsyncSend: async || asyncConnect, } return config, nil