From 52336c8d0253ca16d51105abc180dcc81e65a5e3 Mon Sep 17 00:00:00 2001 From: ZhangJian He Date: Sun, 24 Oct 2021 12:32:29 +0800 Subject: [PATCH] fix a typo in UnAckedMessageTracker (#12467) --- .../pulsar/client/impl/NegativeAcksTracker.java | 4 ++-- .../pulsar/client/impl/UnAckedMessageTracker.java | 11 +++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java index bbdd7864987bf..16cfa0c11ec19 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java @@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; -import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequnceMap; +import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap; class NegativeAcksTracker { @@ -63,7 +63,7 @@ private synchronized void triggerRedelivery(Timeout t) { long now = System.nanoTime(); nackedMessages.forEach((msgId, timestamp) -> { if (timestamp < now) { - addChunkedMessageIdsAndRemoveFromSequnceMap(msgId, messagesToRedeliver, this.consumer); + addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer); messagesToRedeliver.add(msgId); } }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index db616f20450a4..ff07156972f62 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -30,6 +30,7 @@ import java.io.Closeable; import java.util.ArrayDeque; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Set; @@ -130,7 +131,7 @@ public void run(Timeout t) throws Exception { if (!headPartition.isEmpty()) { log.warn("[{}] {} messages have timed-out", consumerBase, headPartition.size()); headPartition.forEach(messageId -> { - addChunkedMessageIdsAndRemoveFromSequnceMap(messageId, messageIds, consumerBase); + addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase); messageIds.add(messageId); messageIdPartitionMap.remove(messageId); }); @@ -150,14 +151,12 @@ public void run(Timeout t) throws Exception { }, this.tickDurationInMs, TimeUnit.MILLISECONDS); } - public static void addChunkedMessageIdsAndRemoveFromSequnceMap(MessageId messageId, Set messageIds, - ConsumerBase consumerBase) { + public static void addChunkedMessageIdsAndRemoveFromSequenceMap(MessageId messageId, Set messageIds, + ConsumerBase consumerBase) { if (messageId instanceof MessageIdImpl) { MessageIdImpl[] chunkedMsgIds = consumerBase.unAckedChunkedMessageIdSequenceMap.get((MessageIdImpl) messageId); if (chunkedMsgIds != null && chunkedMsgIds.length > 0) { - for (MessageIdImpl msgId : chunkedMsgIds) { - messageIds.add(msgId); - } + Collections.addAll(messageIds, chunkedMsgIds); } consumerBase.unAckedChunkedMessageIdSequenceMap.remove((MessageIdImpl) messageId); }