From 1a2aad69574e83e082069b8abae8499a3f3ce471 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Mon, 14 Jun 2021 19:24:40 +0900 Subject: [PATCH] Fix issue where Key_Shared consumers could get stuck --- .../PersistentStickyKeyDispatcherMultipleConsumers.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 5145c3b51b7ab..d9a56a095e347 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -121,8 +121,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { - super.removeConsumer(consumer); + // The consumer must be removed from the selector before calling the superclass removeConsumer method. + // In the superclass removeConsumer method, the pending acks that the consumer has are added to + // messagesToRedeliver. If the consumer has not been removed from the selector at this point, + // the broker will try to redeliver the messages to the consumer that has already been closed. + // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move, + // eventually causing all consumers to get stuck. selector.removeConsumer(consumer); + super.removeConsumer(consumer); if (recentlyJoinedConsumers != null) { recentlyJoinedConsumers.remove(consumer); if (consumerList.size() == 1) {