From 2c4d913c4b3fb1c6d924efaa0a24c93a2d2de7d0 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Sat, 13 Nov 2021 09:15:30 +0800 Subject: [PATCH] [Transaction] Fix transaction system topic create in loop. (#12749) fix https://github.com/apache/pulsar/issues/12727 ### Motivation Now transaction system topic can be created. ### Modifications we should not allow broker or user create by transaction system format topic. 1. checkout topic auto create. 2. admin create topic. ### Verifying this change add some test for it --- .../apache/pulsar/broker/PulsarService.java | 14 ++-- .../admin/impl/PersistentTopicsBase.java | 17 ++++- .../broker/admin/v2/PersistentTopics.java | 2 + .../pulsar/broker/service/BrokerService.java | 9 +++ .../broker/transaction/TransactionTest.java | 64 +++++++++++++++++++ .../common/events/EventsTopicNames.java | 3 +- .../pulsar/common/naming/TopicName.java | 3 + 7 files changed, 104 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index bca21ae476206..28ccbb1013426 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -22,7 +22,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER; -import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; +import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -132,7 +132,6 @@ import org.apache.pulsar.common.configuration.VipStatus; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -1649,11 +1648,16 @@ public void shutdownNow() { } - private static boolean isTransactionSystemTopic(TopicName topicName) { + public static boolean isTransactionSystemTopic(TopicName topicName) { String topic = topicName.toString(); return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()) - || topic.startsWith(TopicName.get(TopicDomain.persistent.value(), - NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString()) + || topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) + || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); + } + + public static boolean isTransactionInternalName(TopicName topicName) { + String topic = topicName.toString(); + return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString()) || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ef4510cd6ef51..fc33ad4ea5673 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; +import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName; import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign; import com.fasterxml.jackson.core.JsonProcessingException; @@ -161,7 +162,9 @@ protected List internalGetList() { } try { - return topicResources().listPersistentTopicsAsync(namespaceName).join(); + return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics -> + topics.stream().filter(topic -> + !isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join(); } catch (Exception e) { log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e); throw new RestException(e); @@ -244,6 +247,13 @@ protected void validateAdminAndClientPermission() { } } + protected void validateCreateTopic(TopicName topicName) { + if (isTransactionInternalName(topicName)) { + log.warn("Try to create a topic in the system topic format! {}", topicName); + throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!"); + } + } + public void validateAdminOperationOnTopic(boolean authoritative) { validateAdminAccessForTenant(topicName.getTenant()); validateTopicOwnership(topicName, authoritative); @@ -3683,7 +3693,10 @@ private Topic getTopicReference(TopicName topicName) { } catch (RestException e) { throw e; } catch (Exception e) { - throw new RestException(e); + if (e.getCause() instanceof NotAllowedException) { + throw new RestException(Status.CONFLICT, e.getCause()); + } + throw new RestException(e.getCause()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 1318057839955..bf6c64712c851 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -234,6 +234,7 @@ public void createPartitionedTopic( validateGlobalNamespaceOwnership(); validatePartitionedTopicName(tenant, namespace, encodedTopic); validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE); + validateCreateTopic(topicName); internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly); } catch (Exception e) { log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); @@ -267,6 +268,7 @@ public void createNonPartitionedTopic( validateNamespaceName(tenant, namespace); validateGlobalNamespaceOwnership(); validateTopicName(tenant, namespace, encodedTopic); + validateCreateTopic(topicName); internalCreateNonPartitionedTopic(authoritative); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 9194820d0283d..ba3a74d4e6cb0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -23,6 +23,7 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -1284,6 +1285,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, return; } + if (isTransactionSystemTopic(topicName)) { + String msg = String.format("Can not create transaction system topic %s", topic); + log.warn(msg); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new NotAllowedException(msg)); + return; + } + CompletableFuture maxTopicsCheck = createIfMissing ? checkMaxTopicsPerNamespace(topicName, 1) : CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 930ab92e661d7..df527dae37ac2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.transaction; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -26,13 +27,19 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + import com.google.common.collect.Sets; import io.netty.buffer.Unpooled; import java.lang.reflect.Field; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; @@ -101,6 +108,63 @@ protected void setup() throws Exception { setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0); } + @Test + public void testCreateTransactionSystemTopic() throws Exception { + String subName = "test"; + String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString(); + + try { + // init pending ack + @Cleanup + Consumer consumer = getConsumer(topicName, subName); + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + + consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get(); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException); + } + topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName); + + // getList does not include transaction system topic + List list = admin.topics().getList(NAMESPACE1); + assertEquals(list.size(), 4); + list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX))); + + try { + // can't create transaction system topic + @Cleanup + Consumer consumer = getConsumer(topicName, subName); + fail(); + } catch (PulsarClientException.NotAllowedException e) { + assertTrue(e.getMessage().contains("Can not create transaction system topic")); + } + + // can't create transaction system topic + try { + admin.topics().getSubscriptions(topicName); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName); + } + + // can't create transaction system topic + try { + admin.topics().createPartitionedTopic(topicName, 3); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + + // can't create transaction system topic + try { + admin.topics().createNonPartitionedTopic(topicName); + fail(); + } catch (PulsarAdminException.ConflictException e) { + assertEquals(e.getMessage(), "Cannot create topic in system topic format!"); + } + } + @Test public void brokerNotInitTxnManagedLedgerTopic() throws Exception { String subName = "test"; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java index 2aa9e122d63c5..f82c9ae8519a4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java @@ -49,6 +49,7 @@ public static boolean checkTopicIsEventsNames(TopicName topicName) { } public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) { - return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString()); + return topicName != null && topicName.toString() + .startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 257d27c206943..efbd8c0f87359 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -63,6 +63,9 @@ public TopicName load(String name) throws Exception { public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign"); + public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(), + NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_"); + public static TopicName get(String domain, NamespaceName namespaceName, String topic) { String name = domain + "://" + namespaceName.toString() + '/' + topic; return TopicName.get(name);