Skip to content

Commit

Permalink
Use forEach() instead of keys()
Browse files Browse the repository at this point in the history
  • Loading branch information
Masahiro Sakamoto committed Jul 8, 2021
1 parent 01d12a5 commit 5decc85
Showing 1 changed file with 9 additions and 6 deletions.
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.ComparisonChain;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
Expand Down Expand Up @@ -88,25 +89,27 @@ public String toString() {
}

public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
final AtomicBoolean isContained = new AtomicBoolean(false);
if (hashesToBeBlocked != null) {
for (LongPair longPair : hashesToBeBlocked.values()) {
int stickyKeyHash = (int) longPair.first;
if (stickyKeyHashes.contains(stickyKeyHash)) {
return true;
hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> {
if (!isContained.get() && stickyKeyHashes.contains((int) stickyKeyHash)) {
isContained.set(true);
}
}
});
}
return false;
return isContained.get();
}

public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (hashesToBeBlocked != null) {
// allowOutOfOrderDelivery is false
return messagesToRedeliver.items().stream()
.sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first)
.compare(l1.second, l2.second).result())
.limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second))
.collect(Collectors.toCollection(TreeSet::new));
} else {
// allowOutOfOrderDelivery is true
return messagesToRedeliver.items(maxMessagesToRead,
(ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
}
Expand Down

0 comments on commit 5decc85

Please sign in to comment.