From 190e6504ad6460c46cec36eeb80ed94e3dde409a Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Fri, 12 Nov 2021 13:11:42 +0800 Subject: [PATCH] [Transaction] Fix topicTransactionBuffer handle null snapshot (#12758) fix https://github.com/apache/pulsar/issues/12754 Now when delete topic, we will write a null value to Transaction buffer snapshot topic, other topic recover by this transaction buffer snapshot system topic, will produce NPE judge NPE logic (cherry picked from commit c90c89b07ce544b92becedfaa7a0090b4b73edd2) (cherry picked from commit 75cec6d9816c123bd644d9b85d86a9a35dca0ba4) --- .../buffer/impl/TopicTransactionBuffer.java | 14 ++++++++------ .../client/impl/TransactionEndToEndTest.java | 17 +++++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index 4db7a3e9c5ab1..39ef7366e6aa9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -451,12 +451,14 @@ public void run() { try { while (reader.hasMoreEvents()) { Message message = reader.readNext(); - TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); - if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) { - callBack.handleSnapshot(transactionBufferSnapshot); - this.startReadCursorPosition = PositionImpl.get( - transactionBufferSnapshot.getMaxReadPositionLedgerId(), - transactionBufferSnapshot.getMaxReadPositionEntryId()); + if (topic.getName().equals(message.getKey())) { + TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); + if (transactionBufferSnapshot != null) { + callBack.handleSnapshot(transactionBufferSnapshot); + this.startReadCursorPosition = PositionImpl.get( + transactionBufferSnapshot.getMaxReadPositionLedgerId(), + transactionBufferSnapshot.getMaxReadPositionEntryId()); + } } } } catch (PulsarClientException pulsarClientException) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 960beaac12d22..f36dcc0470717 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -376,6 +376,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize, } } + @Test + public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception { + String topicOne = "persistent://" + NAMESPACE1 + "/topic-one"; + String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two"; + String sub = "test"; + admin.topics().createNonPartitionedTopic(topicOne); + admin.topics().createSubscription(topicOne, "test", MessageId.earliest); + admin.topics().delete(topicOne); + + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicTwo).subscriptionName(sub).subscribe(); + String content = "test"; + producer.send(content); + assertEquals(consumer.receive().getValue(), content); + } + @Test public void txnMessageAckTest() throws Exception { String topic = TOPIC_MESSAGE_ACK_TEST;