diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapReplicationStateHolder.java b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapReplicationStateHolder.java index 2d183487f42d..97c0f46cdda3 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapReplicationStateHolder.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/operation/MapReplicationStateHolder.java @@ -16,12 +16,14 @@ package com.hazelcast.map.impl.operation; +import com.hazelcast.config.EvictionConfig; import com.hazelcast.config.IndexConfig; import com.hazelcast.config.MapConfig; import com.hazelcast.internal.cluster.Versions; import com.hazelcast.internal.monitor.LocalRecordStoreStats; import com.hazelcast.internal.monitor.impl.LocalRecordStoreStatsImpl; import com.hazelcast.internal.nio.IOUtil; +import com.hazelcast.internal.partition.IPartitionService; import com.hazelcast.internal.serialization.Data; import com.hazelcast.internal.serialization.SerializationService; import com.hazelcast.internal.services.ObjectNamespace; @@ -31,12 +33,16 @@ import com.hazelcast.internal.util.ThreadUtil; import com.hazelcast.map.impl.MapContainer; import com.hazelcast.map.impl.MapDataSerializerHook; +import com.hazelcast.map.impl.MapService; +import com.hazelcast.map.impl.MapServiceContext; import com.hazelcast.map.impl.PartitionContainer; +import com.hazelcast.map.impl.eviction.Evictor; import com.hazelcast.map.impl.record.Record; import com.hazelcast.map.impl.record.Records; import com.hazelcast.map.impl.recordstore.RecordStore; import com.hazelcast.map.impl.recordstore.expiry.ExpiryMetadata; import com.hazelcast.map.impl.recordstore.expiry.ExpiryMetadataImpl; +import com.hazelcast.map.impl.recordstore.expiry.ExpiryReason; import com.hazelcast.nio.ObjectDataInput; import com.hazelcast.nio.ObjectDataOutput; import com.hazelcast.nio.serialization.IdentifiedDataSerializable; @@ -54,6 +60,7 @@ import java.util.Map; import java.util.Set; +import static com.hazelcast.config.MaxSizePolicy.PER_NODE; import static com.hazelcast.internal.util.MapUtil.createHashMap; import static com.hazelcast.internal.util.MapUtil.isNullOrEmpty; @@ -139,7 +146,8 @@ void prepare(PartitionContainer container, Collection namespac } } - @SuppressWarnings("checkstyle:npathcomplexity") + @SuppressWarnings({"checkstyle:npathcomplexity", + "checkstyle:cyclomaticcomplexity", "checkstyle:nestedifdepth"}) void applyState() { ThreadUtil.assertRunningOnPartitionThread(); @@ -175,20 +183,37 @@ void applyState() { indexes.clearAll(); } + + long ownedEntryCountOnThisNode = entryCountOnThisNode(mapContainer); + EvictionConfig evictionConfig = mapContainer.getMapConfig().getEvictionConfig(); + boolean perNodeEvictionConfigured = mapContainer.getEvictor() != Evictor.NULL_EVICTOR + && evictionConfig.getMaxSizePolicy() == PER_NODE; + long nowInMillis = Clock.currentTimeMillis(); for (int i = 0; i < keyRecordExpiry.size(); i += 3) { Data dataKey = (Data) keyRecordExpiry.get(i); Record record = (Record) keyRecordExpiry.get(i + 1); ExpiryMetadata expiryMetadata = (ExpiryMetadata) keyRecordExpiry.get(i + 2); - recordStore.putReplicatedRecord(dataKey, record, expiryMetadata, populateIndexes, nowInMillis); - - if (recordStore.shouldEvict()) { - // No need to continue replicating records anymore. - // We are already over eviction threshold, each put record will cause another eviction. - recordStore.evictEntries(dataKey); - break; + if (perNodeEvictionConfigured) { + if (ownedEntryCountOnThisNode >= evictionConfig.getSize()) { + if (operation.getReplicaIndex() == 0) { + recordStore.doPostEvictionOperations(dataKey, record.getValue(), ExpiryReason.NOT_EXPIRED); + } + } else { + recordStore.putReplicatedRecord(dataKey, record, expiryMetadata, populateIndexes, nowInMillis); + ownedEntryCountOnThisNode++; + } + } else { + recordStore.putReplicatedRecord(dataKey, record, expiryMetadata, populateIndexes, nowInMillis); + if (recordStore.shouldEvict()) { + // No need to continue replicating records anymore. + // We are already over eviction threshold, each put record will cause another eviction. + recordStore.evictEntries(dataKey); + break; + } } + recordStore.disposeDeferredBlocks(); } @@ -208,6 +233,32 @@ void applyState() { } } + // owned or backup + private long entryCountOnThisNode(MapContainer mapContainer) { + int replicaIndex = operation.getReplicaIndex(); + long owned = 0; + if (mapContainer.getEvictor() != Evictor.NULL_EVICTOR + && PER_NODE == mapContainer.getMapConfig().getEvictionConfig().getMaxSizePolicy()) { + + MapService mapService = operation.getService(); + MapServiceContext mapServiceContext = mapService.getMapServiceContext(); + IPartitionService partitionService = mapServiceContext.getNodeEngine().getPartitionService(); + int partitionCount = partitionService.getPartitionCount(); + + for (int partitionId = 0; partitionId < partitionCount; partitionId++) { + if (replicaIndex == 0 ? partitionService.isPartitionOwner(partitionId) + : !partitionService.isPartitionOwner(partitionId)) { + RecordStore store = mapServiceContext.getExistingRecordStore(partitionId, mapContainer.getName()); + if (store != null) { + owned += store.size(); + } + } + } + } + + return owned; + } + private void applyIndexesState() { if (mapIndexInfos != null) { for (MapIndexInfo mapIndexInfo : mapIndexInfos) { diff --git a/hazelcast/src/test/java/com/hazelcast/map/EvictionMaxSizePolicyTest.java b/hazelcast/src/test/java/com/hazelcast/map/EvictionMaxSizePolicyTest.java index aefd96ce748f..f5a6bb84cfc0 100644 --- a/hazelcast/src/test/java/com/hazelcast/map/EvictionMaxSizePolicyTest.java +++ b/hazelcast/src/test/java/com/hazelcast/map/EvictionMaxSizePolicyTest.java @@ -23,7 +23,6 @@ import com.hazelcast.config.MapConfig; import com.hazelcast.config.MaxSizePolicy; import com.hazelcast.core.HazelcastInstance; -import com.hazelcast.spi.eviction.EvictionPolicyComparator; import com.hazelcast.internal.eviction.impl.comparator.LRUEvictionPolicyComparator; import com.hazelcast.internal.partition.IPartitionService; import com.hazelcast.internal.util.MemoryInfoAccessor; @@ -38,6 +37,8 @@ import com.hazelcast.map.impl.proxy.MapProxyImpl; import com.hazelcast.map.impl.recordstore.DefaultRecordStore; import com.hazelcast.map.impl.recordstore.RecordStore; +import com.hazelcast.map.listener.EntryEvictedListener; +import com.hazelcast.spi.eviction.EvictionPolicyComparator; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.properties.ClusterProperty; import com.hazelcast.test.AssertTask; @@ -52,7 +53,10 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import static com.hazelcast.config.MaxSizePolicy.FREE_HEAP_PERCENTAGE; import static com.hazelcast.config.MaxSizePolicy.FREE_HEAP_SIZE; @@ -85,8 +89,9 @@ public void testPerNodePolicy_withManyNodes() { @Test public void testPerNodePolicy_afterGracefulShutdown() { - int nodeCount = 2; + int nodeCount = 3; int perNodeMaxSize = 1000; + int numberOfPuts = 5000; // eviction takes place if a partitions size exceeds this number // see EvictionChecker#toPerPartitionMaxSize @@ -94,31 +99,58 @@ public void testPerNodePolicy_afterGracefulShutdown() { String mapName = "testPerNodePolicy_afterGracefulShutdown"; Config config = createConfig(PER_NODE, perNodeMaxSize, mapName); - + Set evictedKeySet = Collections.newSetFromMap(new ConcurrentHashMap<>()); // populate map from one of the nodes - Collection nodes = createNodes(nodeCount, config); - for (HazelcastInstance node : nodes) { - IMap map = node.getMap(mapName); - for (int i = 0; i < 5000; i++) { - map.put(i, i); - } + List nodes = createNodes(nodeCount, config); + assertClusterSize(3, nodes.toArray(new HazelcastInstance[0])); - node.shutdown(); - break; + IMap map = nodes.get(0).getMap(mapName); + for (int i = 0; i < numberOfPuts; i++) { + map.put(i, i); } - for (HazelcastInstance node : nodes) { - if (node.getLifecycleService().isRunning()) { - int mapSize = node.getMap(mapName).size(); - String message = format("map size is %d and it should be smaller " - + "than maxPartitionSize * PARTITION_COUNT which is %.0f", - mapSize, maxPartitionSize * PARTITION_COUNT); + int initialMapSize = map.size(); + + nodes.get(1).getMap(mapName).addEntryListener((EntryEvictedListener) event -> { + evictedKeySet.add(event.getKey()); + }, true); - assertTrue(message, mapSize <= maxPartitionSize * PARTITION_COUNT); + nodes.get(0).shutdown(); + + assertTrueEventually(() -> { + for (HazelcastInstance node : nodes) { + if (node.getLifecycleService().isRunning()) { + int currentMapSize = node.getMap(mapName).size(); + int evictedKeyCount = evictedKeySet.size(); + String message = format("initialMapSize=%d, evictedKeyCount=%d, map size is %d and it should be smaller " + + "than maxPartitionSize * PARTITION_COUNT which is %.0f", + initialMapSize, evictedKeyCount, currentMapSize, maxPartitionSize * PARTITION_COUNT); + + assertEquals(message, initialMapSize, evictedKeyCount + currentMapSize); + // current map size should approximately be around (nodeCount - 1) * perNodeMaxSize. + assertTrue(message, ((nodeCount - 1) * perNodeMaxSize) + + (PARTITION_COUNT / nodeCount) >= currentMapSize); + } } + + // check also backup entry count is around perNodeMaxSize. + IMap map1 = nodes.get(1).getMap(mapName); + IMap map2 = nodes.get(2).getMap(mapName); + long totalBackupEntryCount = getTotalBackupEntryCount(map1, map2); + assertTrue("totalBackupEntryCount=" + totalBackupEntryCount, ((nodeCount - 1) * perNodeMaxSize) + + (PARTITION_COUNT / nodeCount) >= totalBackupEntryCount); + }); + } + + private static long getTotalBackupEntryCount(IMap... maps) { + long total = 0; + for (IMap map : maps) { + total += map.getLocalMapStats().getBackupEntryCount(); } + return total; } + /** * Eviction starts if a partitions' size exceeds this number: * @@ -403,9 +435,11 @@ Collection createMaps(String mapName, Config config, int nodeCount) { Config createConfig(MaxSizePolicy maxSizePolicy, int maxSize, String mapName) { Config config = getConfig(); + config.getMetricsConfig().setEnabled(false); config.setProperty(ClusterProperty.PARTITION_COUNT.getName(), String.valueOf(PARTITION_COUNT)); MapConfig mapConfig = config.getMapConfig(mapName); + mapConfig.setBackupCount(1); EvictionConfig evictionConfig = mapConfig.getEvictionConfig(); evictionConfig.setEvictionPolicy(EvictionPolicy.LRU); evictionConfig.setMaxSizePolicy(maxSizePolicy); @@ -414,10 +448,10 @@ Config createConfig(MaxSizePolicy maxSizePolicy, int maxSize, String mapName) { return config; } - Collection createNodes(int nodeCount, Config config) { + List createNodes(int nodeCount, Config config) { TestHazelcastInstanceFactory factory = createHazelcastInstanceFactory(nodeCount); factory.newInstances(config); - return factory.getAllHazelcastInstances(); + return new ArrayList<>(factory.getAllHazelcastInstances()); } int getSize(Collection maps) {