diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index fdf8a95f9f95d..dcddc43237532 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -903,7 +903,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName); CompletableFuture cursorFuture = new CompletableFuture<>(); uninitializedCursors.put(cursorName, cursorFuture); - cursor.initialize(getLastPosition(), properties, new VoidCallback() { + PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition(); + cursor.initialize(position, properties, new VoidCallback() { @Override public void operationComplete() { log.info("[{}] Opened new cursor: {}", name, cursor); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java index ed9f5482d1866..9b154690d7e0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerRedeliveryTest.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; @@ -40,6 +41,7 @@ import com.google.common.collect.Sets; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; @@ -247,4 +249,56 @@ public void testConsumerWithPermitReceiveBatchMessages() throws Exception { assertEquals(consumer2.getTotalIncomingMessages(), queueSize); log.info("-- Exiting {} test --", methodName); } + + @Test(timeOut = 30000) + public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exception { + + final String subName = "my-subscriber-name"; + final String topicName = "testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID(); + final int messages = 100; + + Producer producer = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(false) + .create(); + + List> sendResults = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + sendResults.add(producer.sendAsync("Hello - " + i)); + } + producer.flush(); + + FutureUtil.waitForAll(sendResults).get(); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + List> received = new ArrayList<>(messages); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + assertEquals(received.size(), messages); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + admin.topics().unload(topicName); + + // The consumer does not ack any messages, so after unloading the topic, + // the consumer should get the unacked messages again + + received.clear(); + for (int i = 0; i < messages; i++) { + received.add(consumer.receive()); + } + + assertEquals(received.size(), messages); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + consumer.close(); + producer.close(); + } }