Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename test file name from *Test2.java to *Test.java to run all tests correctly #13644

Merged
merged 3 commits into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -80,7 +80,7 @@ flexible messaging model and an intuitive client API.</description>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>

<!--config keys to congiure test selection -->
<!--config keys to configure test selection -->
<include>**/Test*.java,**/*Test.java,**/*Tests.java,**/*TestCase.java</include>
<exclude/>
<groups/>
Expand Down
Expand Up @@ -419,7 +419,8 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}

// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
final List<CompletableFuture<Void>> topicFutures = Lists.newArrayList();
final List<CompletableFuture<Void>> bundleFutures = Lists.newArrayList();
try {
// firstly remove all topics including system topics
if (!topics.isEmpty()) {
Expand All @@ -433,12 +434,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
// Distinguish partitioned topic to avoid duplicate deletion of the same schema
futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
partitionedTopic, true, true));
partitionedTopics.add(partitionedTopic);
}
} else {
futures.add(pulsar().getAdminClient().topics().deleteAsync(
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
nonPartitionedTopics.add(topic);
}
Expand All @@ -459,14 +460,35 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
+ "and non-partitioned-topics:{} in namespace:{}.",
partitionedTopics, nonPartitionedTopics, namespaceName);
}

final CompletableFuture<Throwable> topicFutureEx =
FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse
.resume(new RestException((PulsarAdminException) exception.getCause()));
} else {
log.error("[{}] Failed to remove forcefully owned namespace {}",
clientAppId(), namespaceName, exception);
asyncResponse.resume(new RestException(exception.getCause()));
}
return exception;
}

return null;
});
if (topicFutureEx.join() != null) {
return;
}
}

// forcefully delete namespace bundles
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
futures.add(pulsar().getAdminClient().namespaces()
bundleFutures.add(pulsar().getAdminClient().namespaces()
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true));
}
}
Expand All @@ -476,7 +498,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
return;
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
FutureUtil.waitForAll(bundleFutures).handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
Expand Down Expand Up @@ -1866,7 +1888,7 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
return namespaces.stream().filter(ns -> {
Optional<LocalPolicies> policies;
try {
policies = getLocalPolicies().getLocalPolicies(namespaceName);
policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -36,7 +36,6 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -105,7 +104,7 @@

@Slf4j
@Test(groups = "broker-admin")
public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
public class AdminApi2Test extends MockedPulsarServiceBaseTest {

private MockedPulsarService mockPulsarSetup;

Expand Down Expand Up @@ -1025,9 +1024,11 @@ public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

final List<String> primaryList = new ArrayList<>();
primaryList.add(brokerName + ".*");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(ns1Name))
.primary(Collections.singletonList(brokerName + ".*"))
.primary(primaryList)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
Expand Down Expand Up @@ -1576,60 +1577,6 @@ public void testForceDeleteNamespace() throws Exception {
}
}

@Test
public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
final String ns = "prop-xyz/distinguish-topic-type-ns";
final String exNs = "prop-xyz/ex-distinguish-topic-type-ns";
admin.namespaces().createNamespace(ns, 2);
admin.namespaces().createNamespace(exNs, 2);

final String p1 = "persistent://" + ns + "/p1";
final String p5 = "persistent://" + ns + "/p5";
final String np = "persistent://" + ns + "/np";

admin.topics().createPartitionedTopic(p1, 1);
admin.topics().createPartitionedTopic(p5, 5);
admin.topics().createNonPartitionedTopic(np);

final String exNp = "persistent://" + exNs + "/np";
admin.topics().createNonPartitionedTopic(exNp);
// insert an invalid topic name
pulsar.getLocalMetadataStore().put(
"/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join();

List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
List<String> exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get();

// ensure that the topic list contains all the topics
List<String> allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString()));
for (int i = 0; i < 5; i++) {
allTopics.add(TopicName.get(p5).getPartition(i).toString());
}
Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0);
Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/"));
// partition num = p1 + p5 + np
Assert.assertEquals(topics.size(), 1 + 5 + 1);
Assert.assertEquals(exTopics.size(), 1 + 1);

admin.namespaces().deleteNamespace(ns, true);
Arrays.asList(p1, p5, np).forEach(t -> {
try {
admin.schemas().getSchemaInfo(t);
} catch (PulsarAdminException e) {
// all the normal topics' schemas have been deleted
Assert.assertEquals(e.getStatusCode(), 404);
}
});

try {
admin.namespaces().deleteNamespace(exNs, true);
fail("Should fail due to invalid topic");
} catch (Exception e) {
//ok
}
}

@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Expand Down Expand Up @@ -1721,11 +1668,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);


// check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
Expand All @@ -1735,11 +1678,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
Expand Down Expand Up @@ -1910,16 +1849,16 @@ public void testMaxSubPerTopicApi() throws Exception {

@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
pulsarClient.newProducer().topic(topic).create().close();
final int maxSub = 2;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().until(() -> (int) field.get(persistentTopic) == maxSub);
Awaitility.await().until(() ->
persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub);

List<Consumer<?>> consumerList = new ArrayList<>(maxSub);
for (int i = 0; i < maxSub; i++) {
Expand All @@ -1936,7 +1875,8 @@ public void testMaxSubPerTopic() throws Exception {
}
//After removing the restriction, it should be able to create normally
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() -> field.get(persistentTopic) == null);
Awaitility.await().until(() ->
persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
Expand Down Expand Up @@ -1981,16 +1921,16 @@ public void testMaxSubPerTopicPriority() throws Exception {
final int nsLevelMaxSub = 4;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
field.setAccessible(true);
Awaitility.await().until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub);
Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub);
Consumer<?> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
assertEquals(consumerList.size(), 3);
//After removing the restriction, it should fail again
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
Awaitility.await().until(() -> field.get(persistentTopic) == null);
Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
.getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub);
try {
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
Expand Down
Expand Up @@ -73,7 +73,7 @@
import org.testng.annotations.Test;

@Test(groups = "broker-admin")
public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest {

private MockedPulsarService mockPulsarSetup;

Expand Down