Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix transaction system topic create in loop #12889

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,7 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.apache.pulsar.common.naming.TopicName.TRANSACTION_COORDINATOR_LOG;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -135,7 +135,6 @@
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -1673,11 +1672,16 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu
}


private static boolean isTransactionSystemTopic(TopicName topicName) {
public static boolean isTransactionSystemTopic(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
|| topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
|| topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

public static boolean isTransactionInternalName(TopicName topicName) {
String topic = topicName.toString();
return topic.startsWith(TRANSACTION_COORDINATOR_LOG.toString())
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.broker.PulsarService.isTransactionInternalName;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resources.PulsarResources.DEFAULT_OPERATION_TIMEOUT_SEC;
import static org.apache.pulsar.common.util.Codec.decode;
Expand Down Expand Up @@ -164,7 +165,8 @@ protected List<String> internalGetList() {
try {
String path = String.format("/managed-ledgers/%s/%s", namespaceName.toString(), domain());
for (String topic : getLocalPolicies().getChildren(path)) {
if (domain().equals(TopicDomain.persistent.toString())) {
if (domain().equals(TopicDomain.persistent.toString())
&& !isTransactionInternalName(TopicName.get(topic))) {
topics.add(TopicName.get(domain(), namespaceName, decode(topic)).toString());
}
}
Expand Down Expand Up @@ -198,6 +200,13 @@ protected List<String> internalGetPartitionedTopicList() {
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}

protected void validateCreateTopic(TopicName topicName) {
if (isTransactionInternalName(topicName)) {
log.warn("Try to create a topic in the system topic format! {}", topicName);
throw new RestException(Status.BAD_REQUEST, "Cannot create topic in system topic format!");
}
}

protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
Expand Down Expand Up @@ -3517,7 +3526,10 @@ private Topic getTopicReference(TopicName topicName) {
} catch (RestException e) {
throw e;
} catch (Exception e) {
throw new RestException(e);
if (e.getCause() instanceof NotAllowedException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NotAllowedException is related to security problems (authorisation).

why are we rewriting this to "CONFLICT" ? shouldn't it be "FORBIDDEN" ?

throw new RestException(Status.BAD_REQUEST, e.getCause());
}
throw new RestException(e.getCause());
}
}

Expand Down
Expand Up @@ -233,6 +233,7 @@ public void createPartitionedTopic(
validateGlobalNamespaceOwnership(tenant, namespace);
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
validateCreateTopic(topicName);
internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
Expand Down Expand Up @@ -265,6 +266,7 @@ public void createNonPartitionedTopic(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant, namespace);
validateTopicName(tenant, namespace, encodedTopic);
validateCreateTopic(topicName);
internalCreateNonPartitionedTopic(authoritative);
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
Expand Down Expand Up @@ -1282,6 +1283,14 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
return;
}

if (isTransactionSystemTopic(topicName)) {
String msg = String.format("Can not create transaction system topic %s", topic);
log.warn(msg);
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
topicFuture.completeExceptionally(new NotAllowedException(msg));
return;
}

if (createIfMissing && !checkMaxTopicsPerNamespace(topicName, 1, topicFuture)) {
return;
}
Expand Down
Expand Up @@ -18,12 +18,20 @@
*/
package org.apache.pulsar.broker.transaction;

import static org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore.PENDING_ACK_STORE_SUFFIX;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.FileAssert.fail;

import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -44,6 +52,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -164,14 +173,14 @@ public void testGetTxnID() throws Exception {
Transaction transaction = pulsarClient.newTransaction()
.build().get();
TxnID txnID = transaction.getTxnID();
Assert.assertEquals(txnID.getLeastSigBits(), 0);
Assert.assertEquals(txnID.getMostSigBits(), 0);
assertEquals(txnID.getLeastSigBits(), 0);
assertEquals(txnID.getMostSigBits(), 0);
transaction.abort();
transaction = pulsarClient.newTransaction()
.build().get();
txnID = transaction.getTxnID();
Assert.assertEquals(txnID.getLeastSigBits(), 1);
Assert.assertEquals(txnID.getMostSigBits(), 0);
assertEquals(txnID.getLeastSigBits(), 1);
assertEquals(txnID.getMostSigBits(), 0);
}

@Test
Expand Down Expand Up @@ -228,12 +237,12 @@ public void testSubscriptionRecreateTopic()
Assert.fail();
}
TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get();
Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
assertEquals(retentionSizeInMbSetTopic, retentionSize);
MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
CompletableFuture<PendingAckStore> future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
future.thenAccept(pendingAckStore -> {
((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
retentionSizeInMbSetTo);
});
}
Expand All @@ -246,4 +255,61 @@ public void testSubscriptionRecreateTopic()

}

@Test
public void testCreateTransactionSystemTopic() throws Exception {
String subName = "test";
String topicName = TopicName.get(NAMESPACE1 + "/" + "testCreateTransactionSystemTopic").toString();

try {
// init pending ack
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
Transaction transaction = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS).build().get();

consumer.acknowledgeAsync(new MessageIdImpl(10, 10, 10), transaction).get();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
}
topicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName);

// getList does not include transaction system topic
List<String> list = admin.topics().getList(NAMESPACE1);
assertEquals(list.size(), 3);
list.forEach(topic -> assertFalse(topic.contains(PENDING_ACK_STORE_SUFFIX)));

try {
// can't create transaction system topic
@Cleanup
Consumer<byte[]> consumer = getConsumer(topicName, subName);
fail();
} catch (PulsarClientException.NotAllowedException e) {
assertTrue(e.getMessage().contains("Can not create transaction system topic"));
}

// can't create transaction system topic
try {
admin.topics().getSubscriptions(topicName);
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getMessage(), "Can not create transaction system topic " + topicName);
}

// can't create transaction system topic
try {
admin.topics().createPartitionedTopic(topicName, 3);
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}

// can't create transaction system topic
try {
admin.topics().createNonPartitionedTopic(topicName);
fail();
} catch (PulsarAdminException e) {
assertEquals(e.getMessage(), "Cannot create topic in system topic format!");
}
}

}
Expand Up @@ -67,6 +67,9 @@ public TopicName load(String name) throws Exception {
public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "transaction_coordinator_assign");

public static final TopicName TRANSACTION_COORDINATOR_LOG = TopicName.get(TopicDomain.persistent.value(),
NamespaceName.SYSTEM_NAMESPACE, "__transaction_log_");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are you sure about this '_transaction_log' ? it is strange that it ends with an underscore character

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it ends with an underscore character, because there is a coordinator id behind the __transaction_log_.


public static TopicName get(String domain, NamespaceName namespaceName, String topic) {
String name = domain + "://" + namespaceName.toString() + '/' + topic;
return TopicName.get(name);
Expand Down