Skip to content

Commit

Permalink
KeyShared stickyHashRange subscription: prevent stuck subscription in…
Browse files Browse the repository at this point in the history
… case of consumer restart (#14014)

### Motivation
When using a KeyShared subscription with `stickyHashRange`, it is possible to end up with a stuck subscription while restarting the consumers.

This bug is not a regression in 2.10; the problem is also present in Pulsar 2.8 (and probably in older versions).

### Modifications

Add the entry to the list of messages to be redelivered.

(cherry picked from commit da9e806)
  • Loading branch information
eolivelli authored and codelipenghui committed Jan 30, 2022
1 parent 16cf802 commit 9c24f79
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
Expand Up @@ -213,6 +213,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
} else {
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
}
}
Expand Down
Expand Up @@ -40,6 +40,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -1372,4 +1373,131 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMe
return null;
}
}

/**
 * Regression test for a Key_Shared subscription with sticky hash ranges that could get
 * stuck when its consumers were restarted while messages were still being produced.
 *
 * <p>Flow: two consumers split the hash space in two halves, a background task publishes
 * {@code numMessages} keyed messages (one every 100 ms), both consumers are closed once each
 * of them acknowledged a few messages, and after a pause two new consumers with the same
 * ranges are attached. Every published key must eventually be acknowledged.
 *
 * <p>All waits are bounded so that a stuck subscription makes the test fail instead of
 * hanging, and every consumer created here is closed before the method returns.
 *
 * @throws PulsarClientException if a consumer cannot be created or closed
 * @throws InterruptedException if the test thread is interrupted while waiting
 */
@Test
public void testStickyKeyRangesRestartConsumers() throws PulsarClientException, InterruptedException {
    final String topic = TopicName.get("persistent", "public", "default",
            "testStickyKeyRangesRestartConsumers" + UUID.randomUUID()).toString();
    final String subscriptionName = "my-sub";
    final int numMessages = 100;
    // Upper bound for each wait; publishing alone takes roughly numMessages * 100 ms.
    final long awaitTimeoutSeconds = 60;

    // Keys published but not yet acknowledged; must be empty at the end of the test.
    Set<String> sentMessages = new ConcurrentSkipListSet<>();

    CountDownLatch count1 = new CountDownLatch(2);
    // NOTE(review): the original comment read "consumer 2 usually receive the fix messages";
    // presumably "first messages" was meant - the threshold looks empirical.
    CountDownLatch count2 = new CountDownLatch(13);
    CountDownLatch count3 = new CountDownLatch(numMessages);

    Consumer<String> consumer1 = null;
    Consumer<String> consumer2 = null;
    try {
        // start 2 consumers, each owning one half of the hash range
        consumer1 = subscribeStickyRangeConsumer(topic, subscriptionName,
                Range.of(0, 65536 / 2), sentMessages, count1, count3);
        consumer2 = subscribeStickyRangeConsumer(topic, subscriptionName,
                Range.of(65536 / 2 + 1, 65535), sentMessages, count2, count3);

        // Publish in the background so that messages keep arriving while the consumers restart.
        pulsar.getExecutor().submit(() -> {
            try {
                try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                        .topic(topic)
                        .enableBatching(false)
                        .create()) {
                    for (int i = 0; i < numMessages; i++) {
                        String key = "test" + i;
                        sentMessages.add(key);
                        producer.newMessage()
                                .key(key)
                                .value("test" + i)
                                .send();
                        Thread.sleep(100);
                    }
                }
            } catch (Throwable t) {
                log.error("error", t);
            }
        });

        // wait for some messages to be received by both of the consumers
        assertTrue(count1.await(awaitTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS),
                "consumer1 did not receive the expected messages in time");
        assertTrue(count2.await(awaitTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS),
                "consumer2 did not receive the expected messages in time");
    } finally {
        // Same close order as before (consumer1 first), but now also on failure paths.
        if (consumer1 != null) {
            consumer1.close();
        }
        if (consumer2 != null) {
            consumer2.close();
        }
    }

    // this sleep is to trigger a race condition that happens
    // when there are some messages that cannot be dispatched while consuming
    Thread.sleep(3000);

    // start consuming again...
    Consumer<String> restartedConsumer1 = null;
    Consumer<String> restartedConsumer2 = null;
    try {
        restartedConsumer1 = subscribeStickyRangeConsumer(topic, subscriptionName,
                Range.of(0, 65536 / 2), sentMessages, count3);
        restartedConsumer2 = subscribeStickyRangeConsumer(topic, subscriptionName,
                Range.of(65536 / 2 + 1, 65535), sentMessages, count3);

        // wait for all the messages to be delivered; fail (do not hang) if the subscription is stuck
        assertTrue(count3.await(awaitTimeoutSeconds, java.util.concurrent.TimeUnit.SECONDS),
                "didn't receive " + sentMessages);
        assertTrue(sentMessages.isEmpty(), "didn't receive " + sentMessages);
    } finally {
        if (restartedConsumer1 != null) {
            restartedConsumer1.close();
        }
        if (restartedConsumer2 != null) {
            restartedConsumer2.close();
        }
    }
}

/**
 * Subscribes a Key_Shared consumer bound to a single sticky hash range, starting from the
 * earliest position. Each message is acknowledged asynchronously; on a successful ack its key
 * is removed from {@code pendingKeys} and every latch in {@code latches} is counted down once.
 * Failed acks are only logged.
 */
private Consumer<String> subscribeStickyRangeConsumer(String topic, String subscriptionName, Range range,
        Set<String> pendingKeys, CountDownLatch... latches) throws PulsarClientException {
    return pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName(subscriptionName)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(range))
            .messageListener((consumer, msg) -> {
                consumer.acknowledgeAsync(msg).whenComplete((m, e) -> {
                    if (e != null) {
                        log.error("error", e);
                    } else {
                        pendingKeys.remove(msg.getKey());
                        for (CountDownLatch latch : latches) {
                            latch.countDown();
                        }
                    }
                });
            })
            .subscribe();
}
}

0 comments on commit 9c24f79

Please sign in to comment.