From fca0a8ab3922d78cccdc473c0087d06624cc380b Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 9 Jun 2021 19:14:03 +0300 Subject: [PATCH 1/4] wip --- .../map/impl/operation/MergeOperation.java | 4 ++ .../hazelcast/spi/impl/NodeEngineImpl.java | 39 +++++++++++++++---- .../merge/SplitBrainMergePolicyProvider.java | 10 ++--- .../config/YamlConfigBuilderTest.java | 7 +++- .../internal/config/ConfigValidatorTest.java | 2 +- .../config/MergePolicyValidatorTest.java | 2 +- .../SplitBrainMergePolicyProviderTest.java | 2 +- 7 files changed, 49 insertions(+), 17 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java index 2df90e4d2459..55b95f39bfca 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java @@ -33,6 +33,7 @@ import java.util.List; import static com.hazelcast.core.EntryEventType.MERGED; +import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy; /** * Contains multiple merge entries for split-brain @@ -77,6 +78,9 @@ protected boolean disableWanReplicationEvent() { @Override protected void runInternal() { + checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(), + getNodeEngine().getSplitBrainMergePolicyProvider()); + hasMapListener = mapEventPublisher.hasEventListener(name); hasWanReplication = mapContainer.isWanReplicationEnabled() && !disableWanReplicationEvent; diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java index 6791af8fdb9b..6d6fa48bcb64 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java @@ -19,6 +19,8 @@ import com.hazelcast.cluster.Address; import com.hazelcast.cluster.impl.MemberImpl; import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.WanReplicationRef; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.Node; @@ -80,9 +82,11 @@ import javax.annotation.Nonnull; import java.util.Collection; import java.util.LinkedList; +import java.util.Map; import java.util.UUID; import java.util.function.Consumer; +import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy; import static com.hazelcast.internal.metrics.MetricDescriptorConstants.MEMORY_PREFIX; import static com.hazelcast.internal.metrics.impl.MetricsConfigHelper.memberMetricsLevel; import static com.hazelcast.internal.util.EmptyStatement.ignore; @@ -151,17 +155,20 @@ public NodeEngineImpl(Node node) { this.wanReplicationService = node.getNodeExtension().createService(WanReplicationService.class); this.sqlService = new SqlServiceImpl(this); this.packetDispatcher = new PacketDispatcher( - logger, - operationService.getOperationExecutor(), - operationService.getInboundResponseHandlerSupplier().get(), - operationService.getInvocationMonitor(), - eventService, - getJetPacketConsumer(node.getNodeExtension()), - sqlService + logger, + operationService.getOperationExecutor(), + operationService.getInboundResponseHandlerSupplier().get(), + operationService.getInvocationMonitor(), + eventService, + getJetPacketConsumer(node.getNodeExtension()), + sqlService ); this.splitBrainProtectionService = new SplitBrainProtectionServiceImpl(this); this.diagnostics = newDiagnostics(); - this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(this); + this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(configClassLoader); + + checkMapMergePolicies(node); + this.tenantControlService = new TenantControlServiceImpl(this); serviceManager.registerService(OperationServiceImpl.SERVICE_NAME, operationService); serviceManager.registerService(OperationParker.SERVICE_NAME, operationParker); @@ -178,6 +185,22 @@ public NodeEngineImpl(Node node) { } } + private void checkMapMergePolicies(Node node) { + Map mapConfigs = node.config.getMapConfigs(); + for (MapConfig mapConfig : mapConfigs.values()) { + WanReplicationRef wanReplicationRef = mapConfig.getWanReplicationRef(); + if (wanReplicationRef != null) { + String wanMergePolicyClassName = mapConfig.getWanReplicationRef().getMergePolicyClassName(); + checkMapMergePolicy(mapConfig, + wanMergePolicyClassName, splitBrainMergePolicyProvider); + } + + String splitBrainMergePolicyClassName = mapConfig.getMergePolicyConfig().getPolicy(); + checkMapMergePolicy(mapConfig, + splitBrainMergePolicyClassName, splitBrainMergePolicyProvider); + } + } + private ConcurrencyDetection newConcurrencyDetection() { HazelcastProperties properties = node.getProperties(); boolean writeThrough = properties.getBoolean(ClusterProperty.IO_WRITE_THROUGH_ENABLED); diff --git a/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java b/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java index bec78d47b386..8fda4ba4657a 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java @@ -17,8 +17,8 @@ package com.hazelcast.spi.merge; import com.hazelcast.config.InvalidConfigurationException; -import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.internal.util.ConstructorFunction; +import com.hazelcast.spi.impl.NodeEngine; import java.util.HashMap; import java.util.Map; @@ -51,7 +51,7 @@ public final class SplitBrainMergePolicyProvider { addPolicy(PutIfAbsentMergePolicy.class, new PutIfAbsentMergePolicy()); } - private final NodeEngine nodeEngine; + private final ClassLoader configClassLoader; private final ConcurrentMap mergePolicyMap = new ConcurrentHashMap(); @@ -61,7 +61,7 @@ public final class SplitBrainMergePolicyProvider { @Override public SplitBrainMergePolicy createNew(String className) { try { - return newInstance(nodeEngine.getConfigClassLoader(), className); + return newInstance(configClassLoader, className); } catch (Exception e) { throw new InvalidConfigurationException("Invalid SplitBrainMergePolicy: " + className, e); } @@ -73,8 +73,8 @@ public SplitBrainMergePolicy createNew(String className) { * * @param nodeEngine the {@link NodeEngine} to retrieve the classloader from */ - public SplitBrainMergePolicyProvider(NodeEngine nodeEngine) { - this.nodeEngine = nodeEngine; + public SplitBrainMergePolicyProvider(ClassLoader configClassLoader) { + this.configClassLoader = configClassLoader; this.mergePolicyMap.putAll(OUT_OF_THE_BOX_MERGE_POLICIES); } diff --git a/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java b/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java index a8e521049d35..c8301fb26458 100644 --- a/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java @@ -26,6 +26,7 @@ import com.hazelcast.config.security.KerberosIdentityConfig; import com.hazelcast.config.security.LdapAuthenticationConfig; import com.hazelcast.config.security.RealmConfig; +import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.internal.nio.IOUtil; import com.hazelcast.splitbrainprotection.SplitBrainProtectionOn; @@ -1376,13 +1377,17 @@ public void testMapWanReplicationRef() { + " " + mapName + ":\n" + " wan-replication-ref:\n" + " test:\n" - + " merge-policy-class-name: TestMergePolicy\n" + + " merge-policy-class-name: LatestUpdateMergePolicy\n" + " filters:\n" + " - com.example.SampleFilter\n"; Config config = buildConfig(yaml); MapConfig mapConfig = config.getMapConfig(mapName); WanReplicationRef wanRef = mapConfig.getWanReplicationRef(); + mapConfig.setPerEntryStatsEnabled(false); + + + HazelcastInstance hazelcastInstance = createHazelcastInstance(config); assertEquals(refName, wanRef.getName()); assertEquals(mergePolicy, wanRef.getMergePolicyClassName()); diff --git a/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java b/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java index 96b6a338ea89..819666b31cc4 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java @@ -65,7 +65,7 @@ public void setUp() { NodeEngine nodeEngine = Mockito.mock(NodeEngine.class); when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader()); - splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine); + splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader()); when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(splitBrainMergePolicyProvider); properties = nodeEngine.getProperties(); diff --git a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java index ae0a5474fd06..21e24423733f 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java @@ -43,7 +43,7 @@ public void setUp() { NodeEngine nodeEngine = Mockito.mock(NodeEngine.class); when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader()); - mapMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine); + mapMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader()); when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(mapMergePolicyProvider); } diff --git a/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java b/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java index 02e13d6a58c4..bfc342408a1a 100644 --- a/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java @@ -48,7 +48,7 @@ public class SplitBrainMergePolicyProviderTest extends HazelcastTestSupport { @Before public void setup() { - mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getNodeEngine()); + mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getConfigClassLoader()); } @Test From 9c00223c275014e968c9b79e88c71ffdc1fad6c7 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 9 Jun 2021 22:20:30 +0300 Subject: [PATCH 2/4] wip test fixes --- .../config/YamlConfigBuilderTest.java | 7 +------ ...ergePolicyValidatorMapIntegrationTest.java | 21 ++++++++++++------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java b/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java index c8301fb26458..a8e521049d35 100644 --- a/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/config/YamlConfigBuilderTest.java @@ -26,7 +26,6 @@ import com.hazelcast.config.security.KerberosIdentityConfig; import com.hazelcast.config.security.LdapAuthenticationConfig; import com.hazelcast.config.security.RealmConfig; -import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.EndpointQualifier; import com.hazelcast.internal.nio.IOUtil; import com.hazelcast.splitbrainprotection.SplitBrainProtectionOn; @@ -1377,17 +1376,13 @@ public void testMapWanReplicationRef() { + " " + mapName + ":\n" + " wan-replication-ref:\n" + " test:\n" - + " merge-policy-class-name: LatestUpdateMergePolicy\n" + + " merge-policy-class-name: TestMergePolicy\n" + " filters:\n" + " - com.example.SampleFilter\n"; Config config = buildConfig(yaml); MapConfig mapConfig = config.getMapConfig(mapName); WanReplicationRef wanRef = mapConfig.getWanReplicationRef(); - mapConfig.setPerEntryStatsEnabled(false); - - - HazelcastInstance hazelcastInstance = createHazelcastInstance(config); assertEquals(refName, wanRef.getName()); assertEquals(mergePolicy, wanRef.getMergePolicyClassName()); diff --git a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java index 315ea00cfcd6..3f524ef9217f 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java @@ -40,7 +40,7 @@ @Category({QuickTest.class, ParallelJVMTest.class}) public class MergePolicyValidatorMapIntegrationTest extends AbstractMergePolicyValidatorIntegrationTest { - private boolean perEntryStatsEnabled = false; + private boolean perEntryStatsEnabled = true; @Override void addConfig(Config config, String name, MergePolicyConfig mergePolicyConfig) { @@ -60,9 +60,9 @@ public void testMap_withPutIfAbsentMergePolicy() { @Test public void testMap_withHyperLogLogMergePolicy() { + expectCardinalityEstimatorException(); HazelcastInstance hz = getHazelcastInstance("cardinalityEstimator", hyperLogLogMergePolicy); - expectCardinalityEstimatorException(); hz.getMap("cardinalityEstimator"); } @@ -76,9 +76,9 @@ public void testMap_withHigherHitsMergePolicy() { @Test public void testMap_withInvalidMergePolicy() { + expectedInvalidMergePolicyException(); HazelcastInstance hz = getHazelcastInstance("invalid", invalidMergePolicyConfig); - expectedInvalidMergePolicyException(); hz.getMap("invalid"); } @@ -105,9 +105,11 @@ public void testMap_withExpirationTimeMergePolicy_withStatsEnabled() { */ @Test public void testMap_withLastStoredTimeMergePolicy() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy); + HazelcastInstance hz = getHazelcastInstance("lastStoredTime", lastStoredTimeMergePolicy); - expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy); hz.getMap("lastStoredTime"); } @@ -124,9 +126,11 @@ public void testMap_withLastStoredMergePolicy_withStatsEnabled() { */ @Test public void testMap_withLastStoredTimeMergePolicyNoTypeVariable() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable); + HazelcastInstance hz = getHazelcastInstance("lastStoredTimeNoTypeVariable", lastStoredTimeMergePolicyNoTypeVariable); - expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable); hz.getMap("lastStoredTimeNoTypeVariable"); } @@ -163,9 +167,10 @@ public void testMap_withComplexCustomMergePolicy_withStatsEnabled() { */ @Test public void testMap_withCustomMapMergePolicy() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(customMapMergePolicy); HazelcastInstance hz = getHazelcastInstance("customMap", customMapMergePolicy); - expectedMapStatisticsDisabledException(customMapMergePolicy); hz.getMap("customMap"); } @@ -183,9 +188,11 @@ public void testMap_withCustomMapMergePolicy_withStatsEnabled() { */ @Test public void testMap_withCustomMapMergePolicyNoTypeVariable() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable); + HazelcastInstance hz = getHazelcastInstance("customMapNoTypeVariable", customMapMergePolicyNoTypeVariable); - expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable); hz.getMap("customMapNoTypeVariable"); } From ddd1f93f45a1b96e1d79cd91c69236842f2bae97 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Thu, 10 Jun 2021 14:17:16 +0300 Subject: [PATCH 3/4] wip test fixes --- .../com/hazelcast/spring/TestFullApplicationContext.java | 6 +++--- .../spring/fullConfig-applicationContext-hazelcast.xml | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java b/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java index 8c6b379cf269..a38405f273c8 100644 --- a/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java +++ b/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java @@ -319,7 +319,7 @@ public void testCacheConfig() { WanReplicationRef wanRef = cacheConfig.getWanReplicationRef(); assertEquals("testWan", wanRef.getName()); - assertEquals("PUT_IF_ABSENT", wanRef.getMergePolicyClassName()); + assertEquals("PutIfAbsentMergePolicy", wanRef.getMergePolicyClassName()); assertEquals(1, wanRef.getFilters().size()); assertEquals("com.example.SampleFilter", wanRef.getFilters().get(0)); } @@ -404,7 +404,7 @@ public void testMapConfig() { // test testMapConfig2's WanReplicationConfig WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef(); assertEquals("testWan", wanReplicationRef.getName()); - assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName()); + assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName()); assertTrue(wanReplicationRef.isRepublishingEnabled()); assertEquals(1000, testMapConfig2.getEvictionConfig().getSize()); @@ -471,7 +471,7 @@ public void testMapNoWanMergePolicy() { // test testMapConfig2's WanReplicationConfig WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef(); assertEquals("testWan", wanReplicationRef.getName()); - assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName()); + assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName()); } @Test diff --git a/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml b/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml index 97ca14fc0200..d83c5c06b2c0 100644 --- a/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml +++ b/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml @@ -374,7 +374,7 @@ implementation="dummyMapStore" write-delay-seconds="0" initial-mode="LAZY"/> - + @@ -508,7 +508,7 @@ - com.example.SampleFilter From f70fa31ceac9d50b769b309eda14035b5dfa0607 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 16 Jun 2021 16:43:02 +0300 Subject: [PATCH 4/4] add spaced merge policy check --- .../com/hazelcast/map/impl/MapContainer.java | 6 ++++ .../map/impl/operation/MergeOperation.java | 28 +++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java index e393f81151f1..e8450cfb62db 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -106,6 +107,7 @@ public class MapContainer { * Holds number of registered {@link InvalidationListener} from clients. */ protected final AtomicInteger invalidationListenerCount = new AtomicInteger(); + protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong(); protected SplitBrainMergePolicy wanMergePolicy; protected DelegatingWanScheme wanReplicationDelegate; @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) { .build(); } + public AtomicLong getLastInvalidMergePolicyCheckTime() { + return lastInvalidMergePolicyCheckTime; + } + private class IndexResultFilterFactory implements Supplier> { @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java index 55b95f39bfca..cc613c9a050f 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java @@ -16,7 +16,9 @@ package com.hazelcast.map.impl.operation; +import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.Clock; import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.record.Record; import com.hazelcast.nio.ObjectDataInput; @@ -31,6 +33,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; import static com.hazelcast.core.EntryEventType.MERGED; import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy; @@ -44,6 +49,8 @@ public class MergeOperation extends MapOperation implements PartitionAwareOperation, BackupAwareOperation { + private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1); + private boolean disableWanReplicationEvent; private List> mergingEntries; private SplitBrainMergePolicy, Object> mergePolicy; @@ -78,8 +85,15 @@ protected boolean disableWanReplicationEvent() { @Override protected void runInternal() { - checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(), - getNodeEngine().getSplitBrainMergePolicyProvider()); + // Check once in a minute as earliest to avoid log bursts. + if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) { + try { + checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(), + getNodeEngine().getSplitBrainMergePolicyProvider()); + } catch (InvalidConfigurationException e) { + logger().log(Level.WARNING, e.getMessage(), e); + } + } hasMapListener = mapEventPublisher.hasEventListener(name); hasWanReplication = mapContainer.isWanReplicationEnabled() @@ -104,6 +118,16 @@ protected void runInternal() { } } + private static boolean shouldCheckNow(AtomicLong lastLogTime) { + long now = Clock.currentTimeMillis(); + long lastLogged = lastLogTime.get(); + if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) { + return lastLogTime.compareAndSet(lastLogged, now); + } + + return false; + } + private void merge(MapMergeTypes mergingEntry) { Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey()); Data oldValue = hasMapListener ? getValue(dataKey) : null;