
[broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription #10762

Merged
merged 5 commits into apache:master from massakam:fix-key-shared-order2 on Jul 15, 2021

Conversation

Contributor

@massakam massakam commented Jun 1, 2021

Motivation

Messages with the same key can be out of order if message redelivery occurs on a Key_Shared subscription.

  1. Suppose PersistentDispatcherMultipleConsumers#messagesToRedeliver contains message-1 and message-2. Message-1 will be delivered to consumer-a and message-2 will be delivered to consumer-b.
  2. The dispatcher tried to send message-1 to consumer-a, but the consumer was too slow to receive it.
  3. Consumer-a is added to stuckConsumers.
     if (!nextStuckConsumers.isEmpty()) {
         isDispatcherStuckOnReplays = true;
         stuckConsumers.addAll(nextStuckConsumers);
     }
  4. The next time readMoreEntries() is run, getMessagesToReplayNow() will return an empty Set because isDispatcherStuckOnReplays is true.
     protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
         if (isDispatcherStuckOnReplays) {
             // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
             // messages kicks in), instead of keep replaying the same old messages, since the consumer that these
             // messages are routing to might be busy at the moment
             this.isDispatcherStuckOnReplays = false;
             return Collections.emptySet();
  5. The dispatcher reads newer messages instead of the messages contained in messagesToRedeliver.
     Set<PositionImpl> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
     if (!messagesToReplayNow.isEmpty()) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Schedule replay of {} messages for {} consumers", name, messagesToReplayNow.size(),
                     consumerList.size());
         }
         havePendingReplayRead = true;
         Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
                 ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
         // clear already acked positions from replay bucket
         deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
                 ((PositionImpl) position).getEntryId()));
         // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read
         // next entries as readCompletedEntries-callback was never called
         if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
             havePendingReplayRead = false;
             // We should not call readMoreEntries() recursively in the same thread
             // as there is a risk of StackOverflowError
             topic.getBrokerService().executor().execute(() -> readMoreEntries());
         }
     } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
         log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
                 totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
     } else if (!havePendingRead) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
                     consumerList.size());
         }
         havePendingRead = true;
         cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
                 this, ReadType.Normal, topic.getMaxReadPosition());
  6. A new message (message-3) is delivered to consumer-b.
  7. Message-2 contained in messagesToRedeliver is delivered to consumer-b.
  8. As a result, the order of message-2 and message-3 is reversed (see the sketch below).
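
For illustration, the sequence above can be reduced to a small standalone sketch (illustrative only, not broker code; the class, fields, and message names are made up) showing how skipping the replay set for one dispatch round lets a newer message with the same key overtake an older one:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Minimal sketch of the scenario above (not broker code).
public class KeySharedReorderSketch {

    record Msg(String id, String key) { }

    public static void main(String[] args) {
        // Step 1: message-2 is waiting in the replay set for consumer-b.
        Deque<Msg> messagesToRedeliver = new ArrayDeque<>();
        messagesToRedeliver.add(new Msg("message-2", "key-x"));

        // Step 6: a newer message with the same key arrives.
        Deque<Msg> newMessages = new ArrayDeque<>();
        newMessages.add(new Msg("message-3", "key-x"));

        // Steps 2-3: consumer-a was stuck, so the dispatcher is marked as stuck on replays.
        boolean isDispatcherStuckOnReplays = true;
        List<Msg> deliveredToConsumerB = new ArrayList<>();

        // Steps 4-6: the replay set is skipped for this round and the newer message is sent first.
        if (isDispatcherStuckOnReplays) {
            isDispatcherStuckOnReplays = false;
            deliveredToConsumerB.add(newMessages.poll());
        }

        // Step 7: the replay set is finally drained.
        deliveredToConsumerB.add(messagesToRedeliver.poll());

        // Step 8: prints [message-3, message-2] -- the two messages with the same key are reversed.
        System.out.println(deliveredToConsumerB.stream().map(Msg::id).toList());
    }
}
```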

Modifications

When adding a message to be redelivered to messagesToRedeliver, save the hash of the key that the message carries. If the dispatcher later attempts to send newer messages whose key hash matches any of the saved values, they are added to messagesToRedeliver instead of being sent. This prevents messages with the same key from being delivered out of order.
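
A rough sketch of this idea follows (hypothetical class and method names, with a plain HashMap standing in for the hashesToBeBlocked map that the PR actually keeps as a ConcurrentLongLongPairHashMap keyed by position; this is not the real broker code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: remember the sticky-key hash of every message placed in the
// replay set, and divert newer messages with a remembered hash back into the replay set
// instead of dispatching them.
public class BlockedHashSketch {

    // stickyKeyHash -> number of pending replay messages carrying that hash (hypothetical layout)
    private final Map<Integer, Integer> blockedHashes = new HashMap<>();

    /** Called when a message is added to messagesToRedeliver. */
    void addMessageToReplay(long ledgerId, long entryId, int stickyKeyHash) {
        blockedHashes.merge(stickyKeyHash, 1, Integer::sum);
        // ... also add (ledgerId, entryId) to messagesToRedeliver ...
    }

    /** Called for every newly read entry before handing it to a consumer. */
    boolean shouldBlockNewDelivery(int stickyKeyHash) {
        // A newer message whose key hash is still pending replay must not be sent yet;
        // the caller adds it to messagesToRedeliver instead.
        return blockedHashes.containsKey(stickyKeyHash);
    }

    /** Called when a replayed message has been dispatched (or acknowledged). */
    void onReplayMessageRemoved(int stickyKeyHash) {
        blockedHashes.computeIfPresent(stickyKeyHash, (h, count) -> count > 1 ? count - 1 : null);
    }
}
```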

@massakam massakam added the type/bug (The PR fixed a bug or issue reported a bug), area/broker, and component/key-shared labels on Jun 1, 2021
@massakam massakam added this to the 2.8.0 milestone Jun 1, 2021
@massakam massakam self-assigned this Jun 1, 2021
-    if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
+    // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
+    // on Key_Shared subscription, but it's difficult to get the sticky key here
+    if (addMessageToReplay(position.getLedgerId(), position.getEntryId(), null)) {
Contributor Author

This method is executed when redelivery is requested from the consumer side (e.g. negative ack, ack timeout). So the user should allow the messages to be out of order in this case.

Contributor

@nkurihar nkurihar left a comment

LGTM

Contributor

@eolivelli eolivelli left a comment

Great work!

I left one question out of curiosity.

Contributor

@eolivelli eolivelli left a comment

+1

@eolivelli eolivelli requested a review from lhotari July 5, 2021 11:58
@massakam
Contributor Author

massakam commented Jul 6, 2021

@lhotari PTAL

Contributor

@eolivelli eolivelli left a comment

LGTM

@massakam nice idea!

Member

@lhotari lhotari left a comment

LGTM

Outstanding work @massakam!


    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
        if (hashesToBeBlocked != null) {
            for (LongPair longPair : hashesToBeBlocked.keys()) {
Contributor

Using keys() will make a copy of the whole set, so we should instead do the forEach().

Contributor Author

@merlimat If we try to remove any entry in forEach(), it seems to cause a deadlock. Is there any way to avoid this without using keys()?

Member

@lhotari lhotari Jul 8, 2021

@massakam Does the deadlock happen in 2.8.x / master branch? There are changes in #9787 that resolved some deadlock issues. (See #10691 for more details.)

Contributor Author

@lhotari It happens in the master branch. hashesToBeBlocked is an instance of org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap, so it should be unrelated to those fixes.

Contributor

OK, we can go on with this, but we should also import here a fixed version of ConcurrentLongLongPairHashMap that doesn't have the problem.

Contributor Author

> but we should also import here a fixed version of ConcurrentLongLongPairHashMap that doesn't have the problem

@merlimat Such a version doesn't exist yet, does it?

Contributor Author

@massakam massakam Jul 14, 2021

Added the keys to be removed in the forEach() loop to a newly created list, and removed them from hashesToBeBlocked after exiting the loop. This copies only the entries to be removed rather than the whole set. PTAL.
266ca5d
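
For illustration, the collect-then-remove pattern described in the comment above looks roughly like this (a sketch only: it uses a plain ConcurrentHashMap with a made-up Position record as the key, whereas the actual code iterates a ConcurrentLongLongPairHashMap):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the pattern: do not remove entries while iterating; collect the keys to drop
// during the loop and remove them once the loop has finished.
public class RemoveAllUpToSketch {

    record Position(long ledgerId, long entryId) { }

    private final Map<Position, Integer> hashesToBeBlocked = new ConcurrentHashMap<>();

    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
        List<Position> keysToRemove = new ArrayList<>();
        hashesToBeBlocked.forEach((position, hash) -> {
            if (position.ledgerId() < markDeleteLedgerId
                    || (position.ledgerId() == markDeleteLedgerId
                            && position.entryId() <= markDeleteEntryId)) {
                keysToRemove.add(position); // copy only the entries that must go, not the whole set
            }
        });
        keysToRemove.forEach(hashesToBeBlocked::remove); // mutate only after the iteration is done
        return keysToRemove.size();
    }
}
```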

@massakam massakam force-pushed the fix-key-shared-order2 branch 2 times, most recently from 5decc85 to ae48c4a on July 8, 2021 04:17
Contributor

@codelipenghui codelipenghui left a comment

LGTM

@codelipenghui
Contributor

@merlimat Please help review this PR again.

@codelipenghui codelipenghui merged commit 5aee599 into apache:master Jul 15, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Jul 15, 2021
…ering messages on Key_Shared subscription (apache#10762)

(cherry picked from commit 5aee599)
@massakam massakam deleted the fix-key-shared-order2 branch July 16, 2021 01:11
codelipenghui pushed a commit that referenced this pull request Jul 19, 2021
…ering messages on Key_Shared subscription (#10762)

(cherry picked from commit 5aee599)
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Jul 19, 2021
eolivelli pushed a commit to datastax/pulsar that referenced this pull request Jul 19, 2021
…ering messages on Key_Shared subscription (apache#10762)

(cherry picked from commit 5aee599)
congbobo184 pushed a commit that referenced this pull request Jul 20, 2021
Cherry-picking #10762 broke the Delayed messages feature in branch-2.7.

This patch restores the method that was dropped.
codelipenghui pushed a commit that referenced this pull request Jul 23, 2021
…ering messages on Key_Shared subscription (#10762)

(cherry picked from commit 5aee599)
@codelipenghui codelipenghui added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Jul 23, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…ering messages on Key_Shared subscription (apache#10762)

poorbarcode added a commit that referenced this pull request May 19, 2023
…essage skip to avoid unnecessary consumption stuck (#20335)

### Motivation
- #7105 provides a mechanism to prevent a stuck consumer from affecting the consumption of other consumers:
  - if all consumers cannot accept more messages, stop delivering messages to the client.
  - if one consumer cannot accept more messages, just read new messages and deliver them to the other consumers.
- #7553 provides a mechanism to fix the issue of lost consumption order: if a consumer cannot accept any more messages, skip it in the next round of message delivery, because there may be messages with the same key in the replay queue.
- #10762 provides a mechanism to fix the issue of lost consumption order: if there are any messages with the same key in the replay queue, do not deliver the new messages to this consumer.

#10762 and #7553 do the same thing, and #10762 is better than #7553, so #7553 is unnecessary.

### Modifications
Remove the mechanism provided by #7553 to avoid unnecessarily stuck consumption.
poorbarcode added a commit that referenced this pull request May 19, 2023
…essage skip to avoid unnecessary consumption stuck (#20335)

(cherry picked from commit 1e664b7)
Technoboy- pushed a commit that referenced this pull request May 24, 2023
…essage skip to avoid unnecessary consumption stuck (#20335)

lhotari pushed a commit to datastax/pulsar that referenced this pull request May 29, 2023
…essage skip to avoid unnecessary consumption stuck (apache#20335)

(cherry picked from commit 1e664b7)
(cherry picked from commit c973603)
poorbarcode added a commit that referenced this pull request May 30, 2023
…essage skip to avoid unnecessary consumption stuck (#20335)

(cherry picked from commit 1e664b7)
Labels
area/broker, cherry-picked/branch-2.7 (Archived: 2.7 is end of life), cherry-picked/branch-2.8 (Archived: 2.8 is end of life), release/2.7.3, release/2.8.1, type/bug (The PR fixed a bug or issue reported a bug)