Skip to content

Commit

Permalink
[Transaction] Fix topicTransactionBuffer handle null snapshot (apache…
Browse files Browse the repository at this point in the history
…#12758)

fix apache#12754
### Motivation
Now when delete topic, we will write a null value to Transaction buffer snapshot topic, other topic recover by this transaction buffer snapshot system topic, will produce NPE

### Modifications
judge NPE logic
  • Loading branch information
congbobo184 authored and eolivelli committed Nov 29, 2021
1 parent eb2a105 commit 2258b95
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 18 deletions.
Expand Up @@ -537,14 +537,16 @@ public void run() {
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
hasSnapshot = true;
Message<TransactionBufferSnapshot> message = reader.readNext();
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
hasSnapshot = true;
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
if (!hasSnapshot) {
Expand Down
Expand Up @@ -362,11 +362,11 @@ public void completed(Exception e, long ledgerId, long entryId) {
}

@Test
public void testMaxReadPositionForNormalPublish() throws Exception{
public void testMaxReadPositionForNormalPublish() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/NormalPublish";
admin.topics().createNonPartitionedTopic(topic);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
.getTopic(topic, false).get().get();

TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
PulsarClient noTxnClient = PulsarClient.builder().enableTransaction(false)
Expand Down Expand Up @@ -394,7 +394,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{
.sendTimeout(0, TimeUnit.SECONDS)
.create();

Awaitility.await().untilAsserted(() ->Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicTransactionBuffer.checkIfReady()));
//test publishing txn messages will not change maxReadPosition if don`t commit or abort.
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
Expand Down Expand Up @@ -433,8 +433,7 @@ public void testMaxReadPositionForNormalPublish() throws Exception{
PositionImpl position5 = (PositionImpl) maxReadPositionField.get(topicTransactionBuffer);
Assert.assertEquals(position5.getLedgerId(), messageId4.getLedgerId());
Assert.assertEquals(position5.getEntryId(), messageId4.getEntryId());

}
}

@Test
public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
Expand All @@ -447,9 +446,13 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topic)
.create();
producer.newMessage().send();
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

producer.newMessage(txn).value("test").send();

PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
.getTopic("persistent://" + topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);

ManagedCursor managedCursor = mock(ManagedCursor.class);
Expand All @@ -468,10 +471,6 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
managedCursors.removeCursor("transaction-buffer-sub");
managedCursors.add(managedCursor);

TransactionBuffer buffer1 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer1.getStats().state, "Ready"));

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null);
Expand Down
Expand Up @@ -342,6 +342,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
}
}

@Test
public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
String sub = "test";
admin.topics().createNonPartitionedTopic(topicOne);
admin.topics().createSubscription(topicOne, "test", MessageId.earliest);
admin.topics().delete(topicOne);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
}

@Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
Expand Down

0 comments on commit 2258b95

Please sign in to comment.