Skip to content

Commit

Permalink
[Transaction] Fix topicTransactionBuffer handle null snapshot (apache…
Browse files Browse the repository at this point in the history
…#12758)

fix apache#12754
Now when delete topic, we will write a null value to Transaction buffer snapshot topic, other topic recover by this transaction buffer snapshot system topic, will produce NPE

judge NPE logic

(cherry picked from commit c90c89b)
(cherry picked from commit 75cec6d)
  • Loading branch information
congbobo184 authored and eolivelli committed Mar 1, 2022
1 parent 991a6d8 commit 190e650
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
Expand Up @@ -451,12 +451,14 @@ public void run() {
try {
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = reader.readNext();
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
} catch (PulsarClientException pulsarClientException) {
Expand Down
Expand Up @@ -376,6 +376,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
}
}

@Test
public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
String sub = "test";
admin.topics().createNonPartitionedTopic(topicOne);
admin.topics().createSubscription(topicOne, "test", MessageId.earliest);
admin.topics().delete(topicOne);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
}

@Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
Expand Down

0 comments on commit 190e650

Please sign in to comment.