diff --git a/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java b/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java index 359872ed532b..137a8caf9993 100644 --- a/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java +++ b/hazelcast-spring-tests/src/test/java/com/hazelcast/spring/TestFullApplicationContext.java @@ -328,7 +328,7 @@ public void testCacheConfig() { WanReplicationRef wanRef = cacheConfig.getWanReplicationRef(); assertEquals("testWan", wanRef.getName()); - assertEquals("PUT_IF_ABSENT", wanRef.getMergePolicyClassName()); + assertEquals("PutIfAbsentMergePolicy", wanRef.getMergePolicyClassName()); assertEquals(1, wanRef.getFilters().size()); assertEquals("com.example.SampleFilter", wanRef.getFilters().get(0)); } @@ -415,7 +415,7 @@ public void testMapConfig() { // test testMapConfig2's WanReplicationConfig WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef(); assertEquals("testWan", wanReplicationRef.getName()); - assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName()); + assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName()); assertTrue(wanReplicationRef.isRepublishingEnabled()); assertEquals(1000, testMapConfig2.getEvictionConfig().getSize()); @@ -482,7 +482,7 @@ public void testMapNoWanMergePolicy() { // test testMapConfig2's WanReplicationConfig WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef(); assertEquals("testWan", wanReplicationRef.getName()); - assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName()); + assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName()); } @Test diff --git a/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml b/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml index cd49df9ad643..bf777c4d7aa4 100644 --- a/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml +++ b/hazelcast-spring-tests/src/test/resources/com/hazelcast/spring/fullConfig-applicationContext-hazelcast.xml @@ -374,7 +374,7 @@ implementation="dummyMapStore" write-delay-seconds="0" initial-mode="LAZY"/> - + @@ -508,7 +508,7 @@ - com.example.SampleFilter diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java index e393f81151f1..e8450cfb62db 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -106,6 +107,7 @@ public class MapContainer { * Holds number of registered {@link InvalidationListener} from clients. */ protected final AtomicInteger invalidationListenerCount = new AtomicInteger(); + protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong(); protected SplitBrainMergePolicy wanMergePolicy; protected DelegatingWanScheme wanReplicationDelegate; @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) { .build(); } + public AtomicLong getLastInvalidMergePolicyCheckTime() { + return lastInvalidMergePolicyCheckTime; + } + private class IndexResultFilterFactory implements Supplier> { @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java index 2df90e4d2459..cc613c9a050f 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java @@ -16,7 +16,9 @@ package com.hazelcast.map.impl.operation; +import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.Clock; import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.record.Record; import com.hazelcast.nio.ObjectDataInput; @@ -31,8 +33,12 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; import static com.hazelcast.core.EntryEventType.MERGED; +import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy; /** * Contains multiple merge entries for split-brain @@ -43,6 +49,8 @@ public class MergeOperation extends MapOperation implements PartitionAwareOperation, BackupAwareOperation { + private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1); + private boolean disableWanReplicationEvent; private List> mergingEntries; private SplitBrainMergePolicy, Object> mergePolicy; @@ -77,6 +85,16 @@ protected boolean disableWanReplicationEvent() { @Override protected void runInternal() { + // Check once in a minute as earliest to avoid log bursts. + if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) { + try { + checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(), + getNodeEngine().getSplitBrainMergePolicyProvider()); + } catch (InvalidConfigurationException e) { + logger().log(Level.WARNING, e.getMessage(), e); + } + } + hasMapListener = mapEventPublisher.hasEventListener(name); hasWanReplication = mapContainer.isWanReplicationEnabled() && !disableWanReplicationEvent; @@ -100,6 +118,16 @@ protected void runInternal() { } } + private static boolean shouldCheckNow(AtomicLong lastLogTime) { + long now = Clock.currentTimeMillis(); + long lastLogged = lastLogTime.get(); + if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) { + return lastLogTime.compareAndSet(lastLogged, now); + } + + return false; + } + private void merge(MapMergeTypes mergingEntry) { Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey()); Data oldValue = hasMapListener ? getValue(dataKey) : null; diff --git a/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java b/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java index 6791af8fdb9b..6d6fa48bcb64 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java @@ -19,6 +19,8 @@ import com.hazelcast.cluster.Address; import com.hazelcast.cluster.impl.MemberImpl; import com.hazelcast.config.Config; +import com.hazelcast.config.MapConfig; +import com.hazelcast.config.WanReplicationRef; import com.hazelcast.core.HazelcastException; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.instance.impl.Node; @@ -80,9 +82,11 @@ import javax.annotation.Nonnull; import java.util.Collection; import java.util.LinkedList; +import java.util.Map; import java.util.UUID; import java.util.function.Consumer; +import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy; import static com.hazelcast.internal.metrics.MetricDescriptorConstants.MEMORY_PREFIX; import static com.hazelcast.internal.metrics.impl.MetricsConfigHelper.memberMetricsLevel; import static com.hazelcast.internal.util.EmptyStatement.ignore; @@ -151,17 +155,20 @@ public NodeEngineImpl(Node node) { this.wanReplicationService = node.getNodeExtension().createService(WanReplicationService.class); this.sqlService = new SqlServiceImpl(this); this.packetDispatcher = new PacketDispatcher( - logger, - operationService.getOperationExecutor(), - operationService.getInboundResponseHandlerSupplier().get(), - operationService.getInvocationMonitor(), - eventService, - getJetPacketConsumer(node.getNodeExtension()), - sqlService + logger, + operationService.getOperationExecutor(), + operationService.getInboundResponseHandlerSupplier().get(), + operationService.getInvocationMonitor(), + eventService, + getJetPacketConsumer(node.getNodeExtension()), + sqlService ); this.splitBrainProtectionService = new SplitBrainProtectionServiceImpl(this); this.diagnostics = newDiagnostics(); - this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(this); + this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(configClassLoader); + + checkMapMergePolicies(node); + this.tenantControlService = new TenantControlServiceImpl(this); serviceManager.registerService(OperationServiceImpl.SERVICE_NAME, operationService); serviceManager.registerService(OperationParker.SERVICE_NAME, operationParker); @@ -178,6 +185,22 @@ public NodeEngineImpl(Node node) { } } + private void checkMapMergePolicies(Node node) { + Map mapConfigs = node.config.getMapConfigs(); + for (MapConfig mapConfig : mapConfigs.values()) { + WanReplicationRef wanReplicationRef = mapConfig.getWanReplicationRef(); + if (wanReplicationRef != null) { + String wanMergePolicyClassName = mapConfig.getWanReplicationRef().getMergePolicyClassName(); + checkMapMergePolicy(mapConfig, + wanMergePolicyClassName, splitBrainMergePolicyProvider); + } + + String splitBrainMergePolicyClassName = mapConfig.getMergePolicyConfig().getPolicy(); + checkMapMergePolicy(mapConfig, + splitBrainMergePolicyClassName, splitBrainMergePolicyProvider); + } + } + private ConcurrencyDetection newConcurrencyDetection() { HazelcastProperties properties = node.getProperties(); boolean writeThrough = properties.getBoolean(ClusterProperty.IO_WRITE_THROUGH_ENABLED); diff --git a/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java b/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java index bec78d47b386..8fda4ba4657a 100644 --- a/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java +++ b/hazelcast/src/main/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProvider.java @@ -17,8 +17,8 @@ package com.hazelcast.spi.merge; import com.hazelcast.config.InvalidConfigurationException; -import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.internal.util.ConstructorFunction; +import com.hazelcast.spi.impl.NodeEngine; import java.util.HashMap; import java.util.Map; @@ -51,7 +51,7 @@ public final class SplitBrainMergePolicyProvider { addPolicy(PutIfAbsentMergePolicy.class, new PutIfAbsentMergePolicy()); } - private final NodeEngine nodeEngine; + private final ClassLoader configClassLoader; private final ConcurrentMap mergePolicyMap = new ConcurrentHashMap(); @@ -61,7 +61,7 @@ public final class SplitBrainMergePolicyProvider { @Override public SplitBrainMergePolicy createNew(String className) { try { - return newInstance(nodeEngine.getConfigClassLoader(), className); + return newInstance(configClassLoader, className); } catch (Exception e) { throw new InvalidConfigurationException("Invalid SplitBrainMergePolicy: " + className, e); } @@ -73,8 +73,8 @@ public SplitBrainMergePolicy createNew(String className) { * * @param nodeEngine the {@link NodeEngine} to retrieve the classloader from */ - public SplitBrainMergePolicyProvider(NodeEngine nodeEngine) { - this.nodeEngine = nodeEngine; + public SplitBrainMergePolicyProvider(ClassLoader configClassLoader) { + this.configClassLoader = configClassLoader; this.mergePolicyMap.putAll(OUT_OF_THE_BOX_MERGE_POLICIES); } diff --git a/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java b/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java index 96b6a338ea89..819666b31cc4 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/config/ConfigValidatorTest.java @@ -65,7 +65,7 @@ public void setUp() { NodeEngine nodeEngine = Mockito.mock(NodeEngine.class); when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader()); - splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine); + splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader()); when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(splitBrainMergePolicyProvider); properties = nodeEngine.getProperties(); diff --git a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java index 315ea00cfcd6..3f524ef9217f 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorMapIntegrationTest.java @@ -40,7 +40,7 @@ @Category({QuickTest.class, ParallelJVMTest.class}) public class MergePolicyValidatorMapIntegrationTest extends AbstractMergePolicyValidatorIntegrationTest { - private boolean perEntryStatsEnabled = false; + private boolean perEntryStatsEnabled = true; @Override void addConfig(Config config, String name, MergePolicyConfig mergePolicyConfig) { @@ -60,9 +60,9 @@ public void testMap_withPutIfAbsentMergePolicy() { @Test public void testMap_withHyperLogLogMergePolicy() { + expectCardinalityEstimatorException(); HazelcastInstance hz = getHazelcastInstance("cardinalityEstimator", hyperLogLogMergePolicy); - expectCardinalityEstimatorException(); hz.getMap("cardinalityEstimator"); } @@ -76,9 +76,9 @@ public void testMap_withHigherHitsMergePolicy() { @Test public void testMap_withInvalidMergePolicy() { + expectedInvalidMergePolicyException(); HazelcastInstance hz = getHazelcastInstance("invalid", invalidMergePolicyConfig); - expectedInvalidMergePolicyException(); hz.getMap("invalid"); } @@ -105,9 +105,11 @@ public void testMap_withExpirationTimeMergePolicy_withStatsEnabled() { */ @Test public void testMap_withLastStoredTimeMergePolicy() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy); + HazelcastInstance hz = getHazelcastInstance("lastStoredTime", lastStoredTimeMergePolicy); - expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy); hz.getMap("lastStoredTime"); } @@ -124,9 +126,11 @@ public void testMap_withLastStoredMergePolicy_withStatsEnabled() { */ @Test public void testMap_withLastStoredTimeMergePolicyNoTypeVariable() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable); + HazelcastInstance hz = getHazelcastInstance("lastStoredTimeNoTypeVariable", lastStoredTimeMergePolicyNoTypeVariable); - expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable); hz.getMap("lastStoredTimeNoTypeVariable"); } @@ -163,9 +167,10 @@ public void testMap_withComplexCustomMergePolicy_withStatsEnabled() { */ @Test public void testMap_withCustomMapMergePolicy() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(customMapMergePolicy); HazelcastInstance hz = getHazelcastInstance("customMap", customMapMergePolicy); - expectedMapStatisticsDisabledException(customMapMergePolicy); hz.getMap("customMap"); } @@ -183,9 +188,11 @@ public void testMap_withCustomMapMergePolicy_withStatsEnabled() { */ @Test public void testMap_withCustomMapMergePolicyNoTypeVariable() { + perEntryStatsEnabled = false; + expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable); + HazelcastInstance hz = getHazelcastInstance("customMapNoTypeVariable", customMapMergePolicyNoTypeVariable); - expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable); hz.getMap("customMapNoTypeVariable"); } diff --git a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java index ae0a5474fd06..21e24423733f 100644 --- a/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java +++ b/hazelcast/src/test/java/com/hazelcast/internal/config/MergePolicyValidatorTest.java @@ -43,7 +43,7 @@ public void setUp() { NodeEngine nodeEngine = Mockito.mock(NodeEngine.class); when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader()); - mapMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine); + mapMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader()); when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(mapMergePolicyProvider); } diff --git a/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java b/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java index 02e13d6a58c4..bfc342408a1a 100644 --- a/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java +++ b/hazelcast/src/test/java/com/hazelcast/spi/merge/SplitBrainMergePolicyProviderTest.java @@ -48,7 +48,7 @@ public class SplitBrainMergePolicyProviderTest extends HazelcastTestSupport { @Before public void setup() { - mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getNodeEngine()); + mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getConfigClassLoader()); } @Test