
[broker] Fix issue where Key_Shared consumers could get stuck #10920

Merged
merged 1 commit into apache:master from massakam:fix-key-shared-stuck on Jun 14, 2021

Conversation

massakam (Contributor)

Motivation

Repeatedly opening and closing consumers on a Key_Shared subscription can occasionally cause message dispatch to stop for all consumers (a minimal sketch of this kind of consumer churn is included after the stats below). The following are the stats of the topic when the issue occurred.

stats.json

{
  "msgRateIn" : 0.0,
  "msgThroughputIn" : 0.0,
  "msgRateOut" : 0.0,
  "msgThroughputOut" : 0.0,
  "averageMsgSize" : 0.0,
  "storageSize" : 888206,
  "backlogSize" : 153264,
  "publishers" : [ ],
  "subscriptions" : {
    "sub1" : {
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "msgRateRedeliver" : 0.0,
      "msgBacklog" : 2324,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 0,
      "unackedMessages" : 1,
      "type" : "Key_Shared",
      "msgRateExpired" : 0.0,
      "consumers" : [ {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "5a9ed",
        "availablePermits" : 607,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2021-06-11T18:57:08.475+09:00",
        "clientVersion" : "2.4.2.36-yjrelease",
        "address" : "/xxx.xxx.xxx.xxx:59196"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "0e74e",
        "availablePermits" : 686,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2021-06-11T18:57:31.293+09:00",
        "clientVersion" : "2.4.2.36-yjrelease",
        "address" : "/xxx.xxx.xxx.xxx:59198"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "b8ac0",
        "availablePermits" : 952,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2021-06-11T18:57:58.618+09:00",
        "clientVersion" : "2.4.2.36-yjrelease",
        "address" : "/xxx.xxx.xxx.xxx:59188"
      }, {
        "msgRateOut" : 0.0,
        "msgThroughputOut" : 0.0,
        "msgRateRedeliver" : 0.0,
        "consumerName" : "43e0a",
        "availablePermits" : 1000,
        "unackedMessages" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "metadata" : { },
        "connectedSince" : "2021-06-11T18:58:24.501+09:00",
        "clientVersion" : "2.4.2.36-yjrelease",
        "address" : "/xxx.xxx.xxx.xxx:59190"
      } ],
      "isReplicated" : false,
      "consumersAfterMarkDeletePosition" : {
        "43e0a" : "3860483:12549"
      }
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled"
}

The strange thing is that every consumer has an unackedMessages value of 0, but the subscription-level unackedMessages value is 1.
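
For illustration, here is a minimal sketch of the kind of consumer churn that triggers this, using the Pulsar Java client. It is not the original reproduction code: the service URL, topic name, class name, iteration count, and sleep interval are placeholders; the subscription name matches the stats above.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeySharedConsumerChurn {
    public static void main(String[] args) throws Exception {
        // Placeholder broker URL
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {
            // Repeatedly open and close Key_Shared consumers on the same subscription.
            // Before this fix, enough iterations could leave the remaining consumers stuck
            // in the state described above.
            for (int i = 0; i < 100; i++) {
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic("persistent://public/default/my-topic")
                        .subscriptionName("sub1")
                        .subscriptionType(SubscriptionType.Key_Shared)
                        .subscribe();
                Thread.sleep(1_000);
                consumer.close();
            }
        }
    }
}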

Modifications

The cause of this issue is the following part:

When removeConsumer() of the superclass is called, the pending acks owned by that consumer are added to messagesToRedeliver.

consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
    if (addMessageToReplay(ledgerId, entryId)) {
        redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
    }
});

However, at this point the consumer has not yet been removed from the selector, so the broker attempts to send messages to a consumer that has already been closed. Those messages are removed from messagesToRedeliver, but they are never actually delivered to any consumer.

// remove positions first from replay list first : sendMessages recycles entries
if (readType == ReadType.Replay) {
    for (int i = 0; i < messagesForC; i++) {
        Entry entry = entriesWithSameKey.get(i);
        messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId());
    }
}

As a result, the mark-delete position does not move and all consumers will get stuck.

Therefore, in PersistentStickyKeyDispatcherMultipleConsumers#removeConsumer(), we need to remove the consumer from the selector before calling removeConsumer() of the superclass.
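
For reference, a minimal sketch of the reordered method. The class, the selector field, and the superclass call follow the description above, but this is a simplified sketch rather than the exact merged change.

@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
    // Remove the consumer from the selector first, so that when super.removeConsumer()
    // moves this consumer's pending acks into messagesToRedeliver and synchronously
    // triggers a replay read, the selector can no longer pick the closed consumer
    // as a dispatch target.
    selector.removeConsumer(consumer);
    super.removeConsumer(consumer);
}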

@massakam massakam added the type/bug, area/broker, component/key-shared, and release/2.8.1 labels on Jun 14, 2021
@massakam massakam added this to the 2.9.0 milestone Jun 14, 2021
@massakam massakam self-assigned this Jun 14, 2021
Review thread on the changed code in PersistentStickyKeyDispatcherMultipleConsumers#removeConsumer():

    // The consumer must be removed from the selector before calling the superclass removeConsumer method.
    // In the superclass removeConsumer method, the pending acks that the consumer has are added to
    // messagesToRedeliver. If the consumer has not been removed from the selector at this point,
    super.removeConsumer(consumer);
codelipenghui (Contributor) commented:

> If the consumer has not been removed from the selector at this point

Looks like a race condition between sending messages to the consumer and removing the consumer from the selector?

We have a synchronized block protecting readEntriesComplete and removeConsumer. How can this happen?

massakam (Contributor, Author) replied:

@codelipenghui Perhaps this happens if the messages to be redelivered are in the managed ledger cache. In this case,

readMoreEntries()
↓
readEntriesComplete()
↓
sendMessagesToConsumers()

are executed and completed synchronously in PersistentDispatcherMultipleConsumers#removeConsumer().

consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
    if (addMessageToReplay(ledgerId, entryId)) {
        redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId));
    }
});
totalAvailablePermits -= consumer.getAvailablePermits();
if (log.isDebugEnabled()) {
    log.debug("[{}] Decreased totalAvailablePermits by {} in PersistentDispatcherMultipleConsumers. "
            + "New dispatcher permit count is {}", name, consumer.getAvailablePermits(),
            totalAvailablePermits);
}
readMoreEntries();
The synchronized protection does not help here because all of these methods are executed by the same thread, and Java's intrinsic locks are reentrant.
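
To illustrate this point, here is a self-contained toy example (not Pulsar code): a thread that already holds an object's monitor can enter another synchronized method on the same object without blocking.

public class ReentrancyDemo {
    // Both methods are guarded by the same intrinsic lock, mimicking the dispatcher.
    public synchronized void removeConsumer() {
        System.out.println("removeConsumer: holding the lock");
        // This nested call does NOT block, because the intrinsic lock is reentrant.
        // Likewise, readEntriesComplete()/sendMessagesToConsumers() can run in the
        // middle of removeConsumer() when the read completes on the same thread.
        readEntriesComplete();
        System.out.println("removeConsumer: done");
    }

    public synchronized void readEntriesComplete() {
        System.out.println("readEntriesComplete: re-entered the lock on the same thread");
    }

    public static void main(String[] args) {
        new ReentrancyDemo().removeConsumer();
    }
}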

merlimat (Contributor) commented:

Ok, I think the call stack here would be like:

removeConsumer()
↓
super.removeConsumer()
↓
readMoreEntries() (this is what triggers the re-delivery of the messages that were pending on the removed consumer)
↓
readEntriesComplete()
↓
sendMessagesToConsumers() (as mentioned, this can select the removed consumer, which is still in the selector list)

At this point, sendMessagesToConsumers() will fail and the message will stay in the pendingAcks set for that consumer, but, since the consumer was already removed, this message will never be redelivered.

I think this change is the correct one.

codelipenghui (Contributor) replied:

Oh, I see. Thanks for the explanation @massakam @merlimat. I missed that super.removeConsumer() also gets a chance to call readMoreEntries().

merlimat (Contributor) left a review comment:

👍

@merlimat merlimat merged commit 40c8317 into apache:master Jun 14, 2021
@massakam massakam deleted the fix-key-shared-stuck branch June 15, 2021 01:00
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 and cherry-picked/branch-2.8 labels on Jun 15, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Jun 17, 2021
yangl pushed a commit to yangl/pulsar that referenced this pull request Jun 23, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Oct 7, 2021
(cherry picked from commit 8065d6c)
(cherry picked from commit f66d03d)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
Labels: area/broker, cherry-picked/branch-2.7, cherry-picked/branch-2.8, release/2.7.3, release/2.8.1, type/bug