Skip to content

Commit

Permalink
[Broker] Handle NPE when full key range isn't covered with active con…
Browse files Browse the repository at this point in the history
…sumers (apache#11749)

(cherry picked from commit 8027ab4)
  • Loading branch information
lhotari committed Aug 24, 2021
1 parent 79db72e commit 968432c
Showing 1 changed file with 6 additions and 2 deletions.
Expand Up @@ -175,8 +175,12 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c = selector.select(stickyKeyHash);
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
if (c != null) {
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
} else {
entry.release();
}
}

AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
Expand Down

0 comments on commit 968432c

Please sign in to comment.