diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index c1802ac9aa5af..62bc786787e5f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1424,7 +1424,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) { AtomicBoolean isReplicatorStarted = new AtomicBoolean(true); - replicators.computeIfAbsent(remoteCluster, r -> { + Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { try { return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, brokerService); @@ -1435,8 +1435,8 @@ protected boolean addReplicationCluster(String remoteCluster, ManagedCursor curs return null; }); // clean up replicator if startup is failed - if (!isReplicatorStarted.get()) { - replicators.remove(remoteCluster); + if (replicator == null) { + replicators.removeNullValue(remoteCluster); } return isReplicatorStarted.get(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java index 0388cd33aa7ca..45beee617e8bc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap { private static final Object EmptyKey = null; private static final Object DeletedKey = new Object(); + /** + * This object is used to delete empty value in this map. + * EmptyValue.equals(null) = true. + */ + private static final Object EmptyValue = new Object() { + + @SuppressFBWarnings + @Override + public boolean equals(Object obj) { + return obj == null; + } + + /** + * This is just for avoiding spotbugs errors + */ + @Override + public int hashCode() { + return super.hashCode(); + } + }; + private static final float MapFillFactor = 0.66f; private static final int DefaultExpectedItems = 256; @@ -142,6 +164,10 @@ public boolean remove(K key, Object value) { return getSection(h).remove(key, value, (int) h) != null; } + public void removeNullValue(K key) { + remove(key, EmptyValue); + } + private Section getSection(long hash) { // Use 32 msb out of long to get the section final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java index 58f6ee5e85c6b..6953c9307338c 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -369,6 +370,37 @@ public boolean equals(Object obj) { assertNull(map.get(t1_b)); } + @Test + public void testNullValue() { + ConcurrentOpenHashMap map = new ConcurrentOpenHashMap<>(16, 1); + String key = "a"; + assertThrows(NullPointerException.class, () -> map.put(key, null)); + + //put a null value. + assertNull(map.computeIfAbsent(key, k -> null)); + assertEquals(1, map.size()); + assertEquals(1, map.keys().size()); + assertEquals(1, map.values().size()); + assertNull(map.get(key)); + assertFalse(map.containsKey(key)); + + //test remove null value + map.removeNullValue(key); + assertTrue(map.isEmpty()); + assertEquals(0, map.keys().size()); + assertEquals(0, map.values().size()); + assertNull(map.get(key)); + assertFalse(map.containsKey(key)); + + + //test not remove non-null value + map.put(key, "V"); + assertEquals(1, map.size()); + map.removeNullValue(key); + assertEquals(1, map.size()); + + } + final static int Iterations = 1; final static int ReadIterations = 1000; final static int N = 1_000_000;