Skip to content

Commit

Permalink
[Issue 12723] Fix race condition in PersistentTopic#addReplicationClu…
Browse files Browse the repository at this point in the history
…ster (#12729)

See #12723

Add a method org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap#removeNullValue to remove null value   in a thread safe way.

(cherry picked from commit a3fe00e)
  • Loading branch information
Jason918 authored and codelipenghui committed Nov 18, 2021
1 parent 2ab4dec commit 230e1ac
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
Expand Up @@ -1544,7 +1544,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {

protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
brokerService);
Expand All @@ -1555,8 +1555,8 @@ protected boolean addReplicationCluster(String remoteCluster, ManagedCursor curs
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
if (replicator == null) {
replicators.removeNullValue(remoteCluster);
}
return isReplicatorStarted.get();
}
Expand Down
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -42,6 +43,27 @@ public class ConcurrentOpenHashMap<K, V> {
private static final Object EmptyKey = null;
private static final Object DeletedKey = new Object();

/**
* This object is used to delete empty value in this map.
* EmptyValue.equals(null) = true.
*/
private static final Object EmptyValue = new Object() {

@SuppressFBWarnings
@Override
public boolean equals(Object obj) {
return obj == null;
}

/**
* This is just for avoiding spotbugs errors
*/
@Override
public int hashCode() {
return super.hashCode();
}
};

private static final float MapFillFactor = 0.66f;

private static final int DefaultExpectedItems = 256;
Expand Down Expand Up @@ -142,6 +164,10 @@ public boolean remove(K key, Object value) {
return getSection(h).remove(key, value, (int) h) != null;
}

public void removeNullValue(K key) {
remove(key, EmptyValue);
}

private Section<K, V> getSection(long hash) {
// Use 32 msb out of long to get the section
final int sectionIdx = (int) (hash >>> 32) & (sections.length - 1);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -369,6 +370,37 @@ public boolean equals(Object obj) {
assertNull(map.get(t1_b));
}

@Test
public void testNullValue() {
ConcurrentOpenHashMap<String, String> map = new ConcurrentOpenHashMap<>(16, 1);
String key = "a";
assertThrows(NullPointerException.class, () -> map.put(key, null));

//put a null value.
assertNull(map.computeIfAbsent(key, k -> null));
assertEquals(1, map.size());
assertEquals(1, map.keys().size());
assertEquals(1, map.values().size());
assertNull(map.get(key));
assertFalse(map.containsKey(key));

//test remove null value
map.removeNullValue(key);
assertTrue(map.isEmpty());
assertEquals(0, map.keys().size());
assertEquals(0, map.values().size());
assertNull(map.get(key));
assertFalse(map.containsKey(key));


//test not remove non-null value
map.put(key, "V");
assertEquals(1, map.size());
map.removeNullValue(key);
assertEquals(1, map.size());

}

static final int Iterations = 1;
static final int ReadIterations = 1000;
static final int N = 1_000_000;
Expand Down

0 comments on commit 230e1ac

Please sign in to comment.