From b5ee9dba9a22a84756d401d52d723b18fcf42cd4 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Sat, 5 Jun 2021 00:22:46 +0900 Subject: [PATCH] Test that replicated subscription is actually working --- .../service/ReplicatorSubscriptionTest.java | 151 +++++++++++++++++- 1 file changed, 149 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 7ea251a71096d..68428991b495b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -238,6 +238,9 @@ public void testReplicatedSubscriptionRestApi1() throws Exception { final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); final String topicName = "persistent://" + namespace + "/topic-rest-api1"; final String subName = "sub"; + // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054 + // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue + final boolean allowDuplicates = true; admin1.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); @@ -273,15 +276,71 @@ public void testReplicatedSubscriptionRestApi1() throws Exception { // Unload topic in r1 admin1.topics().unload(topicName); + Thread.sleep(1000); stats = admin1.topics().getStats(topicName); assertFalse(stats.subscriptions.get(subName).isReplicated); + // Make sure the replicated subscription is actually disabled + final int numMessages = 20; + final Set sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = new LinkedHashSet<>(); + + Producer producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + sentMessages.clear(); + publishMessages(producer, 0, numMessages, sentMessages); + producer.close(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer1, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer1.close(); + + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer2, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer2.close(); + // Enable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); stats = admin1.topics().getStats(topicName); assertTrue(stats.subscriptions.get(subName).isReplicated); stats = admin2.topics().getStats(topicName); assertFalse(stats.subscriptions.get(subName).isReplicated); + + // Enable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + stats = admin2.topics().getStats(topicName); + assertTrue(stats.subscriptions.get(subName).isReplicated); + + // Make sure the replicated subscription is actually enabled + sentMessages.clear(); + receivedMessages.clear(); + + producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + publishMessages(producer, 0, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates); + consumer1.close(); + + producer = client1.newProducer().topic(topicName).enableBatching(false).create(); + publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates); + consumer2.close(); + + assertEquals(receivedMessages, sentMessages); + assertTrue(numReceivedMessages1 < numMessages, + String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages)); + assertTrue(numReceivedMessages2 < numMessages, + String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); } @Test(timeOut = 30000) @@ -289,9 +348,12 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription"); final String topicName = "persistent://" + namespace + "/topic-rest-api2"; final String subName = "sub"; + // Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054 + // TODO: duplications shouldn't be allowed, change to "false" when fixing the issue + final boolean allowDuplicates = true; admin1.namespaces().createNamespace(namespace); - admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); admin1.topics().createPartitionedTopic(topicName, 2); @Cleanup @@ -301,6 +363,13 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { // Create subscription in r1 createReplicatedSubscription(client1, topicName, subName, true); + @Cleanup + final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString()) + .statsInterval(0, TimeUnit.SECONDS).build(); + + // Create subscription in r2 + createReplicatedSubscription(client2, topicName, subName, true); + PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true); for (TopicStats stats : partitionedStats.partitions.values()) { assertTrue(stats.subscriptions.get(subName).isReplicated); @@ -313,15 +382,91 @@ public void testReplicatedSubscriptionRestApi2() throws Exception { assertFalse(stats.subscriptions.get(subName).isReplicated); } + // Disable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false); + partitionedStats = admin2.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertFalse(stats.subscriptions.get(subName).isReplicated); + } + + // Make sure the replicated subscription is actually disabled + final int numMessages = 20; + final Set sentMessages = new LinkedHashSet<>(); + final Set receivedMessages = new LinkedHashSet<>(); + + Producer producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + sentMessages.clear(); + publishMessages(producer, 0, numMessages, sentMessages); + producer.close(); + + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer1, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer1.close(); + + Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + receivedMessages.clear(); + readMessages(consumer2, receivedMessages, numMessages, false); + assertEquals(receivedMessages, sentMessages); + consumer2.close(); + // Enable replicated subscription in r1 admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true); partitionedStats = admin1.topics().getPartitionedStats(topicName, true); for (TopicStats stats : partitionedStats.partitions.values()) { assertTrue(stats.subscriptions.get(subName).isReplicated); } + + // Enable replicated subscription in r2 + admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true); + partitionedStats = admin2.topics().getPartitionedStats(topicName, true); + for (TopicStats stats : partitionedStats.partitions.values()) { + assertTrue(stats.subscriptions.get(subName).isReplicated); + } + + // Make sure the replicated subscription is actually enabled + sentMessages.clear(); + receivedMessages.clear(); + + producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + publishMessages(producer, 0, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates); + consumer1.close(); + + producer = client1.newProducer().topic(topicName).enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition).create(); + publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages); + producer.close(); + Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis()); + + consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); + final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates); + consumer2.close(); + + assertEquals(receivedMessages, sentMessages); + assertTrue(numReceivedMessages1 < numMessages, + String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages)); + assertTrue(numReceivedMessages2 < numMessages, + String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages)); + } + + void publishMessages(Producer producer, int startIndex, int numMessages, Set sentMessages) + throws PulsarClientException { + for (int i = startIndex; i < startIndex + numMessages; i++) { + final String msg = "msg" + i; + producer.send(msg.getBytes(StandardCharsets.UTF_8)); + sentMessages.add(msg); + } } - void readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) + int readMessages(Consumer consumer, Set messages, int maxMessages, boolean allowDuplicates) throws PulsarClientException { int count = 0; while (count < maxMessages || maxMessages == -1) { @@ -333,10 +478,12 @@ void readMessages(Consumer consumer, Set messages, int maxMessag assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected."); } messages.add(body); + consumer.acknowledge(message); } else { break; } } + return count; } void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName,