Skip to content

Commit

Permalink
[Broker] Handle NPE when full key range isn't covered with active con…
Browse files Browse the repository at this point in the history
…sumers (#11749)

(cherry picked from commit 8027ab4)
  • Loading branch information
lhotari authored and codelipenghui committed Dec 11, 2021
1 parent b9d4383 commit 6502ca0
Showing 1 changed file with 6 additions and 2 deletions.
Expand Up @@ -176,8 +176,12 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c = selector.select(stickyKeyHash);
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
if (c != null) {
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
} else {
entry.release();
}
}

AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
Expand Down

0 comments on commit 6502ca0

Please sign in to comment.