Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix transaction system topic create in loop. #12749

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1649,14 +1649,21 @@ public void shutdownNow() {
}


private static boolean isTransactionSystemTopic(TopicName topicName) {
public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isNotAllowedToCreateTopic(TopicName topicName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about naming this method isTransactionInternalName ?
the meaning is that this is a reserved name

I see that you are using this method while listing the topic names, so "Create" is not good in that place

String topic = topicName.toString();
return topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about creating a constant for this ? "TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()"

NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

@VisibleForTesting
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.PulsarService.isNotAllowedToCreateTopic;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -244,6 +245,13 @@ protected void validateAdminAndClientPermission() {
}
}

protected void validateTopicAllowdToCreate(TopicName topicName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: validateCreateTopic

if (isNotAllowedToCreateTopic(topicName)) {
log.warn("Try to create a topic in the system topic format! {}", topicName);
throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!");
}
}

public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
Expand Down
Expand Up @@ -234,6 +234,7 @@ public void createPartitionedTopic(
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateTopicAllowdToCreate(topicName);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
Expand Down Expand Up @@ -267,6 +268,7 @@ public void createNonPartitionedTopic(
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
validateTopicAllowdToCreate(topicName);
internalCreateNonPartitionedTopic(authoritative);
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -1284,6 +1285,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
return;
}

if (isTransactionSystemTopic(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}

CompletableFuture<Void> maxTopicsCheck = createIfMissing
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);
Expand Down
Expand Up @@ -26,6 +26,9 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -101,6 +104,35 @@ protected void setup() throws Exception {
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
}

@Test
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();
topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);

try {
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
fail();
} catch (PulsarClientException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Can not create transaction system topic"));
}

try {
admin.topics().createPartitionedTopic(topicName, 3);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}

try {
admin.topics().createNonPartitionedTopic(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}
}

@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";
Expand Down