Skip to content

Commit

Permalink
[fix] [broker] In Key_Shared mode: remove unnecessary mechanisms of m…
Browse files Browse the repository at this point in the history
…essage skip to avoid unnecessary consumption stuck (#20335)

- #7105 provide a mechanism to avoid a stuck consumer affecting the consumption of other consumers:
  - if all consumers can not accept more messages, stop delivering messages to the client.
  - if one consumer can not accept more messages, just read new messages and deliver them to other consumers.
- #7553 provide a mechanism to fix the issue of lost order of consumption: If the consumer cannot accept any more messages, skip the consumer for the next round of message delivery because there may be messages with the same key in the replay queue.
- #10762 provide a mechanism to fix the issue of lost order of consumption: If there have any messages with the same key in the replay queue, do not deliver the new messages to this consumer.

#10762 and #7553 do the same thing and #10762 is better than #7553 , so #7553 is unnecessary.

remove the mechanism provided by #7553 to avoid unnecessary consumption stuck.

(cherry picked from commit 1e664b7)
  • Loading branch information
poorbarcode committed May 19, 2023
1 parent 8a1a4be commit c973603
Showing 1 changed file with 1 addition and 23 deletions.
Expand Up @@ -68,17 +68,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
*/
private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;

private final Set<Consumer> stuckConsumers;
private final Set<Consumer> nextStuckConsumers;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());

this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
Expand Down Expand Up @@ -208,8 +203,6 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}
}

nextStuckConsumers.clear();

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();
Expand Down Expand Up @@ -311,14 +304,11 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
}
}

stuckConsumers.clear();

if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
// and stuck is not caused by stuckConsumers
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
Expand All @@ -327,10 +317,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
if (!nextStuckConsumers.isEmpty()) {
isDispatcherStuckOnReplays = true;
stuckConsumers.addAll(nextStuckConsumers);
}
isDispatcherStuckOnReplays = true;
// readMoreEntries should run regardless whether or not stuck is caused by
// stuckConsumers for avoid stopping dispatch.
topic.getBrokerService().executor().execute(() -> readMoreEntries());
Expand All @@ -346,8 +333,6 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
ReadType readType, Set<Integer> stickyKeyHashes) {
if (maxMessages == 0) {
// the consumer was stuck
nextStuckConsumers.add(consumer);
return 0;
}
if (readType == ReadType.Normal && stickyKeyHashes != null
Expand All @@ -364,13 +349,6 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
// At this point, all the old messages were already consumed and this consumer
// is now ready to receive any message
if (maxReadPosition == null) {
// stop to dispatch by stuckConsumers
if (stuckConsumers.contains(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
}
return 0;
}
// The consumer has not recently joined, so we can send all messages
return maxMessages;
}
Expand Down

0 comments on commit c973603

Please sign in to comment.