From f72fa14a54021197f1b0b69437e1b2e54eea8cb0 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Thu, 20 Jan 2022 15:42:29 +0900 Subject: [PATCH 1/3] Rename test file name from `*Test2.java` to `*Test.java` to run all tests correctly (#13644) * test: rename test class * fix: fix to follow existing test cases * test: fix to follow existing production codes --- pom.xml | 2 +- .../broker/admin/impl/NamespacesBase.java | 34 +++++-- ...{AdminApiTest2.java => AdminApi2Test.java} | 90 ++++--------------- ...minApiTest2.java => V1_AdminApi2Test.java} | 2 +- 4 files changed, 45 insertions(+), 83 deletions(-) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/{AdminApiTest2.java => AdminApi2Test.java} (96%) rename pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/{V1_AdminApiTest2.java => V1_AdminApi2Test.java} (99%) diff --git a/pom.xml b/pom.xml index 64be13ca11d7e..7b3c42d2c64e1 100644 --- a/pom.xml +++ b/pom.xml @@ -80,7 +80,7 @@ flexible messaging model and an intuitive client API. 8 8 - + **/Test*.java,**/*Test.java,**/*Tests.java,**/*TestCase.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 56544a56106f2..a91499ed81b7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -465,7 +465,8 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo } // remove from owned namespace map and ephemeral node from ZK - final List> futures = Lists.newArrayList(); + final List> topicFutures = Lists.newArrayList(); + final List> bundleFutures = Lists.newArrayList(); try { // firstly remove all topics including system topics if (!topics.isEmpty()) { @@ -479,12 +480,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo String partitionedTopic = topicName.getPartitionedTopicName(); if (!partitionedTopics.contains(partitionedTopic)) { // Distinguish partitioned topic to avoid duplicate deletion of the same schema - futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync( + topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync( partitionedTopic, true, true)); partitionedTopics.add(partitionedTopic); } } else { - futures.add(pulsar().getAdminClient().topics().deleteAsync( + topicFutures.add(pulsar().getAdminClient().topics().deleteAsync( topic, true, true)); nonPartitionedTopics.add(topic); } @@ -505,14 +506,35 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo + "and non-partitioned-topics:{} in namespace:{}.", partitionedTopics, nonPartitionedTopics, namespaceName); } + + final CompletableFuture topicFutureEx = + FutureUtil.waitForAll(topicFutures).handle((result, exception) -> { + if (exception != null) { + if (exception.getCause() instanceof PulsarAdminException) { + asyncResponse + .resume(new RestException((PulsarAdminException) exception.getCause())); + } else { + log.error("[{}] Failed to remove forcefully owned namespace {}", + clientAppId(), namespaceName, exception); + asyncResponse.resume(new RestException(exception.getCause())); + } + return exception; + } + + return null; + }); + if (topicFutureEx.join() != null) { + return; + } } + // forcefully delete namespace bundles NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName); for (NamespaceBundle bundle : bundles.getBundles()) { // check if the bundle is owned by any broker, if not then we do not need to delete the bundle if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) { - futures.add(pulsar().getAdminClient().namespaces() + bundleFutures.add(pulsar().getAdminClient().namespaces() .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true)); } } @@ -522,7 +544,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo return; } - FutureUtil.waitForAll(futures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> { + FutureUtil.waitForAll(bundleFutures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> { if (exception != null) { Throwable cause = FutureUtil.unwrapCompletionException(exception); if (cause instanceof PulsarAdminException.ConflictException) { @@ -1910,7 +1932,7 @@ protected List internalGetAntiAffinityNamespaces(String cluster, String return namespaces.stream().filter(ns -> { Optional policies; try { - policies = getLocalPolicies().getLocalPolicies(namespaceName); + policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns)); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java similarity index 96% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index baafbc1ba4253..202cbe87e1ff2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -36,7 +36,6 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -107,7 +106,7 @@ @Slf4j @Test(groups = "broker") -public class AdminApiTest2 extends MockedPulsarServiceBaseTest { +public class AdminApi2Test extends MockedPulsarServiceBaseTest { private MockedPulsarService mockPulsarSetup; @@ -1027,9 +1026,11 @@ public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception { parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); + final List primaryList = new ArrayList<>(); + primaryList.add(brokerName + ".*"); NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder() .namespaces(Collections.singletonList(ns1Name)) - .primary(Collections.singletonList(brokerName + ".*")) + .primary(primaryList) .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) .parameters(parameters1) @@ -1615,60 +1616,6 @@ public void testForceDeleteNamespace() throws Exception { } } - @Test - public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception { - conf.setForceDeleteNamespaceAllowed(true); - final String ns = "prop-xyz/distinguish-topic-type-ns"; - final String exNs = "prop-xyz/ex-distinguish-topic-type-ns"; - admin.namespaces().createNamespace(ns, 2); - admin.namespaces().createNamespace(exNs, 2); - - final String p1 = "persistent://" + ns + "/p1"; - final String p5 = "persistent://" + ns + "/p5"; - final String np = "persistent://" + ns + "/np"; - - admin.topics().createPartitionedTopic(p1, 1); - admin.topics().createPartitionedTopic(p5, 5); - admin.topics().createNonPartitionedTopic(np); - - final String exNp = "persistent://" + exNs + "/np"; - admin.topics().createNonPartitionedTopic(exNp); - // insert an invalid topic name - pulsar.getLocalMetadataStore().put( - "/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join(); - - List topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get(); - List exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get(); - - // ensure that the topic list contains all the topics - List allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString())); - for (int i = 0; i < 5; i++) { - allTopics.add(TopicName.get(p5).getPartition(i).toString()); - } - Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0); - Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/")); - // partition num = p1 + p5 + np - Assert.assertEquals(topics.size(), 1 + 5 + 1); - Assert.assertEquals(exTopics.size(), 1 + 1); - - admin.namespaces().deleteNamespace(ns, true); - Arrays.asList(p1, p5, np).forEach(t -> { - try { - admin.schemas().getSchemaInfo(t); - } catch (PulsarAdminException e) { - // all the normal topics' schemas have been deleted - Assert.assertEquals(e.getStatusCode(), 404); - } - }); - - try { - admin.namespaces().deleteNamespace(exNs, true); - fail("Should fail due to invalid topic"); - } catch (Exception e) { - //ok - } - } - @Test public void testUpdateClusterWithProxyUrl() throws Exception { ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build(); @@ -1760,11 +1707,7 @@ public void testMaxTopicsPerNamespace() throws Exception { for (int i = 0; i < 5; ++i) { admin.topics().createPartitionedTopic(topic + i, 1); } - admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2); - admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2); - admin.topics().createPartitionedTopic( - "persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader" - + "-05c0ded5e9__transaction_pending_ack", 2); + admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6); // check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace @@ -1774,11 +1717,7 @@ public void testMaxTopicsPerNamespace() throws Exception { admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); admin.tenants().createTenant("testTenant", tenantInfo); admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test")); - admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2); - admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2); - admin.topics().createPartitionedTopic( - "persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader" - + "-05c0ded5e9__transaction_pending_ack", 2); + admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6); for (int i = 0; i < 5; ++i) { admin.topics().createPartitionedTopic(topic + i, 1); } @@ -1949,6 +1888,7 @@ public void testMaxSubPerTopicApi() throws Exception { @Test(timeOut = 30000) public void testMaxSubPerTopic() throws Exception { + pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0); final String myNamespace = "prop-xyz/ns" + UUID.randomUUID(); admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic"; @@ -1956,9 +1896,8 @@ public void testMaxSubPerTopic() throws Exception { final int maxSub = 2; admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub); PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); - Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); - field.setAccessible(true); - Awaitility.await().until(() -> (int) field.get(persistentTopic) == maxSub); + Awaitility.await().until(() -> + persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub); List> consumerList = new ArrayList<>(maxSub); for (int i = 0; i < maxSub; i++) { @@ -1975,7 +1914,8 @@ public void testMaxSubPerTopic() throws Exception { } //After removing the restriction, it should be able to create normally admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); - Awaitility.await().until(() -> field.get(persistentTopic) == null); + Awaitility.await().until(() -> + persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0); Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) .subscribe(); consumerList.add(consumer); @@ -2020,16 +1960,16 @@ public void testMaxSubPerTopicPriority() throws Exception { final int nsLevelMaxSub = 4; admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub); PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); - Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); - field.setAccessible(true); - Awaitility.await().until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub); + Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies() + .getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub); Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) .subscribe(); consumerList.add(consumer); assertEquals(consumerList.size(), 3); //After removing the restriction, it should fail again admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); - Awaitility.await().until(() -> field.get(persistentTopic) == null); + Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies() + .getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub); try { client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); fail("should fail"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java similarity index 99% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java index 4f9c8603a9e9a..2eb37c222d614 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java @@ -73,7 +73,7 @@ import org.testng.annotations.Test; @Test(groups = "broker") -public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { +public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest { private MockedPulsarService mockPulsarSetup; From 1a62d462cb46542eef2defcfd0cfa7e147e92f12 Mon Sep 17 00:00:00 2001 From: equanz Date: Wed, 10 Aug 2022 23:54:13 +0900 Subject: [PATCH 2/3] test: revert some fixes about #12687 --- .../pulsar/broker/admin/AdminApi2Test.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 202cbe87e1ff2..89752ccbaf3a3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1896,8 +1896,9 @@ public void testMaxSubPerTopic() throws Exception { final int maxSub = 2; admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub); PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); - Awaitility.await().until(() -> - persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub); + Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); + field.setAccessible(true); + Awaitility.await().until(() -> (int) field.get(persistentTopic) == maxSub); List> consumerList = new ArrayList<>(maxSub); for (int i = 0; i < maxSub; i++) { @@ -1914,8 +1915,7 @@ public void testMaxSubPerTopic() throws Exception { } //After removing the restriction, it should be able to create normally admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); - Awaitility.await().until(() -> - persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0); + Awaitility.await().until(() -> field.get(persistentTopic) == null); Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) .subscribe(); consumerList.add(consumer); @@ -1960,16 +1960,16 @@ public void testMaxSubPerTopicPriority() throws Exception { final int nsLevelMaxSub = 4; admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub); PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get(); - Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies() - .getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub); + Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic"); + field.setAccessible(true); + Awaitility.await().until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub); Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()) .subscribe(); consumerList.add(consumer); assertEquals(consumerList.size(), 3); //After removing the restriction, it should fail again admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace); - Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies() - .getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub); + Awaitility.await().until(() -> field.get(persistentTopic) == null); try { client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe(); fail("should fail"); From aa449cef73a81866c73961da5bd9118626e10073 Mon Sep 17 00:00:00 2001 From: equanz Date: Thu, 11 Aug 2022 00:29:34 +0900 Subject: [PATCH 3/3] test: cherry picked from commit ecd275d --- .../org/apache/pulsar/broker/admin/AdminApi2Test.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 89752ccbaf3a3..e0fac522f27db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -1253,7 +1253,7 @@ public void testPreciseBacklog() throws PulsarClientException, PulsarAdminExcept assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10); topicStats = admin.topics().getStats(topic, true, true); - assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43); + assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1); consumer.acknowledge(message); @@ -1501,7 +1501,7 @@ public void testPreciseBacklogForPartitionedTopic() throws PulsarClientException topicStats = admin.topics().getPartitionedStats(topic, false, true, true); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 1); - assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 43); + assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 40); } @Test(timeOut = 30000) @@ -1540,7 +1540,7 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10); - assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 470); + assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5); for (int i = 0; i < 5; i++) { @@ -1550,7 +1550,7 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti Awaitility.await().untilAsserted(() -> { TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true); assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5); - assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 238); + assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223); assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0); });