From 82762e4f4d327687d52bda5d39633e933cec8798 Mon Sep 17 00:00:00 2001 From: Vassilis Bekiaris Date: Fri, 8 Apr 2022 10:39:28 +0300 Subject: [PATCH] Fix hanging cluster safe query from common pool When all FJP#commonPool threads are busy querying isClusterSafe and partition assignments are not in sync (e.g. during initial partition arrangement), then there is no chance for an important callback to be executed after PartitionBackupReplicaAntiEntropyOperation is done, resulting in neither partition replica sync nor cluster-safe query being able to make any progress. The fix is to use the Hazelcast internal async executor (instead of the common pool) for the callback that processes the replica anti-entropy operation result. (cherry picked from commit 434d731a4bd6aa2349160b2ab40e7d1a09808fec) --- ...artitionPrimaryReplicaAntiEntropyTask.java | 6 ++- .../PartitionServiceSafetyCheckTest.java | 50 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java b/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java index ccffd9952b72..fedbfae5c1fa 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/partition/impl/AbstractPartitionPrimaryReplicaAntiEntropyTask.java @@ -26,6 +26,7 @@ import com.hazelcast.logging.ILogger; import com.hazelcast.spi.impl.NodeEngineImpl; import com.hazelcast.spi.impl.PartitionSpecificRunnable; +import com.hazelcast.spi.impl.executionservice.ExecutionService; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.OperationService; import com.hazelcast.spi.impl.operationservice.UrgentSystemOperation; @@ -34,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; 
+import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import static com.hazelcast.internal.partition.IPartitionService.SERVICE_NAME; @@ -119,11 +121,13 @@ final void invokePartitionBackupReplicaAntiEntropyOp(int replicaIndex, Partition OperationService operationService = nodeEngine.getOperationService(); if (hasCallback) { + ExecutorService asyncExecutor = + nodeEngine.getExecutionService().getExecutor(ExecutionService.ASYNC_EXECUTOR); operationService.createInvocationBuilder(SERVICE_NAME, op, target.address()) .setTryCount(OPERATION_TRY_COUNT) .setTryPauseMillis(OPERATION_TRY_PAUSE_MILLIS) .invoke() - .whenCompleteAsync(callback); + .whenCompleteAsync(callback, asyncExecutor); } else { operationService.send(op, target.address()); } diff --git a/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java b/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java index a77a32962867..5aa2bb3b3bed 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/partition/PartitionServiceSafetyCheckTest.java @@ -23,12 +23,16 @@ import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.annotation.ParallelJVMTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.annotation.SlowTest; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import static com.hazelcast.internal.partition.AntiEntropyCorrectnessTest.setBackupPacketDropFilter; @@ -162,6 +166,52 @@ public void run() throws Exception { }); } + @Category(SlowTest.class) + @Test(timeout = 60000) + 
public void clusterSafe_completes_whenInvokedFromCommonPool() throws InterruptedException { + Config config = getConfig(true, true); + + HazelcastInstance hz = factory.newHazelcastInstance(config); + startNodes(config, nodeCount); + + final Collection instances = factory.getAllHazelcastInstances(); + + // ensure we start enough threads to block all threads of FJP#commonPool + int threadCount = ForkJoinPool.commonPool().getParallelism() + 2; + spawn(() -> { + // trigger partition assignment while many threads are querying cluster safety + sleepSeconds(5); + warmUpPartitions(instances); + fillData(hz); + }); + ExecutorService executorService = ForkJoinPool.commonPool(); + // no assertions are actually executed. We only care that isClusterSafe + // does not hang indefinitely + assertTrueAllTheTime(() -> { + CountDownLatch startQueryClusterSafe = new CountDownLatch(1); + CountDownLatch completed = new CountDownLatch(threadCount); + for (int i = 0; i < threadCount; i++) { + executorService.submit(() -> { + try { + startQueryClusterSafe.await(); + for (HazelcastInstance instance : instances) { + PartitionService ps = instance.getPartitionService(); + // just invoke isClusterSafe, no need to assert result + // we only care about the call eventually completing + ps.isClusterSafe(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + completed.countDown(); + } + }); + } + startQueryClusterSafe.countDown(); + completed.await(); + }, 30); + } + @Test public void clusterShouldBeSafe_whenBackupsBlocked_withForceToBeSafe() throws InterruptedException { Config config = getConfig(true, true);