[broker] Fix issue where Key_Shared consumers could get stuck #10920

Merged
merged 1 commit into from Jun 14, 2021
@@ -121,8 +121,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce

     @Override
     public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
-        super.removeConsumer(consumer);
+        // The consumer must be removed from the selector before calling the superclass removeConsumer method.
+        // In the superclass removeConsumer method, the pending acks that the consumer has are added to
+        // messagesToRedeliver. If the consumer has not been removed from the selector at this point,
Contributor

"If the consumer has not been removed from the selector at this point"

This looks like a race condition between sending messages to the consumer and removing the consumer from the selector?

Both readEntriesComplete and removeConsumer are synchronized. How can this happen?

Contributor Author

@codelipenghui Perhaps this happens if the messages to be redelivered are in the managed ledger cache. In this case,

readMoreEntries()
↓
readEntriesComplete()
↓
sendMessagesToConsumers()

are executed and completed synchronously in PersistentDispatcherMultipleConsumers#removeConsumer().

    consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
        if (addMessageToReplay(ledgerId, entryId)) {
            redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
        }
    });
    totalAvailablePermits -= consumer.getAvailablePermits();
    if (log.isDebugEnabled()) {
        log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. "
                + "New dispatcher permit count is {}", name, consumer.getAvailablePermits(),
                totalAvailablePermits);
    }
    readMoreEntries();

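The synchronized protection does not help here because all of these methods are executed by the same thread, and Java monitors are reentrant: a thread that already holds the lock can enter another synchronized method on the same object without blocking.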
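The point about synchronized can be illustrated with a minimal sketch. This is not Pulsar code: the class `ReentrantDispatchDemo`, its `cacheHit` flag, and the `trace` list are hypothetical stand-ins, and only the method names are borrowed from the discussion. It assumes that a cache hit completes the read callback inline on the calling thread, while a cache miss completes it on another thread.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; names mirror the discussion but nothing here is Pulsar's API.
final class ReentrantDispatchDemo {
    private final List<String> trace = new ArrayList<>();
    private final boolean cacheHit;   // hypothetical switch for the two cases
    private Thread pendingRead;       // set only in the cache-miss case

    ReentrantDispatchDemo(boolean cacheHit) {
        this.cacheHit = cacheHit;
    }

    synchronized void removeConsumer() {
        trace.add("removeConsumer:start");
        readMoreEntries();
        trace.add("removeConsumer:end");
    }

    private void readMoreEntries() {
        if (cacheHit) {
            // Assumed cache hit: the callback runs inline on the caller's thread.
            // The monitor is reentrant, so this call does not block.
            readEntriesComplete();
        } else {
            // Assumed cache miss: the callback runs on another thread, which
            // must wait until removeConsumer() releases the monitor.
            pendingRead = new Thread(this::readEntriesComplete);
            pendingRead.start();
        }
    }

    synchronized void readEntriesComplete() {
        trace.add("readEntriesComplete");
    }

    static List<String> run(boolean cacheHit) {
        ReentrantDispatchDemo demo = new ReentrantDispatchDemo(cacheHit);
        demo.removeConsumer();
        if (demo.pendingRead != null) {
            try {
                demo.pendingRead.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo.trace;
    }

    public static void main(String[] args) {
        // prints [removeConsumer:start, readEntriesComplete, removeConsumer:end]
        System.out.println(run(true));
        // prints [removeConsumer:start, removeConsumer:end, readEntriesComplete]
        System.out.println(run(false));
    }
}
```

In the cache-hit run the callback executes in the middle of removeConsumer(), even though both methods are synchronized. Only the cache-miss run is serialized by the lock.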

Contributor

Ok, I think the stack trace here would look like this:

removeConsumer()

super.removeConsumer()

readMoreEntries() (this is what triggers the redelivery of the messages that were pending on the removed consumer)

readEntriesComplete()

sendMessagesToConsumers() (as mentioned, this can select the removed consumer, which is still in the selector list)

At this point, sendMessagesToConsumers() will fail and the message will stay in the pendingAcks set for that consumer. But since the consumer has already been removed, the redelivery of this message will never happen.

I think this change is the correct one.
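The effect of the ordering can be sketched with a toy model. Everything below is hypothetical and heavily simplified: `RemovalOrderDemo`, its fields, and `superRemoveConsumer` are illustrative stand-ins, the selector is reduced to "the first consumer in the list owns the key", and the replay is assumed to complete synchronously as in the cache-hit case described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only; this is not the Pulsar dispatcher or selector.
final class RemovalOrderDemo {
    // Stand-in for the Key_Shared selector: the first entry owns the key.
    private final List<String> selector = new ArrayList<>(List.of("c1", "c2"));
    // Consumers whose connection is still open.
    private final Set<String> openConsumers = new HashSet<>(selector);
    private String deliveredTo = null;

    // Stand-in for the superclass removeConsumer(): closes the consumer and
    // replays its pending messages immediately.
    private void superRemoveConsumer(String consumer, List<String> pendingAcks) {
        openConsumers.remove(consumer);
        for (String message : pendingAcks) {
            String target = selector.get(0);
            if (openConsumers.contains(target)) {
                deliveredTo = target;
            }
            // Otherwise the send fails and, in this model, nobody retries it.
        }
    }

    static String redeliver(boolean removeFromSelectorFirst) {
        RemovalOrderDemo demo = new RemovalOrderDemo();
        List<String> pendingAcks = List.of("m1"); // c1's unacked message
        if (removeFromSelectorFirst) {
            demo.selector.remove("c1");
            demo.superRemoveConsumer("c1", pendingAcks);
        } else {
            demo.superRemoveConsumer("c1", pendingAcks);
            demo.selector.remove("c1");
        }
        return demo.deliveredTo == null ? "stuck" : "delivered to " + demo.deliveredTo;
    }

    public static void main(String[] args) {
        System.out.println(redeliver(false)); // prints "stuck"
        System.out.println(redeliver(true));  // prints "delivered to c2"
    }
}
```

With the old order the replay still picks c1, which is already closed, so m1 is never handed to c2. Removing c1 from the selector first lets the same replay reach c2.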

Contributor

Oh, I see. Thanks for the explanation @massakam @merlimat. I missed that super.removeConsumer() also gets a chance to call readMoreEntries().

+        // the broker will try to redeliver the messages to the consumer that has already been closed.
+        // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move,
+        // eventually causing all consumers to get stuck.
         selector.removeConsumer(consumer);
+        super.removeConsumer(consumer);
         if (recentlyJoinedConsumers != null) {
             recentlyJoinedConsumers.remove(consumer);
             if (consumerList.size() == 1) {