Skip to content

Commit

Permalink
[Broker] Handle NPE when full key range isn't covered with active con…
Browse files Browse the repository at this point in the history
…sumers (apache#11749)
  • Loading branch information
lhotari authored and ciaocloud committed Oct 16, 2021
1 parent 1749886 commit bc13fdd
Showing 1 changed file with 6 additions and 2 deletions.
Expand Up @@ -175,8 +175,12 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c = selector.select(stickyKeyHash);
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
if (c != null) {
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
} else {
entry.release();
}
}

AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
Expand Down

0 comments on commit bc13fdd

Please sign in to comment.