Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix topicTransactionBuffer handle null snapshot #12758

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -537,10 +537,11 @@ public void run() {
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
hasSnapshot = true;
Message<TransactionBufferSnapshot> message = reader.readNext();
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
if (transactionBufferSnapshot != null
Copy link
Contributor

@gaoran10 gaoran10 Nov 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use the key of the message to verify the message belong to this topic? This may reduce some deserialization works.

&& topic.getName().equals(transactionBufferSnapshot.getTopicName())) {
hasSnapshot = true;
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
Expand Down
Expand Up @@ -342,6 +342,23 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
}
}

@Test
public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
String topicOne = "persistent://" + NAMESPACE1 + "/topic-one";
String topicTwo = "persistent://" + NAMESPACE1 + "/topic-two";
String sub = "test";
admin.topics().createNonPartitionedTopic(topicOne);
admin.topics().createSubscription(topicOne, "test", MessageId.earliest);
admin.topics().delete(topicOne);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicTwo).create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
assertEquals(consumer.receive().getValue(), content);
}

@Test
public void txnMessageAckTest() throws Exception {
String topic = TOPIC_MESSAGE_ACK_TEST;
Expand Down