From cf526bf88a49c015f0656b3bc59d4e7ca5440c41 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Wed, 4 Aug 2021 08:19:20 +0800 Subject: [PATCH] Fix data lost when using earliest position to subscribe to a topic (#11547) When subscribing to a topic with the earliest position, the ManagedLedger always uses the last position to initialize the cursor. If no cursor update happens before the broker restarts, the topic is unloaded, or the topic ownership changes, data is lost: the unacked messages are not redelivered to the consumer. The root cause is that when the cursor is initialized with the last position, it first persists the last position to ZooKeeper as its mark-delete position. If the cursor does not get a chance to update the mark-delete position again before it is closed, recovering the cursor uses the mark-delete position stored in ZooKeeper, and so the unacked messages are lost. The fix is to check the initial position of the cursor: if Earliest is the initial position, the first position is used to initialize the cursor. The newly added test covers this change; without the change, the test fails. 
(cherry picked from commit 035a6bab7af8ed17f811c16b518dc02eea2435a1) --- .../mledger/impl/ManagedLedgerImpl.java | 3 +- .../client/api/ConsumerRedeliveryTest.java | 54 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index fdf8a95f9f95d..dcddc43237532 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -903,7 +903,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName); CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); - cursor.initialize(getLastPosition(), properties, new VoidCallback() { + PositionImpl position = InitialPosition.Earliest == initialPosition ? 
getFirstPosition() : getLastPosition(); + cursor.initialize(position, properties, new VoidCallback() { @Override public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index ed9f5482d1866..9b154690d7e0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -40,6 +41,7 @@ import com.google.common.collect.Sets; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; @@ -247,4 +249,56 @@ public void testConsumerWithPermitReceiveBatchMessages() throws Exception { assertEquals(consumer2.getTotalIncomingMessages(), queueSize); log.info("-- Exiting {} test --", methodName); } + + @Test(timeOut = 30000) + public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exception { + + final String subName = "my-subscriber-name"; + final String topicName = "testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID(); + final int messages = 100; + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + List> sendResults = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + sendResults.add(producer.sendAsync("Hello - " + i)); + } + producer.flush(); + + FutureUtil.waitForAll(sendResults).get(); + + Consumer consumer = 
pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + List> received = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + assertEquals(received.size(), messages); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + admin.topics().unload(topicName); + + // The consumer does not ack any messages, so after unloading the topic, + // the consumer should get the unacked messages again + + received.clear(); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + assertEquals(received.size(), messages); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + consumer.close(); + producer.close(); + } }