Skip to content

Commit

Permalink
Fix NPE of cumulative ack mode and incorrect unack message count (#14021
Browse files Browse the repository at this point in the history
)

link #13383
## Motivation
#13383 has fixed  the batch message ack does not decrease the unacked-msg count, but in cumulative ack mode also decrease, it will use pendingAcks, but in cumulative ack, this will not init.

![image](https://user-images.githubusercontent.com/39078850/151622041-7fb0acc5-32fd-4140-82d7-8c75d2a6aef5.png)
![image](https://user-images.githubusercontent.com/39078850/151622106-bf75f3fa-84d5-4099-99f4-50f4dddd43a2.png)

If ack the batch index one by one, the last ack of a batch will decrease unack message with `batchSize`
```
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 1
================ batch size -> 10
================ message id -> 3:1
================ acked count -> 9
================ batch size -> 10
```

### Modifications
add judge `Subscription.isIndividualAckMode(subType)` when get ackCount.
If the ack from consumer don't have ackset, we should treat it as empty ackset to calculate the ack count with the currently ackset.
  • Loading branch information
congbobo184 committed Jan 29, 2022
1 parent 823bcd6 commit 618f17c
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 3 deletions.
Expand Up @@ -122,6 +122,7 @@ public class Consumer {
private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
private volatile int avgMessagesPerEntry = 1000;
private static final long [] EMPTY_ACK_SET = new long[0];

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
Expand Down Expand Up @@ -413,10 +414,10 @@ private CompletableFuture<Void> individualAckNormal(CommandAck ack, Map<String,
}
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
if (Subscription.isIndividualAckMode(subType) && isAcknowledgmentAtBatchIndexLevelEnabled) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
ackedCount = batchSize - BitSet.valueOf(cursorAckSet).cardinality();
ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
} else {
ackedCount = batchSize;
}
Expand Down Expand Up @@ -521,7 +522,7 @@ private long getBatchSize(MessageIdData msgId) {

private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
long ackedCount = 0;
if (isAcknowledgmentAtBatchIndexLevelEnabled) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
Expand Down
Expand Up @@ -19,21 +19,27 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -105,4 +111,66 @@ public void testBatchMessageAck() {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
}

@DataProvider(name = "testSubTypeAndEnableBatch")
public Object[][] testSubTypeAndEnableBatch() {
return new Object[][] { { SubscriptionType.Shared, Boolean.TRUE },
{ SubscriptionType.Failover, Boolean.TRUE },
{ SubscriptionType.Shared, Boolean.FALSE },
{ SubscriptionType.Failover, Boolean.FALSE }};
}


@Test(dataProvider="testSubTypeAndEnableBatch")
private void testDecreaseUnAckMessageCountWithAckReceipt(SubscriptionType subType,
boolean enableBatch) throws Exception {

final int messageCount = 50;
final String topicName = "persistent://prop/ns-abc/testDecreaseWithAckReceipt" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";
@Cleanup
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
.newConsumer(Schema.BYTES)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.enableBatching(enableBatch)
.topic(topicName)
.batchingMaxMessages(10)
.create();

CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
producer.sendAsync((i + "").getBytes()).thenRun(countDownLatch::countDown);
}

countDownLatch.await();

for (int i = 0; i < messageCount; i++) {
Message<byte[]> message = consumer.receive();
// wait for receipt
if (i < messageCount / 2) {
consumer.acknowledgeAsync(message.getMessageId()).get();
}
}

String topic = TopicName.get(topicName).toString();
PersistentSubscription persistentSubscription = (PersistentSubscription) pulsar.getBrokerService()
.getTopic(topic, false).get().get().getSubscription(subscriptionName);

Awaitility.await().untilAsserted(() -> {
if (subType == SubscriptionType.Shared) {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), messageCount / 2);
} else {
assertEquals(persistentSubscription.getConsumers().get(0).getUnackedMessages(), 0);
}
});
}
}

0 comments on commit 618f17c

Please sign in to comment.