AsyncProducer.Close() waits for Flush.Frequency #2805

Open
ssantichaivekin opened this issue Feb 15, 2024 · 0 comments
Labels
needs-investigation Issues that require followup from maintainers

ssantichaivekin commented Feb 15, 2024

Description

Does Sarama's AsyncProducer.Close() wait for up to Flush.Frequency before performing its final flush? I expected Close() to close the channels and flush as fast as possible, since no further messages can arrive.

I ran small experiments with my setup and also read async_producer.go. Both confirm that Close() can wait up to Flush.Frequency before actually flushing.

func (bp *brokerProducer) run() {
...
		// my own comment: this is the only place where output is set
		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}

...
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}
Versions

| Sarama | Kafka | Go |
|--------|-------|----|
| 1.38.1 | | go1.20.10 |
Configuration

Producer.Flush.Frequency is set to 30 seconds.

Logs

This is the log from a run where I created a producer, set the frequency to 30s, wrote some messages, and then closed the producer. Note the 29-second gap during `.Close()` between the "Producer shutting down" and "Closing Client" timestamps.

...
2024-02-15T15:11:40-08:00       INFO    bootloader: [sarama] Producer shutting down.
2024-02-15T15:12:09-08:00       INFO    bootloader: [sarama] Closing Client
2024-02-15T15:12:09-08:00       INFO    bootloader: [sarama] Closed connection to broker kafka_broker:29092
2024-02-15T15:12:09-08:00       INFO    bootloader: [sarama] producer/broker/1 input chan closed
2024-02-15T15:12:09-08:00       INFO    bootloader: [sarama] Closed connection to broker kafka_broker:29092
...

Additional Context

I am wondering whether this is the expected behavior or a bug.

@dnwe added the needs-investigation label on Feb 19, 2024