From 202ad3d7b1b00ee7032c209a96d22574c0fa1fb2 Mon Sep 17 00:00:00 2001
From: Enrico Olivelli
Date: Fri, 28 Jan 2022 15:43:56 +0100
Subject: [PATCH] KeyShared stickyHashRange subscription: prevent message loss in case of consumer restart

---
 ...tStickyKeyDispatcherMultipleConsumers.java |   1 +
 .../client/api/KeySharedSubscriptionTest.java | 108 ++++++++++++++++++
 2 files changed, 109 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 1a9ed787ef068..94850a6f173ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -214,6 +214,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
                 groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
                 consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
             } else {
+                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
                 entry.release();
             }
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 1b7e2bc4f2216..4b899f2ee0cf1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -1410,4 +1411,111 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
             return null;
         }
     }
+
+    @Test
+    public void testStickyKeyRangesRestartConsumers() throws PulsarClientException, InterruptedException {
+        final String topic = TopicName.get("persistent", "public", "default",
+                "testStickyKeyRangesRestartConsumers" + UUID.randomUUID()).toString();
+        final String subscriptionName = "my-sub";
+        final int numMessages = 100;
+        // the two consumers split the sticky-key hash ranges between them
+        final Range lowerRange = Range.of(0, 65536 / 2);
+        final Range upperRange = Range.of(65536 / 2 + 1, 65535);
+
+        // keys that were published but whose acknowledgement has not completed yet
+        Set<String> sentMessages = new ConcurrentSkipListSet<>();
+
+        CountDownLatch count1 = new CountDownLatch(2);
+        // consumer 2 usually receives the first messages - NOTE(review): original wording was unclear, confirm
+        CountDownLatch count2 = new CountDownLatch(13);
+        CountDownLatch count3 = new CountDownLatch(numMessages);
+
+        // start 2 consumers
+        Consumer<String> consumer1 =
+                subscribeToHashRange(topic, subscriptionName, lowerRange, sentMessages, count1, count3);
+        Consumer<String> consumer2 =
+                subscribeToHashRange(topic, subscriptionName, upperRange, sentMessages, count2, count3);
+
+        // publish in the background while the consumers are being restarted
+        pulsar.getExecutor().submit(() -> {
+            try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                    .topic(topic)
+                    .enableBatching(false)
+                    .create()) {
+                for (int i = 0; i < numMessages; i++) {
+                    String key = "test" + i;
+                    sentMessages.add(key);
+                    producer.newMessage()
+                            .key(key)
+                            .value("test" + i)
+                            .send();
+                    Thread.sleep(100);
+                }
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the executor thread
+                Thread.currentThread().interrupt();
+                log.error("error", e);
+            } catch (Throwable t) {
+                log.error("error", t);
+            }
+        });
+
+        // wait for some messages to be received by both of the consumers
+        awaitLatch(count1, "consumer1 to acknowledge its first messages");
+        awaitLatch(count2, "consumer2 to acknowledge its first messages");
+        consumer1.close();
+        consumer2.close();
+
+        // this sleep is to trigger a race condition that happens
+        // when there are some messages that cannot be dispatched while consuming
+        Thread.sleep(3000);
+
+        // start consuming again; try-with-resources closes the new consumers even if the test fails
+        try (Consumer<String> restarted1 =
+                     subscribeToHashRange(topic, subscriptionName, lowerRange, sentMessages, count3);
+             Consumer<String> restarted2 =
+                     subscribeToHashRange(topic, subscriptionName, upperRange, sentMessages, count3)) {
+            // wait for all the messages to be delivered
+            awaitLatch(count3, "all the messages to be acknowledged");
+            assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
+        }
+    }
+
+    /**
+     * Subscribes a Key_Shared consumer on the given sticky hash range that acknowledges every message it
+     * receives. When an acknowledgement completes without error, the message key is removed from
+     * {@code pendingKeys} and each of {@code latches} is counted down once, in the given order.
+     */
+    private Consumer<String> subscribeToHashRange(String topic, String subscriptionName, Range hashRange,
+                                                  Set<String> pendingKeys, CountDownLatch... latches)
+            throws PulsarClientException {
+        return pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(hashRange))
+                .messageListener((consumer, msg) -> {
+                    consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
+                        if (e != null) {
+                            log.error("error", e);
+                        } else {
+                            pendingKeys.remove(msg.getKey());
+                            for (CountDownLatch latch : latches) {
+                                latch.countDown();
+                            }
+                        }
+                    });
+                })
+                .subscribe();
+    }
+
+    /**
+     * Waits up to 60 seconds for {@code latch} to reach zero and fails the test otherwise, so that a
+     * message that is never delivered fails the test instead of blocking it forever.
+     */
+    private void awaitLatch(CountDownLatch latch, String description) throws InterruptedException {
+        assertTrue(latch.await(60, java.util.concurrent.TimeUnit.SECONDS),
+                "timed out waiting for " + description + ", remaining count: " + latch.getCount());
+    }
 }