Skip to content

Commit

Permalink
add spaced merge policy check
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Jun 16, 2021
1 parent ddd1f93 commit f70fa31
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class MapContainer {
* Holds number of registered {@link InvalidationListener} from clients.
*/
protected final AtomicInteger invalidationListenerCount = new AtomicInteger();
protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong();

protected SplitBrainMergePolicy wanMergePolicy;
protected DelegatingWanScheme wanReplicationDelegate;
Expand Down Expand Up @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) {
.build();
}

public AtomicLong getLastInvalidMergePolicyCheckTime() {
return lastInvalidMergePolicyCheckTime;
}

private class IndexResultFilterFactory implements Supplier<Predicate<QueryableEntry>> {

@Override
Expand Down
Expand Up @@ -16,7 +16,9 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.nio.ObjectDataInput;
Expand All @@ -31,6 +33,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

import static com.hazelcast.core.EntryEventType.MERGED;
import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;
Expand All @@ -44,6 +49,8 @@
public class MergeOperation extends MapOperation
implements PartitionAwareOperation, BackupAwareOperation {

private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1);

private boolean disableWanReplicationEvent;
private List<MapMergeTypes<Object, Object>> mergingEntries;
private SplitBrainMergePolicy<Object, MapMergeTypes<Object, Object>, Object> mergePolicy;
Expand Down Expand Up @@ -78,8 +85,15 @@ protected boolean disableWanReplicationEvent() {

@Override
protected void runInternal() {
checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(),
getNodeEngine().getSplitBrainMergePolicyProvider());
// Check once in a minute as earliest to avoid log bursts.
if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) {
try {
checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(),
getNodeEngine().getSplitBrainMergePolicyProvider());
} catch (InvalidConfigurationException e) {
logger().log(Level.WARNING, e.getMessage(), e);
}
}

hasMapListener = mapEventPublisher.hasEventListener(name);
hasWanReplication = mapContainer.isWanReplicationEnabled()
Expand All @@ -104,6 +118,16 @@ protected void runInternal() {
}
}

private static boolean shouldCheckNow(AtomicLong lastLogTime) {
long now = Clock.currentTimeMillis();
long lastLogged = lastLogTime.get();
if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) {
return lastLogTime.compareAndSet(lastLogged, now);
}

return false;
}

private void merge(MapMergeTypes<Object, Object> mergingEntry) {
Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey());
Data oldValue = hasMapListener ? getValue(dataKey) : null;
Expand Down

0 comments on commit f70fa31

Please sign in to comment.