Fix size-based eviction when cluster scaled down
Modification:
The main changes are in the replication operation. Before putting entries into the
record store, we first calculate ownedEntryCountOnThisNode. While this value is below
the per-node max-size, we keep putting entries; once we reach the max-size, we stop
putting entries.

This is an approximation. Based on my tests it works and keeps the per-node map size
around the expected value.
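
To make the approximation concrete, below is a minimal, self-contained sketch of the
capping idea using plain Java collections. It is illustrative only — the class and
method names are invented, and the real change (against Hazelcast's internal
RecordStore) is in the diff that follows:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of the per-node cap applied while replaying replicated entries.
public final class PerNodeCapSketch {

    // Puts replicated entries until this node already holds perNodeMaxSize entries,
    // then skips the rest; this is the approximation described above.
    static long applyReplicatedEntries(Map<Integer, Integer> recordStore,
                                       List<Map.Entry<Integer, Integer>> replicated,
                                       long ownedEntryCountOnThisNode,
                                       int perNodeMaxSize) {
        for (Map.Entry<Integer, Integer> entry : replicated) {
            if (ownedEntryCountOnThisNode >= perNodeMaxSize) {
                // Over the cap: skip the put. (The actual patch also fires
                // post-eviction events for skipped entries on the owner replica,
                // which is what the updated test listens for.)
                continue;
            }
            recordStore.put(entry.getKey(), entry.getValue());
            ownedEntryCountOnThisNode++;
        }
        return ownedEntryCountOnThisNode;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> store = new LinkedHashMap<>();
        List<Map.Entry<Integer, Integer>> incoming =
                List.of(Map.entry(1, 1), Map.entry(2, 2), Map.entry(3, 3));
        // The node already owns 999 entries and the cap is 1000,
        // so only the first replicated entry is accepted.
        long owned = applyReplicatedEntries(store, incoming, 999, 1000);
        System.out.println(owned + " owned; store=" + store); // 1000 owned; store={1=1}
    }
}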
ahmetmircik committed Jun 23, 2021
1 parent 65cd1ef commit 2c5c8db
Showing 2 changed files with 113 additions and 28 deletions.
@@ -16,12 +16,14 @@

package com.hazelcast.map.impl.operation;

+import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.IndexConfig;
import com.hazelcast.config.MapConfig;
import com.hazelcast.internal.cluster.Versions;
import com.hazelcast.internal.monitor.LocalRecordStoreStats;
import com.hazelcast.internal.monitor.impl.LocalRecordStoreStatsImpl;
import com.hazelcast.internal.nio.IOUtil;
+import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.services.ObjectNamespace;
@@ -31,12 +33,16 @@
import com.hazelcast.internal.util.ThreadUtil;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapDataSerializerHook;
+import com.hazelcast.map.impl.MapService;
+import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.PartitionContainer;
+import com.hazelcast.map.impl.eviction.Evictor;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.map.impl.record.Records;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.map.impl.recordstore.expiry.ExpiryMetadata;
import com.hazelcast.map.impl.recordstore.expiry.ExpiryMetadataImpl;
+import com.hazelcast.map.impl.recordstore.expiry.ExpiryReason;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
@@ -54,6 +60,7 @@
import java.util.Map;
import java.util.Set;

+import static com.hazelcast.config.MaxSizePolicy.PER_NODE;
import static com.hazelcast.internal.util.MapUtil.createHashMap;
import static com.hazelcast.internal.util.MapUtil.isNullOrEmpty;

@@ -139,7 +146,8 @@ void prepare(PartitionContainer container, Collection<ServiceNamespace> namespac
    }
}

-@SuppressWarnings("checkstyle:npathcomplexity")
+@SuppressWarnings({"checkstyle:npathcomplexity",
+        "checkstyle:cyclomaticcomplexity", "checkstyle:nestedifdepth"})
void applyState() {
    ThreadUtil.assertRunningOnPartitionThread();

@@ -175,20 +183,37 @@ void applyState() {
        indexes.clearAll();
    }

+
+    long ownedEntryCountOnThisNode = entryCountOnThisNode(mapContainer);
+    EvictionConfig evictionConfig = mapContainer.getMapConfig().getEvictionConfig();
+    boolean perNodeEvictionConfigured = mapContainer.getEvictor() != Evictor.NULL_EVICTOR
+            && evictionConfig.getMaxSizePolicy() == PER_NODE;
+
    long nowInMillis = Clock.currentTimeMillis();
    for (int i = 0; i < keyRecordExpiry.size(); i += 3) {
        Data dataKey = (Data) keyRecordExpiry.get(i);
        Record record = (Record) keyRecordExpiry.get(i + 1);
        ExpiryMetadata expiryMetadata = (ExpiryMetadata) keyRecordExpiry.get(i + 2);

-        recordStore.putReplicatedRecord(dataKey, record, expiryMetadata, populateIndexes, nowInMillis);
-
-        if (recordStore.shouldEvict()) {
-            // No need to continue replicating records anymore.
-            // We are already over eviction threshold, each put record will cause another eviction.
-            recordStore.evictEntries(dataKey);
-            break;
-        }
+        if (perNodeEvictionConfigured) {
+            if (ownedEntryCountOnThisNode >= evictionConfig.getSize()) {
+                if (operation.getReplicaIndex() == 0) {
+                    recordStore.doPostEvictionOperations(dataKey, record.getValue(), ExpiryReason.NOT_EXPIRED);
+                }
+            } else {
+                recordStore.putReplicatedRecord(dataKey, record, expiryMetadata, populateIndexes, nowInMillis);
+                ownedEntryCountOnThisNode++;
+            }
+        } else {
+            recordStore.putReplicatedRecord(dataKey, record, expiryMetadata, populateIndexes, nowInMillis);
+            if (recordStore.shouldEvict()) {
+                // No need to continue replicating records anymore.
+                // We are already over eviction threshold, each put record will cause another eviction.
+                recordStore.evictEntries(dataKey);
+                break;
+            }
+        }

        recordStore.disposeDeferredBlocks();
    }

@@ -208,6 +233,32 @@ void applyState() {
    }
}

+// owned or backup
+private long entryCountOnThisNode(MapContainer mapContainer) {
+    int replicaIndex = operation.getReplicaIndex();
+    long owned = 0;
+    if (mapContainer.getEvictor() != Evictor.NULL_EVICTOR
+            && PER_NODE == mapContainer.getMapConfig().getEvictionConfig().getMaxSizePolicy()) {
+
+        MapService mapService = operation.getService();
+        MapServiceContext mapServiceContext = mapService.getMapServiceContext();
+        IPartitionService partitionService = mapServiceContext.getNodeEngine().getPartitionService();
+        int partitionCount = partitionService.getPartitionCount();
+
+        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+            if (replicaIndex == 0 ? partitionService.isPartitionOwner(partitionId)
+                    : !partitionService.isPartitionOwner(partitionId)) {
+                RecordStore store = mapServiceContext.getExistingRecordStore(partitionId, mapContainer.getName());
+                if (store != null) {
+                    owned += store.size();
+                }
+            }
+        }
+    }
+
+    return owned;
+}
+
private void applyIndexesState() {
    if (mapIndexInfos != null) {
        for (MapIndexInfo mapIndexInfo : mapIndexInfos) {
@@ -23,7 +23,6 @@
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy;
import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.spi.eviction.EvictionPolicyComparator;
import com.hazelcast.internal.eviction.impl.comparator.LRUEvictionPolicyComparator;
import com.hazelcast.internal.partition.IPartitionService;
import com.hazelcast.internal.util.MemoryInfoAccessor;
@@ -38,6 +37,8 @@
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.map.impl.recordstore.DefaultRecordStore;
import com.hazelcast.map.impl.recordstore.RecordStore;
+import com.hazelcast.map.listener.EntryEvictedListener;
+import com.hazelcast.spi.eviction.EvictionPolicyComparator;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.AssertTask;
@@ -52,7 +53,10 @@

import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;

import static com.hazelcast.config.MaxSizePolicy.FREE_HEAP_PERCENTAGE;
import static com.hazelcast.config.MaxSizePolicy.FREE_HEAP_SIZE;
@@ -85,40 +89,68 @@ public void testPerNodePolicy_withManyNodes() {

@Test
public void testPerNodePolicy_afterGracefulShutdown() {
-    int nodeCount = 2;
+    int nodeCount = 3;
    int perNodeMaxSize = 1000;
+    int numberOfPuts = 5000;

    // eviction takes place if a partition's size exceeds this number
    // see EvictionChecker#toPerPartitionMaxSize
    double maxPartitionSize = 1D * nodeCount * perNodeMaxSize / PARTITION_COUNT;

    String mapName = "testPerNodePolicy_afterGracefulShutdown";
    Config config = createConfig(PER_NODE, perNodeMaxSize, mapName);

+    Set<Object> evictedKeySet = Collections.newSetFromMap(new ConcurrentHashMap<>());
    // populate map from one of the nodes
-    Collection<HazelcastInstance> nodes = createNodes(nodeCount, config);
-    for (HazelcastInstance node : nodes) {
-        IMap map = node.getMap(mapName);
-        for (int i = 0; i < 5000; i++) {
-            map.put(i, i);
-        }
-
-        node.shutdown();
-        break;
-    }
+    List<HazelcastInstance> nodes = createNodes(nodeCount, config);
+    assertClusterSize(3, nodes.toArray(new HazelcastInstance[0]));
+
+    IMap map = nodes.get(0).getMap(mapName);
+    for (int i = 0; i < numberOfPuts; i++) {
+        map.put(i, i);
+    }

-    for (HazelcastInstance node : nodes) {
-        if (node.getLifecycleService().isRunning()) {
-            int mapSize = node.getMap(mapName).size();
-            String message = format("map size is %d and it should be smaller "
-                    + "than maxPartitionSize * PARTITION_COUNT which is %.0f",
-                    mapSize, maxPartitionSize * PARTITION_COUNT);
-
-            assertTrue(message, mapSize <= maxPartitionSize * PARTITION_COUNT);
-        }
-    }
+    int initialMapSize = map.size();
+
+    nodes.get(1).getMap(mapName).addEntryListener((EntryEvictedListener) event -> {
+        evictedKeySet.add(event.getKey());
+    }, true);
+
+    nodes.get(0).shutdown();
+
+    assertTrueEventually(() -> {
+        for (HazelcastInstance node : nodes) {
+            if (node.getLifecycleService().isRunning()) {
+                int currentMapSize = node.getMap(mapName).size();
+                int evictedKeyCount = evictedKeySet.size();
+                String message = format("initialMapSize=%d, evictedKeyCount=%d, map size is %d and it should be smaller "
+                        + "than maxPartitionSize * PARTITION_COUNT which is %.0f",
+                        initialMapSize, evictedKeyCount, currentMapSize, maxPartitionSize * PARTITION_COUNT);
+
+                assertEquals(message, initialMapSize, evictedKeyCount + currentMapSize);
+                // current map size should approximately be around (nodeCount - 1) * perNodeMaxSize.
+                assertTrue(message, ((nodeCount - 1) * perNodeMaxSize)
+                        + (PARTITION_COUNT / nodeCount) >= currentMapSize);
+            }
+        }
+
+        // check also backup entry count is around perNodeMaxSize.
+        IMap<Object, Object> map1 = nodes.get(1).getMap(mapName);
+        IMap<Object, Object> map2 = nodes.get(2).getMap(mapName);
+        long totalBackupEntryCount = getTotalBackupEntryCount(map1, map2);
+        assertTrue("totalBackupEntryCount=" + totalBackupEntryCount, ((nodeCount - 1) * perNodeMaxSize)
+                + (PARTITION_COUNT / nodeCount) >= totalBackupEntryCount);
+    });
}

+private static long getTotalBackupEntryCount(IMap... maps) {
+    long total = 0;
+    for (IMap map : maps) {
+        total += map.getLocalMapStats().getBackupEntryCount();
+    }
+    return total;
+}


/**
* Eviction starts if a partition's size exceeds this number:
*
@@ -403,9 +435,11 @@ Collection<IMap> createMaps(String mapName, Config config, int nodeCount) {

Config createConfig(MaxSizePolicy maxSizePolicy, int maxSize, String mapName) {
    Config config = getConfig();
+    config.getMetricsConfig().setEnabled(false);
    config.setProperty(ClusterProperty.PARTITION_COUNT.getName(), String.valueOf(PARTITION_COUNT));

    MapConfig mapConfig = config.getMapConfig(mapName);
+    mapConfig.setBackupCount(1);
    EvictionConfig evictionConfig = mapConfig.getEvictionConfig();
    evictionConfig.setEvictionPolicy(EvictionPolicy.LRU);
    evictionConfig.setMaxSizePolicy(maxSizePolicy);
@@ -414,10 +448,10 @@ Config createConfig(MaxSizePolicy maxSizePolicy, int maxSize, String mapName) {
    return config;
}

-Collection<HazelcastInstance> createNodes(int nodeCount, Config config) {
+List<HazelcastInstance> createNodes(int nodeCount, Config config) {
    TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(nodeCount);
    factory.newInstances(config);
-    return factory.getAllHazelcastInstances();
+    return new ArrayList<>(factory.getAllHazelcastInstances());
}

int getSize(Collection<IMap> maps) {