diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index ba5a90b210e37..06430aa0ffa10 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,7 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; -import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -135,7 +135,6 @@ import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -1673,11 +1672,16 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu } - private static boolean isTransactionSystemTopic(TopicName topicName) { + public static boolean isTransactionSystemTopic(TopicName topicName) { String topic = topicName.toString(); return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) - || topic.startsWith(TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()) + || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); + } + + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1db53c0111e01..86116d59c8c87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; +import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC; import static org.apache.pulsar.common.util.Codec.decode; @@ -164,7 +165,8 @@ protected List internalGetList() { try { String path = String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain()); for (String topic : getLocalPolicies().getChildren(path)) { - if (domain().equals(TopicDomain.persistent.toString())) { + if (domain().equals(TopicDomain.persistent.toString()) + && !isTransactionInternalName(TopicName.get(topic))) { topics.add(TopicName.get(domain(), namespaceName, decode(topic)).toString()); } } @@ -198,6 +200,13 @@ protected List internalGetPartitionedTopicList() { return getPartitionedTopicList(TopicDomain.getEnum(domain())); } + protected void validateCreateTopic(TopicName topicName) { + if (isTransactionInternalName(topicName)) { + log.warn("Try to create a topic in the system topic format! {}", topicName); + throw new RestException(Status.BAD_REQUEST, "Cannot create topic in system topic format!"); + } + } + protected Map> internalGetPermissionsOnTopic() { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenant(namespaceName.getTenant()); @@ -3517,7 +3526,10 @@ private Topic getTopicReference(TopicName topicName) { } catch (RestException e) { throw e; } catch (Exception e) { - throw new RestException(e); + if (e.getCause() instanceof NotAllowedException) { + throw new RestException(Status.BAD_REQUEST, e.getCause()); + } + throw new RestException(e.getCause()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 2c54908483926..125e3f3ff9b53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -233,6 +233,7 @@ public void createPartitionedTopic( validateGlobalNamespaceOwnership(tenant, namespace); validatePartitionedTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); + validateCreateTopic(topicName); internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); @@ -265,6 +266,7 @@ public void createNonPartitionedTopic( @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateGlobalNamespaceOwnership(tenant, namespace); validateTopicName(tenant, namespace, encodedTopic); + validateCreateTopic(topicName); internalCreateNonPartitionedTopic(authoritative); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 62124fb9de8a6..b49fb59340300 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; @@ -1282,6 +1283,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, return; } + if (isTransactionSystemTopic(topicName)) { + String msg = String.format("Can not create transaction system topic %s", topic); + log.warn(msg); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new NotAllowedException(msg)); + return; + } + if (createIfMissing && !checkMaxTopicsPerNamespace(topicName, 1, topicFuture)) { return; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index fb94638bf49c1..052700ffc70ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -18,12 +18,20 @@ */ package org.apache.pulsar.broker.transaction; +import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.FileAssert.fail; + import com.google.common.collect.Sets; import java.lang.reflect.Field; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -44,6 +52,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -164,14 +173,14 @@ public void testGetTxnID() throws Exception { Transaction transaction = pulsarClient.newTransaction() .build().get(); TxnID txnID = transaction.getTxnID(); - Assert.assertEquals(txnID.getLeastSigBits(), 0); - Assert.assertEquals(txnID.getMostSigBits(), 0); + assertEquals(txnID.getLeastSigBits(), 0); + assertEquals(txnID.getMostSigBits(), 0); transaction.abort(); transaction = pulsarClient.newTransaction() .build().get(); txnID = transaction.getTxnID(); - Assert.assertEquals(txnID.getLeastSigBits(), 1); - Assert.assertEquals(txnID.getMostSigBits(), 0); + assertEquals(txnID.getLeastSigBits(), 1); + assertEquals(txnID.getMostSigBits(), 0); } @Test @@ -228,12 +237,12 @@ public void testSubscriptionRecreateTopic() Assert.fail(); } TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get(); - Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize); + assertEquals(retentionSizeInMbSetTopic, retentionSize); MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider(); CompletableFuture future = mlPendingAckStoreProvider.newPendingAckStore(subscription); future.thenAccept(pendingAckStore -> { ((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> { - Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), + assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), retentionSizeInMbSetTo); }); } @@ -246,4 +255,61 @@ public void testSubscriptionRecreateTopic() } + @Test + public void testCreateTransactionSystemTopic() throws Exception { + String subName = "test"; + String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString(); + + try { + // init pending ack + @Cleanup + Consumer consumer = getConsumer(topicName, subName); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + + consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); + } + topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName); + + // getList does not include transaction system topic + List list = admin.topics().getList(NAMESPACE1); + assertEquals(list.size(), 3); + list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX))); + + try { + // can't create transaction system topic + @Cleanup + Consumer consumer = getConsumer(topicName, subName); + fail(); + } catch (PulsarClientException.NotAllowedException e) { + assertTrue(e.getMessage().contains("Can not create transaction system topic")); + } + + // can't create transaction system topic + try { + admin.topics().getSubscriptions(topicName); + fail(); + } catch (PulsarAdminException e) { + assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName); + } + + // can't create transaction system topic + try { + admin.topics().createPartitionedTopic(topicName, 3); + fail(); + } catch (PulsarAdminException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + + // can't create transaction system topic + try { + admin.topics().createNonPartitionedTopic(topicName); + fail(); + } catch (PulsarAdminException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + } + } \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index fd183dfcab76d..b8537d28fae75 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -67,6 +67,9 @@ public TopicName load(String name) throws Exception { public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); + public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name);