Skip to content

Commit

Permalink
Fix data lost when using earliest position to subscribe to a topic (#…
Browse files Browse the repository at this point in the history
…11547)

When subscribing to a topic with earliest position, the ManagedLedger always using
the last position to init the cursor. If the no cursor update happens and the broker restarts
or topic been unloaded or the topic ownership changed, will lead to the data lost, the unacked messages
will not redeliver to the consumer again.

The root cause is if we are using the last position to init the cursor, the cursor will update the
mark delete position as the last position first to the Zookeeper, if the cursor can't a chance to
update the mark delete position again before been closed, when recoving the cursor again, will using
the mark delete posiion that stored in the Zookeeper, so the issue happens.

The fix is to add check for the initial position of the cursor, if we are using the Earliest as the initial position,
use the first position to init the cursor.

The new added test can cover the changes, and without this change, the test would failed.

(cherry picked from commit 035a6ba)
  • Loading branch information
codelipenghui committed Aug 4, 2021
1 parent 454da89 commit cf526bf
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 1 deletion.
Expand Up @@ -903,7 +903,8 @@ public synchronized void asyncOpenCursor(final String cursorName, final InitialP
final ManagedCursorImpl cursor = new ManagedCursorImpl(bookKeeper, config, this, cursorName);
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorFuture);
cursor.initialize(getLastPosition(), properties, new VoidCallback() {
PositionImpl position = InitialPosition.Earliest == initialPosition ? getFirstPosition() : getLastPosition();
cursor.initialize(position, properties, new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Opened new cursor: {}", name, cursor);
Expand Down
Expand Up @@ -30,6 +30,7 @@

import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
Expand All @@ -40,6 +41,7 @@
import com.google.common.collect.Sets;

import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertEquals;

Expand Down Expand Up @@ -247,4 +249,56 @@ public void testConsumerWithPermitReceiveBatchMessages() throws Exception {
assertEquals(consumer2.getTotalIncomingMessages(), queueSize);
log.info("-- Exiting {} test --", methodName);
}

@Test(timeOut = 30000)
public void testMessageRedeliveryAfterUnloadedWithEarliestPosition() throws Exception {

final String subName = "my-subscriber-name";
final String topicName = "testMessageRedeliveryAfterUnloadedWithEarliestPosition" + UUID.randomUUID();
final int messages = 100;

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(false)
.create();

List<CompletableFuture<MessageId>> sendResults = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
sendResults.add(producer.sendAsync("Hello - " + i));
}
producer.flush();

FutureUtil.waitForAll(sendResults).get();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

List<Message<String>> received = new ArrayList<>(messages);
for (int i = 0; i < messages; i++) {
received.add(consumer.receive());
}

assertEquals(received.size(), messages);
assertNull(consumer.receive(1, TimeUnit.SECONDS));

admin.topics().unload(topicName);

// The consumer does not ack any messages, so after unloading the topic,
// the consumer should get the unacked messages again

received.clear();
for (int i = 0; i < messages; i++) {
received.add(consumer.receive());
}

assertEquals(received.size(), messages);
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.close();
producer.close();
}
}

0 comments on commit cf526bf

Please sign in to comment.