From 75cec6d9816c123bd644d9b85d86a9a35dca0ba4 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Fri, 12 Nov 2021 13:11:42 +0800 Subject: [PATCH] [Transaction] Fix topicTransactionBuffer handle null snapshot (#12758) fix https://github.com/apache/pulsar/issues/12754 Now when delete topic, we will write a null value to Transaction buffer snapshot topic, other topic recover by this transaction buffer snapshot system topic, will produce NPE judge NPE logic (cherry picked from commit c90c89b07ce544b92becedfaa7a0090b4b73edd2) --- .../buffer/impl/TopicTransactionBuffer.java | 14 ++++++++------ .../client/impl/TransactionEndToEndTest.java | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 220b4327cceac..12d5ee42cc7c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -472,12 +472,14 @@ public void run() { try { while (reader.hasMoreEvents()) { Message message = reader.readNext(); - TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); - if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) { - callBack.handleSnapshot(transactionBufferSnapshot); - this.startReadCursorPosition = PositionImpl.get( - transactionBufferSnapshot.getMaxReadPositionLedgerId(), - transactionBufferSnapshot.getMaxReadPositionEntryId()); + if (topic.getName().equals(message.getKey())) { + TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); + if (transactionBufferSnapshot != null) { + callBack.handleSnapshot(transactionBufferSnapshot); + this.startReadCursorPosition = PositionImpl.get( + transactionBufferSnapshot.getMaxReadPositionLedgerId(), + transactionBufferSnapshot.getMaxReadPositionEntryId()); + } } } } catch (PulsarClientException pulsarClientException) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index f971b5a770cca..d5ae61c012e50 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -377,6 +377,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize, } } + @Test + public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception { + String topicOne = "persistent://" + NAMESPACE1 + "/topic-one"; + String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two"; + String sub = "test"; + admin.topics().createNonPartitionedTopic(topicOne); + admin.topics().createSubscription(topicOne, "test", MessageId.earliest); + admin.topics().delete(topicOne); + + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicTwo).subscriptionName(sub).subscribe(); + String content = "test"; + producer.send(content); + assertEquals(consumer.receive().getValue(), content); + } + @Test public void txnMessageAckTest() throws Exception { String topic = TOPIC_MESSAGE_ACK_TEST;