diff --git a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java index 3ec745a932bb..1c3bcc0cc8a3 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java @@ -28,9 +28,9 @@ import com.hazelcast.spi.properties.HazelcastProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -252,7 +252,7 @@ private boolean notInProcessableTimeWindow(T container, long now) { private List addContainerTo(List containersToProcess, T container) { if (containersToProcess == null) { - containersToProcess = new ArrayList(); + containersToProcess = new LinkedList<>(); } containersToProcess.add(container); diff --git a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java index 80e86fdaadeb..cf62f8a3b77b 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java @@ -16,16 +16,16 @@ package com.hazelcast.internal.eviction; -import java.util.function.BiFunction; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; +import com.hazelcast.internal.util.CollectionUtil; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.OperationService; -import com.hazelcast.internal.util.CollectionUtil; -import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; import java.util.Queue; +import java.util.function.BiFunction; /** * Helper class to create and send backup expiration operations. @@ -59,7 +59,7 @@ static ToBackupSender newToBackupSender(String serviceName, } private static Collection pollExpiredKeys(Queue expiredKeys) { - Collection polledKeys = new ArrayList(expiredKeys.size()); + Collection polledKeys = new LinkedList<>(); do { ExpiredKey expiredKey = expiredKeys.poll(); diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java index 07bfb0432a63..c4d576f87e46 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java @@ -511,7 +511,7 @@ public Object evict(Data key, boolean backup) { value = record.getValue(); mapDataStore.flush(key, value, backup); mutationObserver.onEvictRecord(key, record); - key = removeKeyFromExpirySystem(key); + removeKeyFromExpirySystem(key); storage.removeRecord(key, record); if (!backup) { mapServiceContext.interceptRemove(interceptorRegistry, value); @@ -520,10 +520,8 @@ public Object evict(Data key, boolean backup) { return value; } - private Data removeKeyFromExpirySystem(Data key) { - Data backingDataKeyFormat = storage.toBackingDataKeyFormat(key); - expirySystem.removeKeyFromExpirySystem(backingDataKeyFormat); - return backingDataKeyFormat; + private void removeKeyFromExpirySystem(Data key) { + expirySystem.removeKeyFromExpirySystem(key); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index 38a89b977b5c..36cc7a260153 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -21,12 +21,12 @@ import com.hazelcast.internal.eviction.ExpiredKey; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.ExceptionUtil; import com.hazelcast.internal.util.MapUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.map.impl.ExpirationTimeSetter; import com.hazelcast.map.impl.MapContainer; import com.hazelcast.map.impl.MapServiceContext; -import com.hazelcast.map.impl.eviction.Evictor; import com.hazelcast.map.impl.recordstore.RecordStore; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.properties.ClusterProperty; @@ -61,7 +61,10 @@ public class ExpirySystem { = new HazelcastProperty(PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS); private static final int ONE_HUNDRED_PERCENT = 100; - private static final int MIN_SCANNABLE_ENTRY_COUNT = 100; + private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100; + private static final int MAX_SAMPLE_AT_A_TIME = 16; + private static final ThreadLocal BATCH_OF_EXPIRED + = ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME << 1)); private final long expiryDelayMillis; private final long expiredKeyScanTimeoutNanos; @@ -74,7 +77,9 @@ public class ExpirySystem { private final InvalidationQueue expiredKeys = new InvalidationQueue<>(); private Iterator> cachedExpirationIterator; - private Map expireTimeByKey; + // This is volatile since it can be initialized at runtime lazily and + // can be accessed by query threads besides partition ones. + private volatile Map expireTimeByKey; public ExpirySystem(RecordStore recordStore, MapContainer mapContainer, @@ -91,7 +96,7 @@ public ExpirySystem(RecordStore recordStore, this.expiredKeyScanTimeoutNanos = nodeEngine.getProperties().getNanos(EXPIRED_KEY_SCAN_TIMEOUT_NANOS); } - public boolean isEmpty() { + public final boolean isEmpty() { return MapUtil.isNullOrEmpty(expireTimeByKey); } @@ -129,7 +134,7 @@ protected ExpiryMetadata createExpiryMetadata(long ttlMillis, long maxIdleMillis return new ExpiryMetadataImpl(ttlMillis, maxIdleMillis, expirationTime); } - public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) { + public final void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) { if (expiryTime <= 0) { MapConfig mapConfig = mapContainer.getMapConfig(); long ttlMillis = pickTTLMillis(ttl, mapConfig); @@ -143,10 +148,8 @@ public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long expirationTime) { if (expirationTime == Long.MAX_VALUE) { - Map map = getOrCreateExpireTimeByKeyMap(false); - if (!map.isEmpty()) { - Data nativeKey = recordStore.getStorage().toBackingDataKeyFormat(key); - callRemove(nativeKey, expireTimeByKey); + if (!isEmpty()) { + callRemove(key, expireTimeByKey); } return; } @@ -166,28 +169,28 @@ private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long mapServiceContext.getExpirationManager().scheduleExpirationTask(); } - public long calculateExpirationTime(long ttl, long maxIdle, long now) { + public final long calculateExpirationTime(long ttl, long maxIdle, long now) { MapConfig mapConfig = mapContainer.getMapConfig(); long ttlMillis = pickTTLMillis(ttl, mapConfig); long maxIdleMillis = pickMaxIdleMillis(maxIdle, mapConfig); return ExpirationTimeSetter.calculateExpirationTime(ttlMillis, maxIdleMillis, now); } - public void removeKeyFromExpirySystem(Data key) { + public final void removeKeyFromExpirySystem(Data key) { Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + if (isEmpty()) { return; } callRemove(key, expireTimeByKey); } - public void extendExpiryTime(Data dataKey, long now) { + public final void extendExpiryTime(Data dataKey, long now) { if (isEmpty()) { return; } Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + if (isEmpty()) { return; } @@ -202,9 +205,9 @@ public void extendExpiryTime(Data dataKey, long now) { expiryMetadata.setExpirationTime(expirationTime); } - public ExpiryReason hasExpired(Data key, long now, boolean backup) { + public final ExpiryReason hasExpired(Data key, long now, boolean backup) { Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + if (isEmpty()) { return ExpiryReason.NOT_EXPIRED; } ExpiryMetadata expiryMetadata = getExpiryMetadataForExpiryCheck(key, expireTimeByKey); @@ -234,31 +237,69 @@ private ExpiryReason hasExpired(ExpiryMetadata expiryMetadata, long now, boolean return expiryReason; } - public InvalidationQueue getExpiredKeys() { + public final InvalidationQueue getExpiredKeys() { return expiredKeys; } @Nonnull - public ExpiryMetadata getExpiredMetadata(Data key) { + public final ExpiryMetadata getExpiredMetadata(Data key) { ExpiryMetadata expiryMetadata = getOrCreateExpireTimeByKeyMap(false).get(key); return expiryMetadata != null ? expiryMetadata : ExpiryMetadata.NULL; } @SuppressWarnings("checkstyle:magicnumber") - public void evictExpiredEntries(int percentage, long now, boolean backup) { - Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) { + // 1. Find how many keys we can scan at max. + final int maxScannableCount = findMaxScannableCount(percentage); + if (maxScannableCount == 0) { + // no expirable entry exists. return; } - // Find max scannable key count - int expirableKeysMapSize = expireTimeByKey.size(); - int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT); - int maxScannableKeyCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage); - - scanAndEvictExpiredKeys(maxScannableKeyCount, now, backup); + // 2. Do scanning and evict expired keys. + int scannedCount = 0; + int expiredCount = 0; + try { + long scanLoopStartNanos = System.nanoTime(); + do { + scannedCount += findExpiredKeys(now, backup); + expiredCount += evictExpiredKeys(backup); + } while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext() + && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); + } catch (Exception e) { + BATCH_OF_EXPIRED.get().clear(); + throw ExceptionUtil.rethrow(e); + } + // 3. Send expired keys to backups(only valid for max-idle-expiry) tryToSendBackupExpiryOp(); + + if (logger.isFinestEnabled()) { + logProgress(maxScannableCount, scannedCount, expiredCount); + } + } + + private void logProgress(int maxScannableCount, int scannedCount, int expiredCount) { + logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, " + + "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, " + + "scannedKeyCount: %d, expiredKeyCount: %d" + , recordStore.getName(), recordStore.getPartitionId(), recordStore.size() + , expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount)); + } + + private int findMaxScannableCount(int percentage) { + Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); + if (isEmpty()) { + return 0; + } + + int numberOfExpirableKeys = expireTimeByKey.size(); + if (numberOfExpirableKeys <= MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN) { + return numberOfExpirableKeys; + } + + int percentageOfExpirableKeys = (int) (1D * numberOfExpirableKeys * percentage / ONE_HUNDRED_PERCENT); + return Math.max(MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN, percentageOfExpirableKeys); } /** @@ -268,57 +309,45 @@ private Iterator> getOrInitCachedIterator() { if (cachedExpirationIterator == null || !cachedExpirationIterator.hasNext()) { cachedExpirationIterator = initIteratorOf(expireTimeByKey); } - return cachedExpirationIterator; } - private void scanAndEvictExpiredKeys(int maxScannableKeyCount, long now, boolean backup) { - // Scan to find expired keys. - long scanLoopStartNanos = System.nanoTime(); - List expiredKeyExpiryReasonList = new ArrayList<>(); - int scannedKeyCount = 0; - int expiredKeyCount = 0; - do { - Map.Entry entry = getOrInitCachedIterator().next(); - scannedKeyCount++; + private int findExpiredKeys(long now, boolean backup) { + List batchOfExpired = BATCH_OF_EXPIRED.get(); + + int scannedCount = 0; + Iterator> cachedIterator = getOrInitCachedIterator(); + while (scannedCount++ < MAX_SAMPLE_AT_A_TIME && cachedIterator.hasNext()) { + Map.Entry entry = cachedIterator.next(); Data key = entry.getKey(); ExpiryMetadata expiryMetadata = entry.getValue(); ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup); if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) { // add key and expiryReason to list to evict them later - expiredKeyExpiryReasonList.add(key); - expiredKeyExpiryReasonList.add(expiryReason); - // remove expired key from expirySystem - callIterRemove(cachedExpirationIterator); - expiredKeyCount++; - } - - // - If timed out while looping, break this loop to free - // partition thread. - // - Scan at least Evictor.SAMPLE_COUNT keys. During - // eviction we also check this number of keys. - if (scannedKeyCount % Evictor.SAMPLE_COUNT == 0 - && (System.nanoTime() - scanLoopStartNanos) >= expiredKeyScanTimeoutNanos) { - break; + batchOfExpired.add(key); + batchOfExpired.add(expiryReason); } - - } while (scannedKeyCount < maxScannableKeyCount && getOrInitCachedIterator().hasNext()); - - // Evict expired keys - for (int i = 0; i < expiredKeyExpiryReasonList.size(); i += 2) { - Data key = (Data) expiredKeyExpiryReasonList.get(i); - ExpiryReason reason = (ExpiryReason) expiredKeyExpiryReasonList.get(i + 1); - recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup); } + return scannedCount; + } - if (logger.isFinestEnabled()) { - logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, " - + "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, " - + "scannedKeyCount: %d, expiredKeyCount: %d" - , recordStore.getName(), recordStore.getPartitionId(), recordStore.size() - , expireTimeByKey.size(), maxScannableKeyCount, scannedKeyCount, expiredKeyCount)); + private int evictExpiredKeys(boolean backup) { + int evictedCount = 0; + + List batchOfExpired = BATCH_OF_EXPIRED.get(); + try { + for (int i = 0; i < batchOfExpired.size(); i += 2) { + Data key = (Data) batchOfExpired.get(i); + ExpiryReason expiryReason = (ExpiryReason) batchOfExpired.get(i + 1); + recordStore.evictExpiredEntryAndPublishExpiryEvent(key, expiryReason, backup); + callRemove(key, expireTimeByKey); + evictedCount++; + } + } finally { + batchOfExpired.clear(); } + return evictedCount; } // this method is overridden @@ -327,11 +356,6 @@ protected ExpiryMetadata getExpiryMetadataForExpiryCheck(Data key, return expireTimeByKey.get(key); } - // this method is overridden - protected void callIterRemove(Iterator> expirationIterator) { - expirationIterator.remove(); - } - // this method is overridden protected Iterator> initIteratorOf(Map expireTimeByKey) { return expireTimeByKey.entrySet().iterator(); @@ -347,7 +371,7 @@ public void destroy() { getOrCreateExpireTimeByKeyMap(false).clear(); } - public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) { + public final void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) { if (mapContainer.getTotalBackupCount() == 0) { return; } @@ -359,7 +383,7 @@ public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) { clearExpiredRecordsTask.tryToSendBackupExpiryOp(recordStore, true); } - public void tryToSendBackupExpiryOp() { + public final void tryToSendBackupExpiryOp() { if (mapContainer.getTotalBackupCount() == 0) { return; }