diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 11fcec74ed4a3..bc5e9b86842ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -122,6 +122,7 @@ public class Consumer { private static final AtomicIntegerFieldUpdater AVG_MESSAGES_PER_ENTRY = AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry"); private volatile int avgMessagesPerEntry = 1000; + private static final long [] EMPTY_ACK_SET = new long[0]; private static final double avgPercent = 0.9; private boolean preciseDispatcherFlowControl; @@ -413,10 +414,10 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map consumer = (ConsumerImpl) pulsarClient + .newConsumer(Schema.BYTES) + .topic(topicName) + .isAckReceiptEnabled(true) + .subscriptionName(subscriptionName) + .subscriptionType(subType) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Producer producer = pulsarClient + .newProducer() + .enableBatching(enableBatch) + .topic(topicName) + .batchingMaxMessages(10) + .create(); + + CountDownLatch countDownLatch = new CountDownLatch(messageCount); + for (int i = 0; i < messageCount; i++) { + producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown); + } + + countDownLatch.await(); + + for (int i = 0; i < messageCount; i++) { + Message message = consumer.receive(); + // wait for receipt + if (i < messageCount / 2) { + consumer.acknowledgeAsync(message.getMessageId()).get(); + } + } + + String topic = TopicName.get(topicName).toString(); + PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService() + .getTopic(topic, false).get().get().getSubscription(subscriptionName); + + Awaitility.await().untilAsserted(() -> { + if (subType == SubscriptionType.Shared) { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2); + } else { + assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0); + } + }); + } }