diff --git a/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java b/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java index ccffd9952b72..fedbfae5c1fa 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java @@ -26,6 +26,7 @@ import com.hazelcast.logging.ILogger; import com.hazelcast.spi.impl.NodeEngineImpl; import com.hazelcast.spi.impl.PartitionSpecificRunnable; +import com.hazelcast.spi.impl.executionservice.ExecutionService; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.OperationService; import com.hazelcast.spi.impl.operationservice.UrgentSystemOperation; @@ -34,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import static com.hazelcast.internal.partition.IPartitionService.SERVICE_NAME; @@ -119,11 +121,13 @@ final void invokePartitionBackupReplicaAntiEntropyOp(int replicaIndex, Partition OperationService operationService = nodeEngine.getOperationService(); if (hasCallback) { + ExecutorService asyncExecutor = + nodeEngine.getExecutionService().getExecutor(ExecutionService.ASYNC_EXECUTOR); operationService.createInvocationBuilder(SERVICE_NAME, op, target.address()) .setTryCount(OPERATION_TRY_COUNT) .setTryPauseMillis(OPERATION_TRY_PAUSE_MILLIS) .invoke() - .whenCompleteAsync(callback); + .whenCompleteAsync(callback, asyncExecutor); } else { operationService.send(op, target.address()); } diff --git a/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java b/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java index a77a32962867..5aa2bb3b3bed 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java @@ -23,12 +23,16 @@ import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.annotation.ParallelJVMTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.annotation.SlowTest; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import static com.hazelcast.internal.partition.AntiEntropyCorrectnessTest.setBackupPacketDropFilter; @@ -162,6 +166,52 @@ public void run() throws Exception { }); } + @Category(SlowTest.class) + @Test(timeout = 60000) + public void clusterSafe_completes_whenInvokedFromCommonPool() throws InterruptedException { + Config config = getConfig(true, true); + + HazelcastInstance hz = factory.newHazelcastInstance(config); + startNodes(config, nodeCount); + + final Collection instances = factory.getAllHazelcastInstances(); + + // ensure we start enough threads to block all threads of FJP#commonPool + int threadCount = ForkJoinPool.commonPool().getParallelism() + 2; + spawn(() -> { + // trigger partition assignment while many threads are querying cluster safety + sleepSeconds(5); + warmUpPartitions(instances); + fillData(hz); + }); + ExecutorService executorService = ForkJoinPool.commonPool(); + // no assertions are actually executed. We only care that isClusterSafe + // does not hang indefinitely + assertTrueAllTheTime(() -> { + CountDownLatch startQueryClusterSafe = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(threadCount); + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + try { + startQueryClusterSafe.await(); + for (HazelcastInstance instance : instances) { + PartitionService ps = instance.getPartitionService(); + // just invoke isClusterSafe, no need to assert result + // we only care about the call eventually completing + ps.isClusterSafe(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + completed.countDown(); + } + }); + } + startQueryClusterSafe.countDown(); + completed.await(); + }, 30); + } + @Test public void clusterShouldBeSafe_whenBackupsBlocked_withForceToBeSafe() throws InterruptedException { Config config = getConfig(true, true);