diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index e33108203cc814..b02b8727ac860f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -3286,14 +3286,21 @@ public void testGetTtlDurationDefaultInSeconds() throws Exception { } @Test - public void testGetReadPositionWhenJoining() throws Exception { - final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString(); + public void testGetLastSentPositionsWhenJoining() throws Exception { + final String topic = "persistent://prop-xyz/ns1/testGetLastSentPositionsWhenJoining-" + UUID.randomUUID().toString(); final String subName = "my-sub"; @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false) .create(); + @Cleanup + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .consumerName("c1") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); final int messages = 10; MessageIdImpl messageId = null; @@ -3301,28 +3308,26 @@ public void testGetReadPositionWhenJoining() throws Exception { messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes()); } - List> consumers = new ArrayList<>(); - for (int i = 0; i < 2; i++) { - Consumer consumer = pulsarClient.newConsumer() - .topic(topic) - .subscriptionType(SubscriptionType.Key_Shared) - .subscriptionName(subName) - .subscribe(); - consumers.add(consumer); + for (int i = 0; i < messages; i++) { + assertNotNull(consumer1.receive(100, TimeUnit.MILLISECONDS)); } + @Cleanup + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .consumerName("c2") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionName(subName) + .subscribe(); + TopicStats stats = admin.topics().getStats(topic); Assert.assertEquals(stats.getSubscriptions().size(), 1); SubscriptionStats subStats = stats.getSubscriptions().get(subName); Assert.assertNotNull(subStats); Assert.assertEquals(subStats.getConsumers().size(), 2); - ConsumerStats consumerStats = subStats.getConsumers().get(0); - Assert.assertEquals(consumerStats.getReadPositionWhenJoining(), - PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString()); - - for (Consumer consumer : consumers) { - consumer.close(); - } + ConsumerStats consumer2Stats = subStats.getConsumers().stream().filter(s -> s.getConsumerName().equals(consumer2.getConsumerName())).findFirst().get(); + Assert.assertEquals(consumer2Stats.getLastSentPositionsWhenJoining(), + Set.of(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).toString()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 68902c73e57174..17ec58eb21e88d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -154,7 +154,9 @@ void shutdown() throws Exception { executor.shutdownNow(); for (int i = 0; i < BROKER_COUNT; i++) { - pulsarAdmins[i].close(); + if (pulsarAdmins[i] != null) { + pulsarAdmins[i].close(); + } if (pulsarServices[i] != null) { pulsarServices[i].close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java index bf9c1d540bf875..6571bc170d53bf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/SubscriptionStatsTest.java @@ -116,7 +116,7 @@ public void testConsumersAfterMarkDelete() throws PulsarClientException, PulsarA TopicStats stats = admin.topics().getStats(topicName); Assert.assertEquals(stats.getSubscriptions().size(), 1); Assert.assertEquals(stats.getSubscriptions().entrySet().iterator().next().getValue() - .getConsumersAfterMarkDeletePosition().size(), 1); + .getRecentlyJoinedConsumers().size(), 1); consumer1.close(); consumer2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 7b617a6a192d82..dc16356d023ff0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -20,11 +20,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -455,7 +457,7 @@ public void testDisableKeySharedSubscription() throws PulsarClientException { String topic = "persistent://public/default/key_shared_disabled"; try { @Cleanup - Consumer c = pulsarClient.newConsumer() + Consumer c = pulsarClient.newConsumer() .topic(topic) .subscriptionName("key_shared") .subscriptionType(SubscriptionType.Key_Shared) @@ -499,7 +501,7 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc String slowKey = "slowKey"; List clients = new ArrayList<>(); - List consumers = new ArrayList<>(); + List> consumers = new ArrayList<>(); try { AtomicInteger receivedMessages = new AtomicInteger(); @@ -509,7 +511,7 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc .build(); clients.add(client); - Consumer c = client.newConsumer(Schema.INT32) + Consumer c = client.newConsumer(Schema.INT32) .topic(topic) .subscriptionName("key_shared") .subscriptionType(SubscriptionType.Key_Shared) @@ -556,7 +558,7 @@ public void testMakingProgressWithSlowerConsumer(boolean enableBatch) throws Exc assertEquals((double) receivedMessages.get(), N * 0.9, N * 0.3); }); - for (Consumer c : consumers) { + for (Consumer c : consumers) { c.close(); } } finally { @@ -576,14 +578,19 @@ public void testOrderingWhenAddingConsumers() throws Exception { @Cleanup Consumer c1 = createConsumer(topic); + final List keys = new ArrayList<>(); for (int i = 0; i < 10; i++) { + final String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS)); + keys.add(key); producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(key) .value(i) .send(); } // All the already published messages will be pre-fetched by C1. + Awaitility.await().untilAsserted(() -> + assertEquals(((ConsumerImpl) c1).getTotalIncomingMessages(), 10)); // Adding a new consumer. @Cleanup @@ -591,7 +598,7 @@ public void testOrderingWhenAddingConsumers() throws Exception { for (int i = 10; i < 20; i++) { producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(keys.get(i % 10)) .value(i) .send(); } @@ -630,6 +637,8 @@ public void testReadAheadWhenAddingConsumers() throws Exception { } // All the already published messages will be pre-fetched by C1. + Awaitility.await().untilAsserted(() -> + assertEquals(((ConsumerImpl) c1).getTotalIncomingMessages(), 10)); // Adding a new consumer. @Cleanup @@ -676,9 +685,12 @@ public void testRemoveFirstConsumer() throws Exception { .consumerName("c1") .subscribe(); + final List keys = new ArrayList<>(); for (int i = 0; i < 10; i++) { + final String key = String.valueOf(random.nextInt(NUMBER_OF_KEYS)); + keys.add(key); producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(key) .value(i) .send(); } @@ -699,7 +711,7 @@ public void testRemoveFirstConsumer() throws Exception { for (int i = 10; i < 20; i++) { producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(keys.get(i % 10)) .value(i) .send(); } @@ -838,6 +850,7 @@ public void testAttachKeyToMessageMetadata() throws PulsarClientException { public void testContinueDispatchMessagesWhenMessageTTL() throws Exception { int defaultTTLSec = 3; int totalMessages = 1000; + int numOfKeys = totalMessages / 10; this.conf.setTtlDurationDefaultInSeconds(defaultTTLSec); final String topic = "persistent://public/default/key_shared-" + UUID.randomUUID(); final String subName = "my-sub"; @@ -846,7 +859,7 @@ public void testContinueDispatchMessagesWhenMessageTTL() throws Exception { Consumer consumer1 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) .subscriptionName(subName) - .receiverQueueSize(10) + .receiverQueueSize(numOfKeys) .subscriptionType(SubscriptionType.Key_Shared) .subscribe(); @@ -855,19 +868,24 @@ public void testContinueDispatchMessagesWhenMessageTTL() throws Exception { .topic(topic) .create(); + final Map keys = new HashMap<>(); for (int i = 0; i < totalMessages; i++) { + // all type of keys are sent to consumer1 + final int keyIndex = i % numOfKeys; + keys.computeIfAbsent(keyIndex, k -> String.valueOf(random.nextInt(NUMBER_OF_KEYS))); producer.newMessage() - .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS))) + .key(keys.get(keyIndex)) .value(i) .send(); } // don't ack the first message + Awaitility.await().untilAsserted(() -> + assertEquals(((ConsumerImpl) consumer1).getTotalIncomingMessages(), numOfKeys)); consumer1.receive(); consumer1.acknowledge(consumer1.receive()); - // The consumer1 and consumer2 should be stuck because of the mark delete position did not move forward. - + // The consumer2 and consumer3 should be stuck because of the mark delete position did not move forward. @Cleanup Consumer consumer2 = pulsarClient.newConsumer(Schema.INT32) .topic(topic) @@ -1190,6 +1208,167 @@ public void testCheckConsumersWithSameName() throws Exception { l.await(); } + @Test(timeOut = 30_000) + public void testCheckRecentlyJoinedConsumers() throws Exception { + conf.setSubscriptionKeySharedUseConsistentHashing(true); + conf.setSubscriptionKeySharedConsistentHashingReplicaPoints(100); + + final String topicName = "persistent://public/default/recently-joined-consumers-" + UUID.randomUUID(); + final String subName = "my-sub"; + final String consumerName = "name"; + + final ConsumerBuilder cb = pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Key_Shared); + + // Create 2 consumers + @Cleanup + final Consumer c1 = cb.consumerName("c1").subscribe(); + final Map>> c1Msgs = new HashMap<>(); + @Cleanup + final Consumer c2 = cb.consumerName("c2").subscribe(); + final Map>> c2Msgs = new HashMap<>(); + + @Cleanup + final Producer p = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .create(); + for (int i = 0; i < 100; i++) { + p.newMessage() + .key(Integer.toString(i % 10)) + .value("msg-" + i) + .send(); + } + + final Set c1Keys1 = Set.of("1", "3", "4", "5", "9"); + final Set c2Keys1 = Set.of("0", "2", "6", "7", "8"); + for (int i = 0; i < 100; i++) { + final Message msg1 = c1.receive(10, TimeUnit.MILLISECONDS); + if (msg1 != null) { + c1Msgs.computeIfAbsent(msg1.getKey(), k -> new ArrayList<>()); + c1Msgs.get(msg1.getKey()).add(msg1); + c1.acknowledge(msg1); + } + } + assertEquals(c1Msgs.values().stream().mapToInt(List::size).sum(), c1Keys1.size() * 10); + assertEquals(c1Msgs.keySet(), c1Keys1); + assertNotEquals(c1Msgs.keySet(), c2Keys1); + c1Msgs.clear(); + + @Cleanup + final Consumer c3 = cb.consumerName("c3").subscribe(); + final Map>> c3Msgs = new HashMap<>(); + + for (int i = 100; i < 200; i++) { + p.newMessage() + .key(Integer.toString(i % 10)) + .value("msg-" + i) + .send(); + } + + final Set c1Keys2 = Set.of("3", "4", "5", "9"); + final Set c2Keys2 = Set.of("0", "8"); + final Set c2RemovedKeys = Sets.difference(c2Keys1, c2Keys2); + final Set c3Keys2 = Set.of("1", "2", "6", "7"); + for (int i = 0; i < 100; i++) { + final Message msg1 = c1.receive(10, TimeUnit.MILLISECONDS); + if (msg1 != null) { + c1Msgs.computeIfAbsent(msg1.getKey(), k -> new ArrayList<>()); + c1Msgs.get(msg1.getKey()).add(msg1); + c1.acknowledge(msg1); + } + + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + fail(); + } + } + assertEquals(c1Msgs.values().stream().mapToInt(List::size).sum(), c1Keys2.size() * 10); + assertEquals(c1Msgs.keySet(), c1Keys2); + assertNotEquals(c1Msgs.keySet(), c2Keys2); + assertNotEquals(c1Msgs.keySet(), c3Keys2); + c1Msgs.clear(); + + for (int i = 200; i < 300; i++) { + p.newMessage() + .key(Integer.toString((i % 10) + 10)) + .value("msg-" + i) + .send(); + } + + final Set c1Keys3 = Set.of("11", "16"); + final Set c2Keys3 = Set.of("13", "19"); + final Set c3Keys3 = Set.of("10", "12", "14", "15", "17", "18"); + for (int i = 0; i < 100; i++) { + final Message msg1 = c1.receive(10, TimeUnit.MILLISECONDS); + if (msg1 != null) { + c1Msgs.computeIfAbsent(msg1.getKey(), k -> new ArrayList<>()); + c1Msgs.get(msg1.getKey()).add(msg1); + c1.acknowledge(msg1); + } + + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + c3Msgs.computeIfAbsent(msg3.getKey(), k -> new ArrayList<>()); + c3Msgs.get(msg3.getKey()).add(msg3); + c3.acknowledge(msg3); + } + } + assertEquals(c1Msgs.values().stream().mapToInt(List::size).sum(), c1Keys3.size() * 10); + assertEquals(c1Msgs.keySet(), c1Keys3); + c1Msgs.clear(); + + assertEquals(c3Msgs.values().stream().mapToInt(List::size).sum(), c3Keys3.size() * 10); + assertEquals(c3Msgs.keySet(), c3Keys3); + c3Msgs.clear(); + + for (int i = 0; i < 300; i++) { + final Message msg2 = c2.receive(10, TimeUnit.MILLISECONDS); + if (msg2 != null) { + final String key2 = msg2.getKey(); + c2Msgs.computeIfAbsent(key2, k -> new ArrayList<>()); + c2Msgs.get(key2).add(msg2); + if (c2Keys2.contains(key2) || c2Keys3.contains(key2)) { + c2.acknowledge(msg2); + } + } + + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + fail(); + } + } + assertEquals(c2Msgs.values().stream().mapToInt(List::size).sum(), + (c2Keys1.size() + c2Keys2.size() + c2Keys3.size()) * 10); + assertEquals(c2Msgs.keySet(), Sets.union(c2Keys1, c2Keys3)); + + for (final String key : c2RemovedKeys) { + final List> msgs = c2Msgs.get(key); + final int numOfMsgs = msgs.size(); + for (int i = 0; i < numOfMsgs; i++) { + c2.acknowledge(msgs.get(i)); + if (i < numOfMsgs - 1) { + assertNull(c3.receive(10, TimeUnit.MILLISECONDS)); + } + } + } + c2Msgs.clear(); + + for (int i = 0; i < 300; i++) { + final Message msg3 = c3.receive(10, TimeUnit.MILLISECONDS); + if (msg3 != null) { + c3Msgs.computeIfAbsent(msg3.getKey(), k -> new ArrayList<>()); + c3Msgs.get(msg3.getKey()).add(msg3); + c3.acknowledge(msg3); + } + } + assertEquals(c3Msgs.values().stream().mapToInt(List::size).sum(), c3Keys2.size() * 10); + assertEquals(c3Msgs.keySet(), c3Keys2); + c3Msgs.clear(); + + assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgBacklog(), 0); + } private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) { if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) { @@ -1323,7 +1502,7 @@ private void receiveAndCheckDistribution(List> consumers, int expect } private void receiveAndCheck(List, Integer>> checkList) throws PulsarClientException { - Map> consumerKeys = new HashMap<>(); + Map, Set> consumerKeys = new HashMap<>(); for (KeyValue, Integer> check : checkList) { if (check.getValue() % 2 != 0) { throw new IllegalArgumentException(); @@ -1371,7 +1550,7 @@ private void receiveAndCheck(List, Integer>> checkLis } lastMessageForKey.put(key, message); } - Message noMessages = null; + Message noMessages = null; try { noMessages = check.getKey().receive(100, TimeUnit.MILLISECONDS); } catch (PulsarClientException ignore) { @@ -1479,7 +1658,7 @@ public void testStickyKeyRangesRestartConsumers() throws Exception { }) .subscribe(); - Future producerFuture = pulsar.getExecutor().submit(() -> { + Future producerFuture = pulsar.getExecutor().submit(() -> { try { try (Producer producer = pulsarClient.newProducer(Schema.STRING) @@ -1513,7 +1692,7 @@ public void testStickyKeyRangesRestartConsumers() throws Exception { // start consuming again... @Cleanup - Consumer consumer3 = pulsarClient.newConsumer(Schema.STRING) + Consumer consumer3 = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) @@ -1531,7 +1710,7 @@ public void testStickyKeyRangesRestartConsumers() throws Exception { }) .subscribe(); @Cleanup - Consumer consumer4 = pulsarClient.newConsumer(Schema.STRING) + Consumer consumer4 = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName(subscriptionName) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)