diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index ca078ac652235..27bcf9722c3dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -123,8 +123,14 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce @Override public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { - super.removeConsumer(consumer); + // The consumer must be removed from the selector before calling the superclass removeConsumer method. + // In the superclass removeConsumer method, the pending acks that the consumer has are added to + // messagesToRedeliver. If the consumer has not been removed from the selector at this point, + // the broker will try to redeliver the messages to the consumer that has already been closed. + // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move, + // eventually causing all consumers to get stuck. selector.removeConsumer(consumer); + super.removeConsumer(consumer); if (recentlyJoinedConsumers != null) { recentlyJoinedConsumers.remove(consumer); if (consumerList.size() == 1) {