diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 299f54578e6e5..596c9c7092ed3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -176,8 +176,12 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { for (Entry entry : entries) { int stickyKeyHash = getStickyKeyHash(entry); Consumer c = selector.select(stickyKeyHash); - groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); - consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); + if (c != null) { + groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); + consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); + } else { + entry.release(); + } } AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());