From 618f17c155a8e28a99bbcc5e26f9ec3a6c7d9b08 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Sun, 30 Jan 2022 01:08:11 +0800 Subject: [PATCH] Fix NPE of cumulative ack mode and incorrect unack message count (#14021) link https://github.com/apache/pulsar/pull/13383 ## Motivation #13383 has fixed the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init. ![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png) ![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png) If ack the batch index one by one, the last ack of a batch will decrease unack message with `batchSize` ``` ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 1 ================ batch size -> 10 ================ message id -> 3:1 ================ acked count -> 9 ================ batch size -> 10 ``` ### Modifications add judge `Subscription.isIndividualAckMode(subType)` when get ackCount. If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset. --- .../pulsar/broker/service/Consumer.java | 7 +- .../BatchMessageWithBatchIndexLevelTest.java | 68 +++++++++++++++++++ 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 11fcec74ed4a3..bc5e9b86842ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -122,6 +122,7 @@ public class Consumer { private static final AtomicIntegerFieldUpdater AVG_MESSAGES_PER_ENTRY = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry"); private volatile int avgMessagesPerEntry = 1000; + private static final long [] EMPTY_ACK_SET = new long[0]; private static final double avgPercent = 0.9; private boolean preciseDispatcherFlowControl; @@ -413,10 +414,10 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map consumer = (ConsumerImpl) pulsarClient + .newConsumer(Schema.BYTES) + .topic(topicName) + .isAckReceiptEnabled(true) + .subscriptionName(subscriptionName) + .subscriptionType(subType) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient + .newProducer() + .enableBatching(enableBatch) + .topic(topicName) + .batchingMaxMessages(10) + .create(); + + CountDownLatch countDownLatch = new CountDownLatch(messageCount); + for (int i = 0; i < messageCount; i++) { + producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); + } + + countDownLatch.await(); + + for (int i = 0; i < messageCount; i++) { + Message message = consumer.receive(); + // wait for receipt + if (i < messageCount / 2) { + consumer.acknowledgeAsync(message.getMessageId()).get(); + } + } + + String topic = TopicName.get(topicName).toString(); + PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topic, false).get().get().getSubscription(subscriptionName); + + Awaitility.await().untilAsserted(() -> { + if (subType == SubscriptionType.Shared) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); + } + }); + } }