diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index ac081f1ebe46a3..9978f6f8ec20a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -537,14 +537,16 @@ public void run() { try { boolean hasSnapshot = false; while (reader.hasMoreEvents()) { - hasSnapshot = true; Message message = reader.readNext(); - TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); - if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) { - callBack.handleSnapshot(transactionBufferSnapshot); - this.startReadCursorPosition = PositionImpl.get( - transactionBufferSnapshot.getMaxReadPositionLedgerId(), - transactionBufferSnapshot.getMaxReadPositionEntryId()); + if (topic.getName().equals(message.getKey())) { + TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); + if (transactionBufferSnapshot != null) { + hasSnapshot = true; + callBack.handleSnapshot(transactionBufferSnapshot); + this.startReadCursorPosition = PositionImpl.get( + transactionBufferSnapshot.getMaxReadPositionLedgerId(), + transactionBufferSnapshot.getMaxReadPositionEntryId()); + } } } if (!hasSnapshot) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index bc8bef96be1ba3..930ab92e661d74 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -362,11 +362,11 @@ public void completed(Exception e, long ledgerId, long entryId) { } @Test - public void testMaxReadPositionForNormalPublish() throws Exception{ + public void testMaxReadPositionForNormalPublish() throws Exception { String topic = "persistent://" + NAMESPACE1 + "/NormalPublish"; admin.topics().createNonPartitionedTopic(topic); PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() - .getTopic(topic, false).get().get(); + .getTopic(topic, false).get().get(); TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer(); PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false) @@ -394,7 +394,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{ .sendTimeout(0, TimeUnit.SECONDS) .create(); - Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady())); + Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady())); //test publishing txn messages will not change maxReadPosition if don`t commit or abort. Transaction transaction = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS).build().get(); @@ -433,8 +433,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{ PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer); Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId()); Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId()); - - } + } @Test public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ @@ -447,9 +446,13 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ .sendTimeout(0, TimeUnit.SECONDS) .topic(topic) .create(); - producer.newMessage().send(); + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + + producer.newMessage(txn).value("test").send(); + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() - .getTopic(topic, false).get().get(); + .getTopic("persistent://" + topic, false).get().get(); persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); ManagedCursor managedCursor = mock(ManagedCursor.class); @@ -468,10 +471,6 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ managedCursors.removeCursor("transaction-buffer-sub"); managedCursors.add(managedCursor); - TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic); - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> - assertEquals(buffer1.getStats().state, "Ready")); - doAnswer(invocation -> { AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index f3437422081a09..463044969ac105 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -342,6 +342,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize, } } + @Test + public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception { + String topicOne = "persistent://" + NAMESPACE1 + "/topic-one"; + String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two"; + String sub = "test"; + admin.topics().createNonPartitionedTopic(topicOne); + admin.topics().createSubscription(topicOne, "test", MessageId.earliest); + admin.topics().delete(topicOne); + + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topicTwo).subscriptionName(sub).subscribe(); + String content = "test"; + producer.send(content); + assertEquals(consumer.receive().getValue(), content); + } + @Test public void txnMessageAckTest() throws Exception { String topic = TOPIC_MESSAGE_ACK_TEST;