Skip to content

Commit

Permalink
Test that replicated subscription is actually working
Browse files Browse the repository at this point in the history
  • Loading branch information
Masahiro Sakamoto committed Jun 9, 2021
1 parent b878caf commit b5ee9db
Showing 1 changed file with 149 additions and 2 deletions.
Expand Up @@ -238,6 +238,9 @@ public void testReplicatedSubscriptionRestApi1() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/topic-rest-api1";
final String subName = "sub";
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
final boolean allowDuplicates = true;

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
Expand Down Expand Up @@ -273,25 +276,84 @@ public void testReplicatedSubscriptionRestApi1() throws Exception {

// Unload topic in r1
admin1.topics().unload(topicName);
Thread.sleep(1000);
stats = admin1.topics().getStats(topicName);
assertFalse(stats.subscriptions.get(subName).isReplicated);

// Make sure the replicated subscription is actually disabled
final int numMessages = 20;
final Set<String> sentMessages = new LinkedHashSet<>();
final Set<String> receivedMessages = new LinkedHashSet<>();

Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false).create();
sentMessages.clear();
publishMessages(producer, 0, numMessages, sentMessages);
producer.close();

Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer1, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer1.close();

Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer2, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer2.close();

// Enable replicated subscription in r1
admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
stats = admin1.topics().getStats(topicName);
assertTrue(stats.subscriptions.get(subName).isReplicated);
stats = admin2.topics().getStats(topicName);
assertFalse(stats.subscriptions.get(subName).isReplicated);

// Enable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
stats = admin2.topics().getStats(topicName);
assertTrue(stats.subscriptions.get(subName).isReplicated);

// Make sure the replicated subscription is actually enabled
sentMessages.clear();
receivedMessages.clear();

producer = client1.newProducer().topic(topicName).enableBatching(false).create();
publishMessages(producer, 0, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates);
consumer1.close();

producer = client1.newProducer().topic(topicName).enableBatching(false).create();
publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates);
consumer2.close();

assertEquals(receivedMessages, sentMessages);
assertTrue(numReceivedMessages1 < numMessages,
String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages));
assertTrue(numReceivedMessages2 < numMessages,
String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
}

@Test(timeOut = 30000)
public void testReplicatedSubscriptionRestApi2() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/topic-rest-api2";
final String subName = "sub";
// Subscription replication produces duplicates, https://github.com/apache/pulsar/issues/10054
// TODO: duplications shouldn't be allowed, change to "false" when fixing the issue
final boolean allowDuplicates = true;

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1"));
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.topics().createPartitionedTopic(topicName, 2);

@Cleanup
Expand All @@ -301,6 +363,13 @@ public void testReplicatedSubscriptionRestApi2() throws Exception {
// Create subscription in r1
createReplicatedSubscription(client1, topicName, subName, true);

@Cleanup
final PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
.statsInterval(0, TimeUnit.SECONDS).build();

// Create subscription in r2
createReplicatedSubscription(client2, topicName, subName, true);

PartitionedTopicStats partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.partitions.values()) {
assertTrue(stats.subscriptions.get(subName).isReplicated);
Expand All @@ -313,15 +382,91 @@ public void testReplicatedSubscriptionRestApi2() throws Exception {
assertFalse(stats.subscriptions.get(subName).isReplicated);
}

// Disable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, false);
partitionedStats = admin2.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.partitions.values()) {
assertFalse(stats.subscriptions.get(subName).isReplicated);
}

// Make sure the replicated subscription is actually disabled
final int numMessages = 20;
final Set<String> sentMessages = new LinkedHashSet<>();
final Set<String> receivedMessages = new LinkedHashSet<>();

Producer<byte[]> producer = client1.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
sentMessages.clear();
publishMessages(producer, 0, numMessages, sentMessages);
producer.close();

Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer1, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer1.close();

Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
receivedMessages.clear();
readMessages(consumer2, receivedMessages, numMessages, false);
assertEquals(receivedMessages, sentMessages);
consumer2.close();

// Enable replicated subscription in r1
admin1.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
partitionedStats = admin1.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.partitions.values()) {
assertTrue(stats.subscriptions.get(subName).isReplicated);
}

// Enable replicated subscription in r2
admin2.topics().setReplicatedSubscriptionStatus(topicName, subName, true);
partitionedStats = admin2.topics().getPartitionedStats(topicName, true);
for (TopicStats stats : partitionedStats.partitions.values()) {
assertTrue(stats.subscriptions.get(subName).isReplicated);
}

// Make sure the replicated subscription is actually enabled
sentMessages.clear();
receivedMessages.clear();

producer = client1.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
publishMessages(producer, 0, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

consumer1 = client1.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages1 = readMessages(consumer1, receivedMessages, numMessages / 2, allowDuplicates);
consumer1.close();

producer = client1.newProducer().topic(topicName).enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
publishMessages(producer, numMessages / 2, numMessages / 2, sentMessages);
producer.close();
Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
final int numReceivedMessages2 = readMessages(consumer2, receivedMessages, -1, allowDuplicates);
consumer2.close();

assertEquals(receivedMessages, sentMessages);
assertTrue(numReceivedMessages1 < numMessages,
String.format("numReceivedMessages1 (%d) should be less than %d", numReceivedMessages1, numMessages));
assertTrue(numReceivedMessages2 < numMessages,
String.format("numReceivedMessages2 (%d) should be less than %d", numReceivedMessages2, numMessages));
}

void publishMessages(Producer<byte[]> producer, int startIndex, int numMessages, Set<String> sentMessages)
throws PulsarClientException {
for (int i = startIndex; i < startIndex + numMessages; i++) {
final String msg = "msg" + i;
producer.send(msg.getBytes(StandardCharsets.UTF_8));
sentMessages.add(msg);
}
}

void readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessages, boolean allowDuplicates)
int readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessages, boolean allowDuplicates)
throws PulsarClientException {
int count = 0;
while (count < maxMessages || maxMessages == -1) {
Expand All @@ -333,10 +478,12 @@ void readMessages(Consumer<byte[]> consumer, Set<String> messages, int maxMessag
assertFalse(messages.contains(body), "Duplicate message '" + body + "' detected.");
}
messages.add(body);
consumer.acknowledge(message);
} else {
break;
}
}
return count;
}

void createReplicatedSubscription(PulsarClient pulsarClient, String topicName, String subscriptionName,
Expand Down

0 comments on commit b5ee9db

Please sign in to comment.