Skip to content

Commit

Permalink
Fix the issue where the batch ack count is negative. (#14288)
Browse files Browse the repository at this point in the history
### Motivation

#13383 fixed the batch ack issue, but we found that the unacked message count could still become negative (#14246). At first, we thought this was a normal case caused by message redelivery, but after diving into the logic, we found that it is a bug.

The test is copied from #14246:

```
for (int i = 0; i < 50; i++) {
      Message<String> msg = consumer.receive();
      if (i % 2 == 0) {
           consumer.acknowledgeAsync(msg);
       } else {
            consumer.negativeAcknowledge(msg);
       }
}
```
When a message is negatively acknowledged (`negativeAcknowledge`), `Consumer#redeliverUnacknowledgedMessages` will invoke:
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L900-L912

When calculating `totalRedeliveryMessages`, it must check that `pendingAcks` contains the message, and it removes the message from `pendingAcks` afterwards. (Dispatching messages adds them to `pendingAcks`.)
So in the above test, it can happen that `negativeAcknowledge` is processed first and `acknowledgeAsync` afterwards.
`acknowledgeAsync` maps to `Consumer#individualAckNormal`, which decreases the unacked message count in:
https://github.com/apache/pulsar/blob/b22f70658927e07e3726d32290065f47313070b9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java#L543-L561
It doesn't check `pendingAcks` first; this is the root cause. The `pendingAcks` check on line 556 should be moved up to line 545.

(cherry picked from commit 6b828b4)
  • Loading branch information
Technoboy- authored and codelipenghui committed Feb 16, 2022
1 parent 59fdd43 commit 48dbf01
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
Expand Up @@ -526,7 +526,8 @@ private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl positio

private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
long ackedCount = 0;
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)
&& pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
Expand All @@ -537,7 +538,7 @@ private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long
int currentCardinality = cursorBitSet.cardinality();
ackedCount = lastCardinality - currentCardinality;
cursorBitSet.recycle();
} else if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
} else {
ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality();
}
}
Expand All @@ -558,6 +559,7 @@ private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, lon
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
unAckedCount = cursorBitSet.cardinality();
cursorBitSet.recycle();
}
}
return unAckedCount;
Expand Down
Expand Up @@ -19,24 +19,24 @@
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;

@Test(groups = "broker")
Expand All @@ -56,7 +56,8 @@ public void testBatchMessageAck() {
final String topicName = "persistent://prop/ns-abc/batchMessageAck-" + UUID.randomUUID();
final String subscriptionName = "sub-batch-1";

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
Expand All @@ -66,6 +67,7 @@ public void testBatchMessageAck() {
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topicName)
Expand Down Expand Up @@ -107,4 +109,90 @@ public void testBatchMessageAck() {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
});
}

/**
 * Checks the subscription's unacked-message count when acknowledgements and
 * negative acknowledgements are interleaved over a single batch.
 *
 * <p>Two scenarios are run, each on its own topic and Shared subscription with
 * batch index acknowledgment enabled on the consumer:
 * <ol>
 *   <li>20 messages are published and flushed as a batch; even-indexed received
 *       messages are acknowledged and odd-indexed ones are negatively acknowledged
 *       back to back.</li>
 *   <li>The same pattern, but the test sleeps 100 ms after every negative
 *       acknowledgement (the same value as the configured redelivery delay).</li>
 * </ol>
 * After each scenario the subscription stats must report exactly 10 unacked messages.
 *
 * @throws Exception if any client or admin operation fails, or the sleep is interrupted
 */
@Test
public void testBatchMessageMultiNegtiveAck() throws Exception{
    final String topicName = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck-" + UUID.randomUUID();
    final String subscriptionName = "sub-negtive-1";

    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName)
            .subscriptionName(subscriptionName)
            .subscriptionType(SubscriptionType.Shared)
            .receiverQueueSize(10)
            .enableBatchIndexAcknowledgment(true)
            .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
            .subscribe();

    // The 1-hour publish delay means the batch is only sent by the explicit flush() below.
    @Cleanup
    Producer<String> producer = pulsarClient
            .newProducer(Schema.STRING)
            .topic(topicName)
            .batchingMaxMessages(20)
            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
            .enableBatching(true)
            .create();

    final int N = 20;
    for (int i = 0; i < N; i++) {
        String value = "test-" + i;
        producer.sendAsync(value);
    }
    producer.flush();
    // Alternate ack / negative ack: half of the N messages stay unacknowledged.
    for (int i = 0; i < N; i++) {
        Message<String> msg = consumer.receive();
        if (i % 2 == 0) {
            consumer.acknowledgeAsync(msg);
        } else {
            consumer.negativeAcknowledge(msg);
        }
    }
    Awaitility.await().untilAsserted(() -> {
        long unackedMessages = admin.topics().getStats(topicName).getSubscriptions().get(subscriptionName)
                .getUnackedMessages();
        assertEquals(unackedMessages, 10);
    });

    // Test negative ack with sleep
    final String topicName2 = "persistent://prop/ns-abc/batchMessageMultiNegtiveAck2-" + UUID.randomUUID();
    final String subscriptionName2 = "sub-negtive-2";
    @Cleanup
    Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
            .topic(topicName2)
            .subscriptionName(subscriptionName2)
            .subscriptionType(SubscriptionType.Shared)
            .receiverQueueSize(10)
            .enableBatchIndexAcknowledgment(true)
            .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
            .subscribe();
    @Cleanup
    Producer<String> producer2 = pulsarClient
            .newProducer(Schema.STRING)
            .topic(topicName2)
            .batchingMaxMessages(20)
            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
            .enableBatching(true)
            .create();

    for (int i = 0; i < N; i++) {
        String value = "test-" + i;
        producer2.sendAsync(value);
    }
    producer2.flush();
    for (int i = 0; i < N; i++) {
        Message<String> msg = consumer2.receive();
        // Acknowledge through the consumer that actually received the message;
        // using the first scenario's consumer here would leave this subscription untested.
        if (i % 2 == 0) {
            consumer2.acknowledgeAsync(msg);
        } else {
            consumer2.negativeAcknowledge(msg);
            Thread.sleep(100);
        }
    }
    // Verify the second subscription, not the one already checked above.
    // NOTE(review): the expected value of 10 is carried over from the original assertion — confirm
    // it also holds when redeliveries can arrive during the sleeps.
    Awaitility.await().untilAsserted(() -> {
        long unackedMessages = admin.topics().getStats(topicName2).getSubscriptions().get(subscriptionName2)
                .getUnackedMessages();
        assertEquals(unackedMessages, 10);
    });
}
}

0 comments on commit 48dbf01

Please sign in to comment.