Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] In Key_Shared mode: remove unnecessary mechanisms of message skip to avoid unnecessary consumption stuck #20335

Merged
merged 1 commit into from May 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -71,17 +71,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
*/
private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;

private final Set<Consumer> stuckConsumers;
private final Set<Consumer> nextStuckConsumers;

PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());

this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
Expand Down Expand Up @@ -226,8 +221,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

nextStuckConsumers.clear();

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();
Expand Down Expand Up @@ -318,14 +311,11 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

stuckConsumers.clear();

if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
// and stuck is not caused by stuckConsumers
//
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
Expand All @@ -334,10 +324,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
if (!nextStuckConsumers.isEmpty()) {
isDispatcherStuckOnReplays = true;
stuckConsumers.addAll(nextStuckConsumers);
}
isDispatcherStuckOnReplays = true;
return true;
} else if (currentThreadKeyNumber == 0) {
return true;
Expand All @@ -348,8 +335,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
ReadType readType, Set<Integer> stickyKeyHashes) {
if (maxMessages == 0) {
// the consumer was stuck
nextStuckConsumers.add(consumer);
return 0;
}
if (readType == ReadType.Normal && stickyKeyHashes != null
Expand All @@ -366,13 +351,6 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> en
// At this point, all the old messages were already consumed and this consumer
// is now ready to receive any message
if (maxReadPosition == null) {
// stop to dispatch by stuckConsumers
if (stuckConsumers.contains(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
}
return 0;
}
// The consumer has not recently joined, so we can send all messages
return maxMessages;
}
Expand Down