diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 95f661abed84e..937662ec1a227 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -359,14 +358,15 @@ public void enableCnxAutoRead() { } protected boolean hasLocalProducers() { - AtomicBoolean foundLocal = new AtomicBoolean(false); - producers.values().forEach(producer -> { + if (producers.isEmpty()) { + return false; + } + for (Producer producer : producers.values()) { if (!producer.isRemote()) { - foundLocal.set(true); + return true; } - }); - - return foundLocal.get(); + } + return false; } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d7924f6595346..97782deb76c17 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -597,14 +597,15 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) } private boolean hasRemoteProducers() { - AtomicBoolean foundRemote = new AtomicBoolean(false); - producers.values().forEach(producer -> { + if (producers.isEmpty()) { + return false; + } + for (Producer producer : producers.values()) { if (producer.isRemote()) { - foundRemote.set(true); + return true; } - }); - - return foundRemote.get(); + } + return false; } public CompletableFuture startReplProducers() { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java index d3321f9ad35ac..95e2302dcb7b9 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java @@ -166,13 +166,15 @@ public String toString() { @Override public boolean isEmpty() { - AtomicBoolean isEmpty = new AtomicBoolean(true); - longPairSets.forEach((item1, longPairSet) -> { - if (isEmpty.get() && !longPairSet.isEmpty()) { - isEmpty.set(false); + if (longPairSets.isEmpty()) { + return true; + } + for (ConcurrentLongPairSet subSet : longPairSets.values()) { + if (!subSet.isEmpty()) { + return false; } - }); - return isEmpty.get(); + } + return true; } @Override diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java index 821bb8819554b..fcb9884a795ad 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java @@ -22,7 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; - +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -30,13 +30,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - import lombok.Cleanup; import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair; import org.testng.annotations.Test; -import com.google.common.collect.Lists; - public class ConcurrentSortedLongPairSetTest { @Test @@ -241,4 +238,11 @@ public void testToString() { assertEquals(set.toString(), toString); } + @Test + public void testIsEmpty() { + LongPairSet set = new ConcurrentSortedLongPairSet(); + assertTrue(set.isEmpty()); + set.add(1, 1); + assertFalse(set.isEmpty()); + } }