diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java index eed849ef1a01e4..48ec1c02b645e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorGlobalNSTest.java @@ -18,18 +18,24 @@ */ package org.apache.pulsar.broker.service; +import static org.testng.Assert.fail; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import lombok.Cleanup; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.ProducerImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -41,6 +47,11 @@ import java.lang.reflect.Method; import java.util.concurrent.TimeUnit; +/** + * The tests in this class should be denied in a production pulsar cluster. they are very dangerous, which leads to + * a lot of topic deletion namespace policies not correct. + */ +@Slf4j @Test(groups = "broker-impl") public class ReplicatorGlobalNSTest extends ReplicatorTestBase { @@ -81,7 +92,7 @@ public void cleanup() throws Exception { * * @throws Exception */ - @Test + @Test(priority = Integer.MAX_VALUE) public void testRemoveLocalClusterOnGlobalNamespace() throws Exception { log.info("--- Starting ReplicatorTest::testRemoveLocalClusterOnGlobalNamespace ---"); @@ -115,32 +126,88 @@ public void testRemoveLocalClusterOnGlobalNamespace() throws Exception { }); } - @Test - public void testForcefullyTopicDeletion() throws Exception { - log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---"); - - final String namespace = "pulsar/removeClusterTest"; - admin1.namespaces().createNamespace(namespace); - admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); - - final String topicName = "persistent://" + namespace + "/topic"; - - @Cleanup - PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) - .build(); - - ProducerImpl producer1 = (ProducerImpl) client1.newProducer().topic(topicName) - .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - producer1.close(); - - admin1.topics().delete(topicName, true); - - MockedPulsarServiceBaseTest - .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150); - - Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName)); + /** + * This is not a formal operation and can cause serious problems if call it in a production environment. + */ + @Test(priority = Integer.MAX_VALUE - 1) + public void testConfigChange() throws Exception { + log.info("--- Starting ReplicatorTest::testConfigChange ---"); + // This test is to verify that the config change on global namespace is successfully applied in broker during + // runtime. + // Run a set of producer tasks to create the topics + List> results = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i)); + + results.add(executor.submit(new Callable() { + @Override + public Void call() throws Exception { + + @Cleanup + MessageProducer producer = new MessageProducer(url1, dest); + log.info("--- Starting producer --- " + url1); + + @Cleanup + MessageConsumer consumer = new MessageConsumer(url1, dest); + log.info("--- Starting Consumer --- " + url1); + + producer.produce(2); + consumer.receive(2); + return null; + } + })); + } + + for (Future result : results) { + try { + result.get(); + } catch (Exception e) { + log.error("exception in getting future result ", e); + fail(String.format("replication test failed with %s exception", e.getMessage())); + } + } + + Thread.sleep(1000L); + // Make sure that the internal replicators map contains remote cluster info + ConcurrentOpenHashMap replicationClients1 = ns1.getReplicationClients(); + ConcurrentOpenHashMap replicationClients2 = ns2.getReplicationClients(); + ConcurrentOpenHashMap replicationClients3 = ns3.getReplicationClients(); + + Assert.assertNotNull(replicationClients1.get("r2")); + Assert.assertNotNull(replicationClients1.get("r3")); + Assert.assertNotNull(replicationClients2.get("r1")); + Assert.assertNotNull(replicationClients2.get("r3")); + Assert.assertNotNull(replicationClients3.get("r1")); + Assert.assertNotNull(replicationClients3.get("r2")); + + // Case 1: Update the global namespace replication configuration to only contains the local cluster itself + admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1")); + + // Wait for config changes to be updated. + Thread.sleep(1000L); + + // Make sure that the internal replicators map still contains remote cluster info + Assert.assertNotNull(replicationClients1.get("r2")); + Assert.assertNotNull(replicationClients1.get("r3")); + Assert.assertNotNull(replicationClients2.get("r1")); + Assert.assertNotNull(replicationClients2.get("r3")); + Assert.assertNotNull(replicationClients3.get("r1")); + Assert.assertNotNull(replicationClients3.get("r2")); + + // Case 2: Update the configuration back + admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3")); + + // Wait for config changes to be updated. + Thread.sleep(1000L); + + // Make sure that the internal replicators map still contains remote cluster info + Assert.assertNotNull(replicationClients1.get("r2")); + Assert.assertNotNull(replicationClients1.get("r3")); + Assert.assertNotNull(replicationClients2.get("r1")); + Assert.assertNotNull(replicationClients2.get("r3")); + Assert.assertNotNull(replicationClients3.get("r1")); + Assert.assertNotNull(replicationClients3.get("r2")); + + // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters } - - private static final Logger log = LoggerFactory.getLogger(ReplicatorGlobalNSTest.class); - } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index fa12eba1c66115..765727aeac3190 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -44,13 +44,11 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -68,6 +66,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -154,88 +153,6 @@ public Object[][] partitionedTopicProvider() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } - @Test(priority = Integer.MAX_VALUE) - public void testConfigChange() throws Exception { - log.info("--- Starting ReplicatorTest::testConfigChange ---"); - // This test is to verify that the config change on global namespace is successfully applied in broker during - // runtime. - // Run a set of producer tasks to create the topics - List> results = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - final TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i)); - - results.add(executor.submit(new Callable() { - @Override - public Void call() throws Exception { - - @Cleanup - MessageProducer producer = new MessageProducer(url1, dest); - log.info("--- Starting producer --- " + url1); - - @Cleanup - MessageConsumer consumer = new MessageConsumer(url1, dest); - log.info("--- Starting Consumer --- " + url1); - - producer.produce(2); - consumer.receive(2); - return null; - } - })); - } - - for (Future result : results) { - try { - result.get(); - } catch (Exception e) { - log.error("exception in getting future result ", e); - fail(String.format("replication test failed with %s exception", e.getMessage())); - } - } - - Thread.sleep(1000L); - // Make sure that the internal replicators map contains remote cluster info - ConcurrentOpenHashMap replicationClients1 = ns1.getReplicationClients(); - ConcurrentOpenHashMap replicationClients2 = ns2.getReplicationClients(); - ConcurrentOpenHashMap replicationClients3 = ns3.getReplicationClients(); - - Assert.assertNotNull(replicationClients1.get("r2")); - Assert.assertNotNull(replicationClients1.get("r3")); - Assert.assertNotNull(replicationClients2.get("r1")); - Assert.assertNotNull(replicationClients2.get("r3")); - Assert.assertNotNull(replicationClients3.get("r1")); - Assert.assertNotNull(replicationClients3.get("r2")); - - // Case 1: Update the global namespace replication configuration to only contains the local cluster itself - admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1")); - - // Wait for config changes to be updated. - Thread.sleep(1000L); - - // Make sure that the internal replicators map still contains remote cluster info - Assert.assertNotNull(replicationClients1.get("r2")); - Assert.assertNotNull(replicationClients1.get("r3")); - Assert.assertNotNull(replicationClients2.get("r1")); - Assert.assertNotNull(replicationClients2.get("r3")); - Assert.assertNotNull(replicationClients3.get("r1")); - Assert.assertNotNull(replicationClients3.get("r2")); - - // Case 2: Update the configuration back - admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3")); - - // Wait for config changes to be updated. - Thread.sleep(1000L); - - // Make sure that the internal replicators map still contains remote cluster info - Assert.assertNotNull(replicationClients1.get("r2")); - Assert.assertNotNull(replicationClients1.get("r3")); - Assert.assertNotNull(replicationClients2.get("r1")); - Assert.assertNotNull(replicationClients2.get("r3")); - Assert.assertNotNull(replicationClients3.get("r1")); - Assert.assertNotNull(replicationClients3.get("r2")); - - // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters - } - @Test(timeOut = 10000) public void activeBrokerParse() throws Exception { pulsar1.getConfiguration().setAuthorizationEnabled(true); @@ -253,6 +170,32 @@ public void activeBrokerParse() throws Exception { pulsar1.getConfiguration().setAuthorizationEnabled(false); } + @Test + public void testForcefullyTopicDeletion() throws Exception { + log.info("--- Starting ReplicatorTest::testForcefullyTopicDeletion ---"); + + final String namespace = BrokerTestUtil.newUniqueName("pulsar/removeClusterTest"); + admin1.namespaces().createNamespace(namespace); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + + final String topicName = "persistent://" + namespace + "/topic"; + + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + ProducerImpl producer1 = (ProducerImpl) client1.newProducer().topic(topicName) + .enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + producer1.close(); + + admin1.topics().delete(topicName, true); + + MockedPulsarServiceBaseTest + .retryStrategically((test) -> !pulsar1.getBrokerService().getTopics().containsKey(topicName), 50, 150); + + Assert.assertFalse(pulsar1.getBrokerService().getTopics().containsKey(topicName)); + } + @SuppressWarnings("unchecked") @Test(timeOut = 30000) public void testConcurrentReplicator() throws Exception { @@ -1270,7 +1213,7 @@ public void testReplicatedCluster() throws Exception { log.info("--- Starting ReplicatorTest::testReplicatedCluster ---"); - final String namespace = "pulsar/global/repl"; + final String namespace = BrokerTestUtil.newUniqueName("pulsar/global/repl"); final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1"); admin1.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); @@ -1677,7 +1620,7 @@ public void testReplicatorWithFailedAck() throws Exception { log.info("--- Starting ReplicatorTest::testReplication ---"); - String namespace = "pulsar/global/ns2"; + String namespace = BrokerTestUtil.newUniqueName("pulsar/global/ns"); admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1")); final TopicName dest = TopicName .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/ackFailedTopic")); @@ -1749,7 +1692,7 @@ public void testReplicatorWithFailedAck() throws Exception { @Test public void testWhenUpdateReplicationCluster() throws Exception { log.info("--- testWhenUpdateReplicationCluster ---"); - String namespace = "pulsar/ns2"; + String namespace = BrokerTestUtil.newUniqueName("pulsar/ns");; admin1.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); final TopicName dest = TopicName.get( @@ -1778,12 +1721,12 @@ public void testWhenUpdateReplicationCluster() throws Exception { @Test public void testReplicatorProducerNotExceed() throws Exception { log.info("--- testReplicatorProducerNotExceed ---"); - String namespace1 = "pulsar/ns11"; + String namespace1 = BrokerTestUtil.newUniqueName("pulsar/ns1"); admin1.namespaces().createNamespace(namespace1); admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1", "r2")); final TopicName dest1 = TopicName.get( BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/testReplicatorProducerNotExceed1")); - String namespace2 = "pulsar/ns22"; + String namespace2 = BrokerTestUtil.newUniqueName("pulsar/ns2"); admin2.namespaces().createNamespace(namespace2); admin2.namespaces().setNamespaceReplicationClusters(namespace2, Sets.newHashSet("r1", "r2")); final TopicName dest2 = TopicName.get(