Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix topicTransactionBuffer handle null snapshot #12758

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -537,14 +537,16 @@ public void run() {
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
hasSnapshot = true;
Message<TransactionBufferSnapshot> message = reader.readNext();
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
hasSnapshot = true;
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
if (!hasSnapshot) {
Expand Down
Expand Up @@ -362,11 +362,11 @@ public void completed(Exception e, long ledgerId, long entryId) {
}

@Test
public void testMaxReadPositionForNormalPublish() throws Exception{
public void testMaxReadPositionForNormalPublish() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
admin.topics().createNonPartitionedTopic(topic);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
.getTopic(topic, false).get().get();

TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
Expand Down Expand Up @@ -394,7 +394,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{
.sendTimeout(0, TimeUnit.SECONDS)
.create();

Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
//test publishing txn messages will not change maxReadPosition if don`t commit or abort.
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
Expand Down Expand Up @@ -433,8 +433,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{
PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());

}
}

@Test
public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
Expand All @@ -447,9 +446,13 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topic)
.create();
producer.newMessage().send();
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

producer.newMessage(txn).value("test").send();

PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
.getTopic("persistent://" + topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);

ManagedCursor managedCursor = mock(ManagedCursor.class);
Expand All @@ -468,10 +471,6 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
managedCursors.removeCursor("transaction-buffer-sub");
managedCursors.add(managedCursor);

TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer1.getStats().state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
Expand Down
Expand Up @@ -342,6 +342,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
}
}

@Test
public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
String sub = "test";
admin.topics().createNonPartitionedTopic(topicOne);
admin.topics().createSubscription(topicOne, "test", MessageId.earliest);
admin.topics().delete(topicOne);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
}

@Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
Expand Down