fix: Fixed data race in consumer group example #2725

Open
wants to merge 2 commits into
base: main

Conversation


@Kaushal28 Kaushal28 commented Nov 19, 2023

In the consumer loop, if client.Consume returns any error other than sarama.ErrClosedConsumerGroup, the infinite for loop retries the call. At the end of each iteration it also assigns a new ready channel. That is the race condition: the loop runs in a goroutine, while the parent function is already blocked on the previous channel (<-consumer.ready). Setup() then closes the new channel, so the parent keeps waiting forever, because the two channels are now different.

Let me know if this makes sense. Please refer to the comments that I have added in the PR code changes below for more clarity.

I can see the same issue in other examples as well, but I have fixed only this one for now. If this fix makes sense, I can fix the other occurrences too.

Here is the explanation of the fix:

In this example we wait on the ready channel in only one place, so the channel needs to be closed only once. sync.Once is required because Setup can be called multiple times in the event of a rebalance.

Here is a similar fix that was needed in a project that followed this buggy example: https://github.com/open-telemetry/opentelemetry-collector/pull/1696/files. My project needed the same fix because of the infinite wait on error that I described above.

Thanks.

Signed-off-by: Kaushal Shah <shahk@juniper.net>
@@ -116,7 +116,6 @@ func main() {
if ctx.Err() != nil {
return
}
-consumer.ready = make(chan bool)
Author


If we assign a new channel here in the goroutine, Setup will close only this new channel, while the <-consumer.ready on line 123 below is still waiting on the old channel. So even when the new channel is closed, the wait never ends.

}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
-close(consumer.ready)
+consumer.readyCloser.Do(func() {
Author


Close the channel only once, since only main() is waiting on the ready channel. There is no need to close it again on a rebalance, since no one is waiting after the first close.
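For context on why the close needs a guard once the channel is no longer recreated: closing an already closed channel panics in Go. The sketch below compares an unguarded double close with one wrapped in sync.Once. Both helper names are hypothetical and exist only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// closeTwice closes ch twice without a guard and reports whether a
// panic occurred.
func closeTwice(ch chan bool) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	close(ch)
	close(ch) // closing an already closed channel panics
	return false
}

// closeTwiceGuarded attempts the same two closes through a sync.Once,
// so only the first attempt actually closes the channel.
func closeTwiceGuarded(ch chan bool) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	var once sync.Once
	for i := 0; i < 2; i++ {
		once.Do(func() { close(ch) })
	}
	return false
}

func main() {
	fmt.Println("unguarded panicked:", closeTwice(make(chan bool)))
	fmt.Println("guarded panicked:", closeTwiceGuarded(make(chan bool)))
}
```

The unguarded version reports a panic and the guarded version does not, which is why a second Setup call during a rebalance is safe with the Once in place.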

@dnwe added the needs-investigation label (Issues that require followup from maintainers) on Jan 31, 2024

Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur.
If you believe the changes are still valid then please verify your branch has no conflicts with main and rebase if needed. If you are awaiting a (re-)review then please let us know.

@github-actions bot added the stale label (Issues and pull requests without any recent activity) on Apr 30, 2024