From f70fa31ceac9d50b769b309eda14035b5dfa0607 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 16 Jun 2021 16:43:02 +0300 Subject: [PATCH] add spaced merge policy check --- .../com/hazelcast/map/impl/MapContainer.java | 6 ++++ .../map/impl/operation/MergeOperation.java | 28 +++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java index e393f81151f1..e8450cfb62db 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapContainer.java @@ -64,6 +64,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -106,6 +107,7 @@ public class MapContainer { * Holds number of registered {@link InvalidationListener} from clients. */ protected final AtomicInteger invalidationListenerCount = new AtomicInteger(); + protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong(); protected SplitBrainMergePolicy wanMergePolicy; protected DelegatingWanScheme wanReplicationDelegate; @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) { .build(); } + public AtomicLong getLastInvalidMergePolicyCheckTime() { + return lastInvalidMergePolicyCheckTime; + } + private class IndexResultFilterFactory implements Supplier> { @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java index 55b95f39bfca..cc613c9a050f 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MergeOperation.java @@ -16,7 +16,9 @@ package com.hazelcast.map.impl.operation; +import com.hazelcast.config.InvalidConfigurationException; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.Clock; import com.hazelcast.map.impl.MapDataSerializerHook; import com.hazelcast.map.impl.record.Record; import com.hazelcast.nio.ObjectDataInput; @@ -31,6 +33,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; import static com.hazelcast.core.EntryEventType.MERGED; import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy; @@ -44,6 +49,8 @@ public class MergeOperation extends MapOperation implements PartitionAwareOperation, BackupAwareOperation { + private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1); + private boolean disableWanReplicationEvent; private List> mergingEntries; private SplitBrainMergePolicy, Object> mergePolicy; @@ -78,8 +85,15 @@ protected boolean disableWanReplicationEvent() { @Override protected void runInternal() { - checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(), - getNodeEngine().getSplitBrainMergePolicyProvider()); + // Check once in a minute as earliest to avoid log bursts. + if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) { + try { + checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(), + getNodeEngine().getSplitBrainMergePolicyProvider()); + } catch (InvalidConfigurationException e) { + logger().log(Level.WARNING, e.getMessage(), e); + } + } hasMapListener = mapEventPublisher.hasEventListener(name); hasWanReplication = mapContainer.isWanReplicationEnabled() @@ -104,6 +118,16 @@ protected void runInternal() { } } + private static boolean shouldCheckNow(AtomicLong lastLogTime) { + long now = Clock.currentTimeMillis(); + long lastLogged = lastLogTime.get(); + if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) { + return lastLogTime.compareAndSet(lastLogged, now); + } + + return false; + } + private void merge(MapMergeTypes mergingEntry) { Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey()); Data oldValue = hasMapListener ? getValue(dataKey) : null;