Skip to content

Commit

Permalink
[Transaction] Fix transaction system topic create in loop. (#12749)
Browse files Browse the repository at this point in the history
fix #12727
### Motivation
Now transaction system topic can be created.

### Modifications
we should not allow broker or user create by transaction system format topic.

1. checkout topic auto create.
2. admin create topic.

### Verifying this change

add some test for it

(cherry picked from commit 2c4d913)
  • Loading branch information
congbobo184 authored and eolivelli committed Dec 15, 2021
1 parent 4e4c787 commit 393f95c
Showing 1 changed file with 57 additions and 0 deletions.
Expand Up @@ -179,6 +179,63 @@ public void testCreateTransactionSystemTopic() throws Exception {
}
}

@Test
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();

try {
// init pending ack
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);

// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
assertEquals(list.size(), 4);
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));

try {
// can't create transaction system topic
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
fail();
} catch (PulsarClientException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Can not create transaction system topic"));
}

// can't create transaction system topic
try {
admin.topics().getSubscriptions(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
}

// can't create transaction system topic
try {
admin.topics().createPartitionedTopic(topicName, 3);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}

// can't create transaction system topic
try {
admin.topics().createNonPartitionedTopic(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}
}

@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";
Expand Down

0 comments on commit 393f95c

Please sign in to comment.