Skip to content

Commit

Permalink
fix a typo in UnAckedMessageTracker (#12467)
Browse files Browse the repository at this point in the history
  • Loading branch information
shoothzj committed Oct 24, 2021
1 parent ead0911 commit 52336c8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
Expand Up @@ -28,7 +28,7 @@

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequnceMap;
import static org.apache.pulsar.client.impl.UnAckedMessageTracker.addChunkedMessageIdsAndRemoveFromSequenceMap;

class NegativeAcksTracker {

Expand Down Expand Up @@ -63,7 +63,7 @@ private synchronized void triggerRedelivery(Timeout t) {
long now = System.nanoTime();
nackedMessages.forEach((msgId, timestamp) -> {
if (timestamp < now) {
addChunkedMessageIdsAndRemoveFromSequnceMap(msgId, messagesToRedeliver, this.consumer);
addChunkedMessageIdsAndRemoveFromSequenceMap(msgId, messagesToRedeliver, this.consumer);
messagesToRedeliver.add(msgId);
}
});
Expand Down
Expand Up @@ -30,6 +30,7 @@

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
Expand Down Expand Up @@ -130,7 +131,7 @@ public void run(Timeout t) throws Exception {
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out", consumerBase, headPartition.size());
headPartition.forEach(messageId -> {
addChunkedMessageIdsAndRemoveFromSequnceMap(messageId, messageIds, consumerBase);
addChunkedMessageIdsAndRemoveFromSequenceMap(messageId, messageIds, consumerBase);
messageIds.add(messageId);
messageIdPartitionMap.remove(messageId);
});
Expand All @@ -150,14 +151,12 @@ public void run(Timeout t) throws Exception {
}, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}

public static void addChunkedMessageIdsAndRemoveFromSequnceMap(MessageId messageId, Set<MessageId> messageIds,
ConsumerBase<?> consumerBase) {
public static void addChunkedMessageIdsAndRemoveFromSequenceMap(MessageId messageId, Set<MessageId> messageIds,
ConsumerBase<?> consumerBase) {
if (messageId instanceof MessageIdImpl) {
MessageIdImpl[] chunkedMsgIds = consumerBase.unAckedChunkedMessageIdSequenceMap.get((MessageIdImpl) messageId);
if (chunkedMsgIds != null && chunkedMsgIds.length > 0) {
for (MessageIdImpl msgId : chunkedMsgIds) {
messageIds.add(msgId);
}
Collections.addAll(messageIds, chunkedMsgIds);
}
consumerBase.unAckedChunkedMessageIdSequenceMap.remove((MessageIdImpl) messageId);
}
Expand Down

0 comments on commit 52336c8

Please sign in to comment.