Skip to content

Commit

Permalink
[Transaction] Fix transaction system topic create in loop. (apache#12749
Browse files Browse the repository at this point in the history
)

fix apache#12727
### Motivation
Now transaction system topic can be created.

### Modifications
we should not allow broker or user create by transaction system format topic.

1. checkout topic auto create.
2. admin create topic.

### Verifying this change

add some test for it
  • Loading branch information
congbobo184 authored and eolivelli committed Nov 29, 2021
1 parent 21617d9 commit 0ded6d6
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 8 deletions.
Expand Up @@ -22,7 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -132,7 +132,6 @@
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -1649,11 +1648,16 @@ public void shutdownNow() {
}


private static boolean isTransactionSystemTopic(TopicName topicName) {
public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
|| topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsTransactionCoordinatorAssign;
import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -161,7 +162,9 @@ protected List<String> internalGetList() {
}

try {
return topicResources().listPersistentTopicsAsync(namespaceName).join();
return topicResources().listPersistentTopicsAsync(namespaceName).thenApply(topics ->
topics.stream().filter(topic ->
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList())).join();
} catch (Exception e) {
log.error("[{}] Failed to get topics list for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
Expand Down Expand Up @@ -244,6 +247,13 @@ protected void validateAdminAndClientPermission() {
}
}

protected void validateCreateTopic(TopicName topicName) {
if (isTransactionInternalName(topicName)) {
log.warn("Try to create a topic in the system topic format! {}", topicName);
throw new RestException(Status.CONFLICT, "Cannot create topic in system topic format!");
}
}

public void validateAdminOperationOnTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());
validateTopicOwnership(topicName, authoritative);
Expand Down Expand Up @@ -3683,7 +3693,10 @@ private Topic getTopicReference(TopicName topicName) {
} catch (RestException e) {
throw e;
} catch (Exception e) {
throw new RestException(e);
if (e.getCause() instanceof NotAllowedException) {
throw new RestException(Status.CONFLICT, e.getCause());
}
throw new RestException(e.getCause());
}
}

Expand Down
Expand Up @@ -234,6 +234,7 @@ public void createPartitionedTopic(
validateGlobalNamespaceOwnership();
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
Expand Down Expand Up @@ -267,6 +268,7 @@ public void createNonPartitionedTopic(
validateNamespaceName(tenant, namespace);
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
validateCreateTopic(topicName);
internalCreateNonPartitionedTopic(authoritative);
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -1284,6 +1285,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
return;
}

if (isTransactionSystemTopic(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}

CompletableFuture<Void> maxTopicsCheck = createIfMissing
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);
Expand Down
Expand Up @@ -19,20 +19,27 @@
package org.apache.pulsar.broker.transaction;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -101,6 +108,63 @@ protected void setup() throws Exception {
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
}

@Test
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();

try {
// init pending ack
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);

// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
assertEquals(list.size(), 4);
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));

try {
// can't create transaction system topic
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
fail();
} catch (PulsarClientException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Can not create transaction system topic"));
}

// can't create transaction system topic
try {
admin.topics().getSubscriptions(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
}

// can't create transaction system topic
try {
admin.topics().createPartitionedTopic(topicName, 3);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}

// can't create transaction system topic
try {
admin.topics().createNonPartitionedTopic(topicName);
fail();
} catch (PulsarAdminException.ConflictException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}
}

@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";
Expand Down
Expand Up @@ -49,6 +49,7 @@ public static boolean checkTopicIsEventsNames(TopicName topicName) {
}

public static boolean checkTopicIsTransactionCoordinatorAssign(TopicName topicName) {
return TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString().equals(topicName.toString());
return topicName != null && topicName.toString()
.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}
}
Expand Up @@ -63,6 +63,9 @@ public TopicName load(String name) throws Exception {
public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");

public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");

public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);
Expand Down

0 comments on commit 0ded6d6

Please sign in to comment.