New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add lock for reassigning partition to new consumer #2487
Conversation
3beee37
to
e45089f
Compare
@yitian108 are you using NewBalanceStrategySticky for each of your consumers rather than the deprecated shared global of BalanceStrategySticky? They should already have a unique partitionMovements instance if so: Lines 107 to 112 in fb81408
|
Hi @dnwe , I used the following api for each consumer: config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky Is this global of BalanceStrategySticky? If that, How should I use the NewBalanceStrategySticky at the starting of config kafka? ps: I am using the samara version 1.37, which new version using the NewBalanceSTrategySticky? |
It was |
Thanks @dnwe and @napallday, I'll try to use NewBalanceStrategyRange() instead to test the case. |
Hi @dnwe @napallday, I try to use NewBalanceStrategySticky() as below config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategySticky() However, the IDE GoLand prompts "'Strategy' is deprecated", it seems that I used the wrong way to achive the object, do you have another good way to implement the requirement? |
hi @yitian108, it's expected. From Sarama v1.37.0(more precisely PR #2339 ), |
Thanks @napallday , to my understand, I should use like this |
correct! @yitian108 |
Thank you for your contribution! However, this pull request has not had any activity in the past 90 days and will be closed in 30 days if no updates occur. |
When I try to test peer to peer consume using sarama library, some codes snippet like this:
Sometimes, running this code will report the panic 'concurrent map writes', and the location of panic points to the method 'addPartitionMovementRecord' in balance_strategy.go, so I add the lock for concurrently running the multiple consumers access the same new partition.
Please check it.