From 5decc85a2091c587361ef265815d36d669447868 Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Thu, 8 Jul 2021 12:07:27 +0900 Subject: [PATCH] Use forEach() instead of keys() --- .../persistent/MessageRedeliveryController.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index 8f4751c48cc9f9..a6cd362372dc24 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -21,6 +21,7 @@ import com.google.common.collect.ComparisonChain; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; @@ -88,25 +89,27 @@ public String toString() { } public boolean containsStickyKeyHashes(Set stickyKeyHashes) { + final AtomicBoolean isContained = new AtomicBoolean(false); if (hashesToBeBlocked != null) { - for (LongPair longPair : hashesToBeBlocked.values()) { - int stickyKeyHash = (int) longPair.first; - if (stickyKeyHashes.contains(stickyKeyHash)) { - return true; + hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> { + if (!isContained.get() && stickyKeyHashes.contains((int) stickyKeyHash)) { + isContained.set(true); } - } + }); } - return false; + return isContained.get(); } public Set getMessagesToReplayNow(int maxMessagesToRead) { if (hashesToBeBlocked != null) { + // allowOutOfOrderDelivery is false return messagesToRedeliver.items().stream() .sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first) .compare(l1.second, l2.second).result()) .limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second)) .collect(Collectors.toCollection(TreeSet::new)); } else { + // allowOutOfOrderDelivery is true return messagesToRedeliver.items(maxMessagesToRead, (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); }