Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.9] Rename test file name from *Test2.java to *Test.java to run all tests correctly #17048

Merged
merged 3 commits into from Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -80,7 +80,7 @@ flexible messaging model and an intuitive client API.</description>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>

<!--config keys to congiure test selection -->
<!--config keys to configure test selection -->
<include>**/Test*.java,**/*Test.java,**/*Tests.java,**/*TestCase.java</include>
<exclude/>
<groups/>
Expand Down
Expand Up @@ -465,7 +465,8 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}

// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
final List<CompletableFuture<Void>> topicFutures = Lists.newArrayList();
final List<CompletableFuture<Void>> bundleFutures = Lists.newArrayList();
try {
// firstly remove all topics including system topics
if (!topics.isEmpty()) {
Expand All @@ -479,12 +480,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
// Distinguish partitioned topic to avoid duplicate deletion of the same schema
futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
partitionedTopic, true, true));
partitionedTopics.add(partitionedTopic);
}
} else {
futures.add(pulsar().getAdminClient().topics().deleteAsync(
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
nonPartitionedTopics.add(topic);
}
Expand All @@ -505,14 +506,35 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
+ "and non-partitioned-topics:{} in namespace:{}.",
partitionedTopics, nonPartitionedTopics, namespaceName);
}

final CompletableFuture<Throwable> topicFutureEx =
FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse
.resume(new RestException((PulsarAdminException) exception.getCause()));
} else {
log.error("[{}] Failed to remove forcefully owned namespace {}",
clientAppId(), namespaceName, exception);
asyncResponse.resume(new RestException(exception.getCause()));
}
return exception;
}

return null;
});
if (topicFutureEx.join() != null) {
return;
}
}

// forcefully delete namespace bundles
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
futures.add(pulsar().getAdminClient().namespaces()
bundleFutures.add(pulsar().getAdminClient().namespaces()
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true));
}
}
Expand All @@ -522,7 +544,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
return;
}

FutureUtil.waitForAll(futures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> {
FutureUtil.waitForAll(bundleFutures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> {
if (exception != null) {
Throwable cause = FutureUtil.unwrapCompletionException(exception);
if (cause instanceof PulsarAdminException.ConflictException) {
Expand Down Expand Up @@ -1910,7 +1932,7 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
return namespaces.stream().filter(ns -> {
Optional<LocalPolicies> policies;
try {
policies = getLocalPolicies().getLocalPolicies(namespaceName);
policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Expand Up @@ -36,7 +36,6 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -107,7 +106,7 @@

@Slf4j
@Test(groups = "broker")
public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
public class AdminApi2Test extends MockedPulsarServiceBaseTest {

private MockedPulsarService mockPulsarSetup;

Expand Down Expand Up @@ -1027,9 +1026,11 @@ public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

final List<String> primaryList = new ArrayList<>();
primaryList.add(brokerName + ".*");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(ns1Name))
.primary(Collections.singletonList(brokerName + ".*"))
.primary(primaryList)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
Expand Down Expand Up @@ -1252,7 +1253,7 @@ public void testPreciseBacklog() throws PulsarClientException, PulsarAdminExcept
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);

topicStats = admin.topics().getStats(topic, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
consumer.acknowledge(message);

Expand Down Expand Up @@ -1500,7 +1501,7 @@ public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException

topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40);
}

@Test(timeOut = 30000)
Expand Down Expand Up @@ -1539,7 +1540,7 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti

TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 470);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);

for (int i = 0; i < 5; i++) {
Expand All @@ -1549,7 +1550,7 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 238);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
});

Expand Down Expand Up @@ -1615,60 +1616,6 @@ public void testForceDeleteNamespace() throws Exception {
}
}

@Test
public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception {
conf.setForceDeleteNamespaceAllowed(true);
final String ns = "prop-xyz/distinguish-topic-type-ns";
final String exNs = "prop-xyz/ex-distinguish-topic-type-ns";
admin.namespaces().createNamespace(ns, 2);
admin.namespaces().createNamespace(exNs, 2);

final String p1 = "persistent://" + ns + "/p1";
final String p5 = "persistent://" + ns + "/p5";
final String np = "persistent://" + ns + "/np";

admin.topics().createPartitionedTopic(p1, 1);
admin.topics().createPartitionedTopic(p5, 5);
admin.topics().createNonPartitionedTopic(np);

final String exNp = "persistent://" + exNs + "/np";
admin.topics().createNonPartitionedTopic(exNp);
// insert an invalid topic name
pulsar.getLocalMetadataStore().put(
"/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join();

List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
List<String> exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get();

// ensure that the topic list contains all the topics
List<String> allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString()));
for (int i = 0; i < 5; i++) {
allTopics.add(TopicName.get(p5).getPartition(i).toString());
}
Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0);
Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/"));
// partition num = p1 + p5 + np
Assert.assertEquals(topics.size(), 1 + 5 + 1);
Assert.assertEquals(exTopics.size(), 1 + 1);

admin.namespaces().deleteNamespace(ns, true);
Arrays.asList(p1, p5, np).forEach(t -> {
try {
admin.schemas().getSchemaInfo(t);
} catch (PulsarAdminException e) {
// all the normal topics' schemas have been deleted
Assert.assertEquals(e.getStatusCode(), 404);
}
});

try {
admin.namespaces().deleteNamespace(exNs, true);
fail("Should fail due to invalid topic");
} catch (Exception e) {
//ok
}
}

@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
Expand Down Expand Up @@ -1760,11 +1707,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);


// check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
Expand All @@ -1774,11 +1717,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
Expand Down Expand Up @@ -1949,6 +1888,7 @@ public void testMaxSubPerTopicApi() throws Exception {

@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
Expand Down
Expand Up @@ -73,7 +73,7 @@
import org.testng.annotations.Test;

@Test(groups = "broker")
public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest {

private MockedPulsarService mockPulsarSetup;

Expand Down