From ab040db8b714c36e2d2f24cdcd2f372ba0c2fd59 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Sun, 11 Apr 2021 16:14:53 +0300 Subject: [PATCH 1/8] fix timeout --- .../eviction/ClearExpiredRecordsTask.java | 4 +- .../internal/eviction/ToBackupSender.java | 8 +- .../map/impl/recordstore/StorageImpl.java | 39 ++--- .../impl/recordstore/expiry/ExpirySystem.java | 152 ++++++++++-------- 4 files changed, 115 insertions(+), 88 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java index 3ec745a932bb..1c3bcc0cc8a3 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ClearExpiredRecordsTask.java @@ -28,9 +28,9 @@ import com.hazelcast.spi.properties.HazelcastProperty; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -252,7 +252,7 @@ private boolean notInProcessableTimeWindow(T container, long now) { private List addContainerTo(List containersToProcess, T container) { if (containersToProcess == null) { - containersToProcess = new ArrayList(); + containersToProcess = new LinkedList<>(); } containersToProcess.add(container); diff --git a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java index 80e86fdaadeb..cf62f8a3b77b 100644 --- a/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java +++ b/hazelcast/src/main/java/com/hazelcast/internal/eviction/ToBackupSender.java @@ -16,16 +16,16 @@ package com.hazelcast.internal.eviction; -import java.util.function.BiFunction; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; +import com.hazelcast.internal.util.CollectionUtil; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.Operation; import com.hazelcast.spi.impl.operationservice.OperationService; -import com.hazelcast.internal.util.CollectionUtil; -import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; import java.util.Queue; +import java.util.function.BiFunction; /** * Helper class to create and send backup expiration operations. @@ -59,7 +59,7 @@ static ToBackupSender newToBackupSender(String serviceName, } private static Collection pollExpiredKeys(Queue expiredKeys) { - Collection polledKeys = new ArrayList(expiredKeys.size()); + Collection polledKeys = new LinkedList<>(); do { ExpiredKey expiredKey = expiredKeys.poll(); diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java index 99a3cd475c12..9a9d955428bf 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java @@ -27,11 +27,9 @@ import com.hazelcast.map.impl.record.Record; import com.hazelcast.map.impl.recordstore.expiry.ExpirySystem; -import java.util.AbstractMap; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import static com.hazelcast.config.InMemoryFormat.BINARY; import static com.hazelcast.map.impl.OwnedEntryCostEstimatorFactory.createMapSizeEstimator; @@ -43,7 +41,7 @@ */ public class StorageImpl implements Storage { - private final StorageSCHM records; + private final ConcurrentHashMap records; private final SerializationService serializationService; private final InMemoryFormat inMemoryFormat; @@ -54,7 +52,8 @@ public class StorageImpl implements Storage { SerializationService serializationService) { this.entryCostEstimator = createMapSizeEstimator(inMemoryFormat); this.inMemoryFormat = inMemoryFormat; - this.records = new StorageSCHM<>(serializationService, expirySystem); +// this.records = new StorageSCHM<>(serializationService, expirySystem); + this.records = new ConcurrentHashMap<>(); this.serializationService = serializationService; } @@ -67,7 +66,7 @@ public void clear(boolean isDuringShutdown) { @Override public Iterator> mutationTolerantIterator() { - return records.cachedEntrySet().iterator(); + return records.entrySet().iterator(); } @Override @@ -143,27 +142,29 @@ public void setEntryCostEstimator(EntryCostEstimator entryCostEstimator) { @Override public Iterable getRandomSamples(int sampleCount) { - return records.getRandomSamples(sampleCount); + return null;//records.getRandomSamples(sampleCount); } @Override public MapKeysWithCursor fetchKeys(IterationPointer[] pointers, int size) { - List keys = new ArrayList<>(size); - IterationPointer[] newPointers = records.fetchKeys(pointers, size, keys); - return new MapKeysWithCursor(keys, newPointers); +// List keys = new ArrayList<>(size); +// IterationPointer[] newPointers = records.fetchKeys(pointers, size, keys); +// return new MapKeysWithCursor(keys, newPointers); + return null; } @Override public MapEntriesWithCursor fetchEntries(IterationPointer[] pointers, int size) { - List> entries = new ArrayList<>(size); - IterationPointer[] newPointers = records.fetchEntries(pointers, size, entries); - List> entriesData = new ArrayList<>(entries.size()); - for (Map.Entry entry : entries) { - R record = entry.getValue(); - Data dataValue = serializationService.toData(record.getValue()); - entriesData.add(new AbstractMap.SimpleEntry<>(entry.getKey(), dataValue)); - } - return new MapEntriesWithCursor(entriesData, newPointers); +// List> entries = new ArrayList<>(size); +// IterationPointer[] newPointers = records.fetchEntries(pointers, size, entries); +// List> entriesData = new ArrayList<>(entries.size()); +// for (Map.Entry entry : entries) { +// R record = entry.getValue(); +// Data dataValue = serializationService.toData(record.getValue()); +// entriesData.add(new AbstractMap.SimpleEntry<>(entry.getKey(), dataValue)); +// } +// return new MapEntriesWithCursor(entriesData, newPointers); + return null; } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index 38a89b977b5c..a3797e8d383f 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -21,12 +21,12 @@ import com.hazelcast.internal.eviction.ExpiredKey; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.ConcurrentReferenceHashMap; import com.hazelcast.internal.util.MapUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.map.impl.ExpirationTimeSetter; import com.hazelcast.map.impl.MapContainer; import com.hazelcast.map.impl.MapServiceContext; -import com.hazelcast.map.impl.eviction.Evictor; import com.hazelcast.map.impl.recordstore.RecordStore; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.properties.ClusterProperty; @@ -34,11 +34,11 @@ import com.hazelcast.spi.properties.HazelcastProperty; import javax.annotation.Nonnull; -import java.util.ArrayList; +import java.util.ArrayDeque; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -61,7 +61,10 @@ public class ExpirySystem { = new HazelcastProperty(PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS); private static final int ONE_HUNDRED_PERCENT = 100; - private static final int MIN_SCANNABLE_ENTRY_COUNT = 100; + private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100; + private static final int MAX_SAMPLE_AT_A_TIME = 16; + private static final ThreadLocal BATCH_OF_EXPIRED + = ThreadLocal.withInitial(() -> new ArrayDeque(MAX_SAMPLE_AT_A_TIME)); private final long expiryDelayMillis; private final long expiredKeyScanTimeoutNanos; @@ -74,7 +77,9 @@ public class ExpirySystem { private final InvalidationQueue expiredKeys = new InvalidationQueue<>(); private Iterator> cachedExpirationIterator; - private Map expireTimeByKey; + // This is volatile since it can be initialized at runtime lazily and + // can be accessed by query threads besides partition ones. + private volatile Map expireTimeByKey; public ExpirySystem(RecordStore recordStore, MapContainer mapContainer, @@ -91,7 +96,7 @@ public ExpirySystem(RecordStore recordStore, this.expiredKeyScanTimeoutNanos = nodeEngine.getProperties().getNanos(EXPIRED_KEY_SCAN_TIMEOUT_NANOS); } - public boolean isEmpty() { + public final boolean isEmpty() { return MapUtil.isNullOrEmpty(expireTimeByKey); } @@ -129,7 +134,7 @@ protected ExpiryMetadata createExpiryMetadata(long ttlMillis, long maxIdleMillis return new ExpiryMetadataImpl(ttlMillis, maxIdleMillis, expirationTime); } - public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) { + public final void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) { if (expiryTime <= 0) { MapConfig mapConfig = mapContainer.getMapConfig(); long ttlMillis = pickTTLMillis(ttl, mapConfig); @@ -143,8 +148,7 @@ public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long expirationTime) { if (expirationTime == Long.MAX_VALUE) { - Map map = getOrCreateExpireTimeByKeyMap(false); - if (!map.isEmpty()) { + if (!isEmpty()) { Data nativeKey = recordStore.getStorage().toBackingDataKeyFormat(key); callRemove(nativeKey, expireTimeByKey); } @@ -166,28 +170,28 @@ private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long mapServiceContext.getExpirationManager().scheduleExpirationTask(); } - public long calculateExpirationTime(long ttl, long maxIdle, long now) { + public final long calculateExpirationTime(long ttl, long maxIdle, long now) { MapConfig mapConfig = mapContainer.getMapConfig(); long ttlMillis = pickTTLMillis(ttl, mapConfig); long maxIdleMillis = pickMaxIdleMillis(maxIdle, mapConfig); return ExpirationTimeSetter.calculateExpirationTime(ttlMillis, maxIdleMillis, now); } - public void removeKeyFromExpirySystem(Data key) { + public final void removeKeyFromExpirySystem(Data key) { Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + if (isEmpty()) { return; } callRemove(key, expireTimeByKey); } - public void extendExpiryTime(Data dataKey, long now) { + public final void extendExpiryTime(Data dataKey, long now) { if (isEmpty()) { return; } Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + if (isEmpty()) { return; } @@ -202,9 +206,9 @@ public void extendExpiryTime(Data dataKey, long now) { expiryMetadata.setExpirationTime(expirationTime); } - public ExpiryReason hasExpired(Data key, long now, boolean backup) { + public final ExpiryReason hasExpired(Data key, long now, boolean backup) { Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + if (isEmpty()) { return ExpiryReason.NOT_EXPIRED; } ExpiryMetadata expiryMetadata = getExpiryMetadataForExpiryCheck(key, expireTimeByKey); @@ -234,31 +238,64 @@ private ExpiryReason hasExpired(ExpiryMetadata expiryMetadata, long now, boolean return expiryReason; } - public InvalidationQueue getExpiredKeys() { + public final InvalidationQueue getExpiredKeys() { return expiredKeys; } @Nonnull - public ExpiryMetadata getExpiredMetadata(Data key) { + public final ExpiryMetadata getExpiredMetadata(Data key) { ExpiryMetadata expiryMetadata = getOrCreateExpireTimeByKeyMap(false).get(key); return expiryMetadata != null ? expiryMetadata : ExpiryMetadata.NULL; } @SuppressWarnings("checkstyle:magicnumber") - public void evictExpiredEntries(int percentage, long now, boolean backup) { - Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); - if (expireTimeByKey.isEmpty()) { + public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) { + // 1. Find how many keys we can scan at max. + final int maxScannableCount = findMaxScannableCount(percentage); + if (maxScannableCount == 0) { + // no expirable entry exists. return; } - // Find max scannable key count - int expirableKeysMapSize = expireTimeByKey.size(); - int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT); - int maxScannableKeyCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage); - - scanAndEvictExpiredKeys(maxScannableKeyCount, now, backup); + // 2. Do scanning and evict expired keys. + int scannedCount = 0; + int expiredCount = 0; + long scanLoopStartNanos = System.nanoTime(); + do { + scannedCount += findExpiredKeys(now, backup); + expiredCount += evictExpiredKeys(backup); + } while ((scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()) + && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); + // 3. Send expired keys to backups(only valid for max-idle-expiry) tryToSendBackupExpiryOp(); + + if (logger.isFinestEnabled()) { + logProgress(maxScannableCount, scannedCount, expiredCount); + } + } + + private void logProgress(int maxScannableCount, int scannedCount, int expiredCount) { + logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, " + + "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, " + + "scannedKeyCount: %d, expiredKeyCount: %d" + , recordStore.getName(), recordStore.getPartitionId(), recordStore.size() + , expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount)); + } + + private int findMaxScannableCount(int percentage) { + Map expireTimeByKey = getOrCreateExpireTimeByKeyMap(false); + if (isEmpty()) { + return 0; + } + + int numberOfExpirableKeys = expireTimeByKey.size(); + if (numberOfExpirableKeys <= MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN) { + return numberOfExpirableKeys; + } + + int percentageOfExpirableKeys = (int) (1D * numberOfExpirableKeys * percentage / ONE_HUNDRED_PERCENT); + return Math.max(MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN, percentageOfExpirableKeys); } /** @@ -268,57 +305,46 @@ private Iterator> getOrInitCachedIterator() { if (cachedExpirationIterator == null || !cachedExpirationIterator.hasNext()) { cachedExpirationIterator = initIteratorOf(expireTimeByKey); } - return cachedExpirationIterator; } - private void scanAndEvictExpiredKeys(int maxScannableKeyCount, long now, boolean backup) { - // Scan to find expired keys. - long scanLoopStartNanos = System.nanoTime(); - List expiredKeyExpiryReasonList = new ArrayList<>(); - int scannedKeyCount = 0; - int expiredKeyCount = 0; - do { - Map.Entry entry = getOrInitCachedIterator().next(); - scannedKeyCount++; + private int findExpiredKeys(long now, boolean backup) { + Queue batchOfExpired = BATCH_OF_EXPIRED.get(); + + int scannedCount = 0; + Iterator> cachedIterator = getOrInitCachedIterator(); + while (cachedIterator.hasNext()) { + Map.Entry entry = cachedIterator.next(); Data key = entry.getKey(); ExpiryMetadata expiryMetadata = entry.getValue(); ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup); if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) { // add key and expiryReason to list to evict them later - expiredKeyExpiryReasonList.add(key); - expiredKeyExpiryReasonList.add(expiryReason); + batchOfExpired.add(key); + batchOfExpired.add(expiryReason); // remove expired key from expirySystem - callIterRemove(cachedExpirationIterator); - expiredKeyCount++; + callIterRemove(cachedIterator); } - // - If timed out while looping, break this loop to free - // partition thread. - // - Scan at least Evictor.SAMPLE_COUNT keys. During - // eviction we also check this number of keys. - if (scannedKeyCount % Evictor.SAMPLE_COUNT == 0 - && (System.nanoTime() - scanLoopStartNanos) >= expiredKeyScanTimeoutNanos) { + if (++scannedCount == MAX_SAMPLE_AT_A_TIME) { break; } + } + return scannedCount; + } - } while (scannedKeyCount < maxScannableKeyCount && getOrInitCachedIterator().hasNext()); + private int evictExpiredKeys(boolean backup) { + int evictedCount = 0; - // Evict expired keys - for (int i = 0; i < expiredKeyExpiryReasonList.size(); i += 2) { - Data key = (Data) expiredKeyExpiryReasonList.get(i); - ExpiryReason reason = (ExpiryReason) expiredKeyExpiryReasonList.get(i + 1); + Queue batchOfExpired = BATCH_OF_EXPIRED.get(); + Data key; + while ((key = (Data) batchOfExpired.poll()) != null) { + ExpiryReason reason = (ExpiryReason) batchOfExpired.poll(); recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup); + evictedCount++; } - - if (logger.isFinestEnabled()) { - logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, " - + "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, " - + "scannedKeyCount: %d, expiredKeyCount: %d" - , recordStore.getName(), recordStore.getPartitionId(), recordStore.size() - , expireTimeByKey.size(), maxScannableKeyCount, scannedKeyCount, expiredKeyCount)); - } + return evictedCount; } // this method is overridden @@ -334,7 +360,7 @@ protected void callIterRemove(Iterator> expirati // this method is overridden protected Iterator> initIteratorOf(Map expireTimeByKey) { - return expireTimeByKey.entrySet().iterator(); + return ((ConcurrentReferenceHashMap) expireTimeByKey).cachedEntrySet().iterator(); } // this method is overridden @@ -347,7 +373,7 @@ public void destroy() { getOrCreateExpireTimeByKeyMap(false).clear(); } - public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) { + public final void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) { if (mapContainer.getTotalBackupCount() == 0) { return; } @@ -359,7 +385,7 @@ public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) { clearExpiredRecordsTask.tryToSendBackupExpiryOp(recordStore, true); } - public void tryToSendBackupExpiryOp() { + public final void tryToSendBackupExpiryOp() { if (mapContainer.getTotalBackupCount() == 0) { return; } From b2d1845a74e9b1ab941c5cb156ebf51303c9c831 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Sun, 11 Apr 2021 17:15:31 +0300 Subject: [PATCH 2/8] fix timeout 2 --- .../map/impl/recordstore/StorageImpl.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java index 9a9d955428bf..99a3cd475c12 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/StorageImpl.java @@ -27,9 +27,11 @@ import com.hazelcast.map.impl.record.Record; import com.hazelcast.map.impl.recordstore.expiry.ExpirySystem; +import java.util.AbstractMap; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import static com.hazelcast.config.InMemoryFormat.BINARY; import static com.hazelcast.map.impl.OwnedEntryCostEstimatorFactory.createMapSizeEstimator; @@ -41,7 +43,7 @@ */ public class StorageImpl implements Storage { - private final ConcurrentHashMap records; + private final StorageSCHM records; private final SerializationService serializationService; private final InMemoryFormat inMemoryFormat; @@ -52,8 +54,7 @@ public class StorageImpl implements Storage { SerializationService serializationService) { this.entryCostEstimator = createMapSizeEstimator(inMemoryFormat); this.inMemoryFormat = inMemoryFormat; -// this.records = new StorageSCHM<>(serializationService, expirySystem); - this.records = new ConcurrentHashMap<>(); + this.records = new StorageSCHM<>(serializationService, expirySystem); this.serializationService = serializationService; } @@ -66,7 +67,7 @@ public void clear(boolean isDuringShutdown) { @Override public Iterator> mutationTolerantIterator() { - return records.entrySet().iterator(); + return records.cachedEntrySet().iterator(); } @Override @@ -142,29 +143,27 @@ public void setEntryCostEstimator(EntryCostEstimator entryCostEstimator) { @Override public Iterable getRandomSamples(int sampleCount) { - return null;//records.getRandomSamples(sampleCount); + return records.getRandomSamples(sampleCount); } @Override public MapKeysWithCursor fetchKeys(IterationPointer[] pointers, int size) { -// List keys = new ArrayList<>(size); -// IterationPointer[] newPointers = records.fetchKeys(pointers, size, keys); -// return new MapKeysWithCursor(keys, newPointers); - return null; + List keys = new ArrayList<>(size); + IterationPointer[] newPointers = records.fetchKeys(pointers, size, keys); + return new MapKeysWithCursor(keys, newPointers); } @Override public MapEntriesWithCursor fetchEntries(IterationPointer[] pointers, int size) { -// List> entries = new ArrayList<>(size); -// IterationPointer[] newPointers = records.fetchEntries(pointers, size, entries); -// List> entriesData = new ArrayList<>(entries.size()); -// for (Map.Entry entry : entries) { -// R record = entry.getValue(); -// Data dataValue = serializationService.toData(record.getValue()); -// entriesData.add(new AbstractMap.SimpleEntry<>(entry.getKey(), dataValue)); -// } -// return new MapEntriesWithCursor(entriesData, newPointers); - return null; + List> entries = new ArrayList<>(size); + IterationPointer[] newPointers = records.fetchEntries(pointers, size, entries); + List> entriesData = new ArrayList<>(entries.size()); + for (Map.Entry entry : entries) { + R record = entry.getValue(); + Data dataValue = serializationService.toData(record.getValue()); + entriesData.add(new AbstractMap.SimpleEntry<>(entry.getKey(), dataValue)); + } + return new MapEntriesWithCursor(entriesData, newPointers); } @Override From 5e387641b95dd93f4181fa1079a4751608aa7ff6 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Sun, 11 Apr 2021 17:37:42 +0300 Subject: [PATCH 3/8] fix timeout 3 --- .../com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index a3797e8d383f..a365bd7437b5 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -264,7 +264,7 @@ public final void evictExpiredEntries(final int percentage, final long now, fina do { scannedCount += findExpiredKeys(now, backup); expiredCount += evictExpiredKeys(backup); - } while ((scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()) + } while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext() && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); // 3. Send expired keys to backups(only valid for max-idle-expiry) From a628f3b8d0a62b6c97213c7fa993d17c981f99ab Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Sun, 11 Apr 2021 17:42:49 +0300 Subject: [PATCH 4/8] fix timeout 4 --- .../hazelcast/map/impl/recordstore/expiry/ExpirySystem.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index a365bd7437b5..44b31a18f20e 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -21,7 +21,6 @@ import com.hazelcast.internal.eviction.ExpiredKey; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; import com.hazelcast.internal.serialization.Data; -import com.hazelcast.internal.util.ConcurrentReferenceHashMap; import com.hazelcast.internal.util.MapUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.map.impl.ExpirationTimeSetter; @@ -360,7 +359,7 @@ protected void callIterRemove(Iterator> expirati // this method is overridden protected Iterator> initIteratorOf(Map expireTimeByKey) { - return ((ConcurrentReferenceHashMap) expireTimeByKey).cachedEntrySet().iterator(); + return expireTimeByKey.entrySet().iterator(); } // this method is overridden From e009999db5a57d315db5a968a0fadc7fcc8f31d6 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Tue, 13 Apr 2021 13:08:30 +0300 Subject: [PATCH 5/8] empty queue --- .../impl/recordstore/DefaultRecordStore.java | 8 +++---- .../impl/recordstore/expiry/ExpirySystem.java | 23 ++++++++----------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java index 07bfb0432a63..c4d576f87e46 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java @@ -511,7 +511,7 @@ public Object evict(Data key, boolean backup) { value = record.getValue(); mapDataStore.flush(key, value, backup); mutationObserver.onEvictRecord(key, record); - key = removeKeyFromExpirySystem(key); + removeKeyFromExpirySystem(key); storage.removeRecord(key, record); if (!backup) { mapServiceContext.interceptRemove(interceptorRegistry, value); @@ -520,10 +520,8 @@ public Object evict(Data key, boolean backup) { return value; } - private Data removeKeyFromExpirySystem(Data key) { - Data backingDataKeyFormat = storage.toBackingDataKeyFormat(key); - expirySystem.removeKeyFromExpirySystem(backingDataKeyFormat); - return backingDataKeyFormat; + private void removeKeyFromExpirySystem(Data key) { + expirySystem.removeKeyFromExpirySystem(key); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index 44b31a18f20e..e662eadfa1a4 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -148,8 +148,7 @@ public final void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expir private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long expirationTime) { if (expirationTime == Long.MAX_VALUE) { if (!isEmpty()) { - Data nativeKey = recordStore.getStorage().toBackingDataKeyFormat(key); - callRemove(nativeKey, expireTimeByKey); + callRemove(key, expireTimeByKey); } return; } @@ -249,6 +248,11 @@ public final ExpiryMetadata getExpiredMetadata(Data key) { @SuppressWarnings("checkstyle:magicnumber") public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) { + // 0. Prepare queue for the first use. This + // is to remove leftovers which can remain + // in the queue after possible exceptions. + BATCH_OF_EXPIRED.get().clear(); + // 1. Find how many keys we can scan at max. final int maxScannableCount = findMaxScannableCount(percentage); if (maxScannableCount == 0) { @@ -312,7 +316,7 @@ private int findExpiredKeys(long now, boolean backup) { int scannedCount = 0; Iterator> cachedIterator = getOrInitCachedIterator(); - while (cachedIterator.hasNext()) { + while (scannedCount++ < MAX_SAMPLE_AT_A_TIME && cachedIterator.hasNext()) { Map.Entry entry = cachedIterator.next(); Data key = entry.getKey(); ExpiryMetadata expiryMetadata = entry.getValue(); @@ -322,12 +326,6 @@ private int findExpiredKeys(long now, boolean backup) { // add key and expiryReason to list to evict them later batchOfExpired.add(key); batchOfExpired.add(expiryReason); - // remove expired key from expirySystem - callIterRemove(cachedIterator); - } - - if (++scannedCount == MAX_SAMPLE_AT_A_TIME) { - break; } } return scannedCount; @@ -341,6 +339,8 @@ private int evictExpiredKeys(boolean backup) { while ((key = (Data) batchOfExpired.poll()) != null) { ExpiryReason reason = (ExpiryReason) batchOfExpired.poll(); recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup); + // remove expired key from expirySystem + callRemove(key, expireTimeByKey); evictedCount++; } return evictedCount; @@ -352,11 +352,6 @@ protected ExpiryMetadata getExpiryMetadataForExpiryCheck(Data key, return expireTimeByKey.get(key); } - // this method is overridden - protected void callIterRemove(Iterator> expirationIterator) { - expirationIterator.remove(); - } - // this method is overridden protected Iterator> initIteratorOf(Map expireTimeByKey) { return expireTimeByKey.entrySet().iterator(); From 6a3e604e7b4f4a0993b52408ae32f0605f7d5205 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Tue, 13 Apr 2021 15:51:23 +0300 Subject: [PATCH 6/8] empty list --- .../impl/recordstore/expiry/ExpirySystem.java | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index e662eadfa1a4..ed618ecb3e81 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -33,11 +33,11 @@ import com.hazelcast.spi.properties.HazelcastProperty; import javax.annotation.Nonnull; -import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -62,8 +62,8 @@ public class ExpirySystem { private static final int ONE_HUNDRED_PERCENT = 100; private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100; private static final int MAX_SAMPLE_AT_A_TIME = 16; - private static final ThreadLocal BATCH_OF_EXPIRED - = ThreadLocal.withInitial(() -> new ArrayDeque(MAX_SAMPLE_AT_A_TIME)); + private static final ThreadLocal BATCH_OF_EXPIRED + = ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME)); private final long expiryDelayMillis; private final long expiredKeyScanTimeoutNanos; @@ -248,11 +248,6 @@ public final ExpiryMetadata getExpiredMetadata(Data key) { @SuppressWarnings("checkstyle:magicnumber") public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) { - // 0. Prepare queue for the first use. This - // is to remove leftovers which can remain - // in the queue after possible exceptions. - BATCH_OF_EXPIRED.get().clear(); - // 1. Find how many keys we can scan at max. final int maxScannableCount = findMaxScannableCount(percentage); if (maxScannableCount == 0) { @@ -312,7 +307,7 @@ private Iterator> getOrInitCachedIterator() { } private int findExpiredKeys(long now, boolean backup) { - Queue batchOfExpired = BATCH_OF_EXPIRED.get(); + List batchOfExpired = BATCH_OF_EXPIRED.get(); int scannedCount = 0; Iterator> cachedIterator = getOrInitCachedIterator(); @@ -334,14 +329,17 @@ private int findExpiredKeys(long now, boolean backup) { private int evictExpiredKeys(boolean backup) { int evictedCount = 0; - Queue batchOfExpired = BATCH_OF_EXPIRED.get(); - Data key; - while ((key = (Data) batchOfExpired.poll()) != null) { - ExpiryReason reason = (ExpiryReason) batchOfExpired.poll(); - recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup); - // remove expired key from expirySystem - callRemove(key, expireTimeByKey); - evictedCount++; + List batchOfExpired = BATCH_OF_EXPIRED.get(); + try { + for (int i = 0; i < batchOfExpired.size(); i += 2) { + Data key = (Data) batchOfExpired.get(i); + ExpiryReason expiryReason = (ExpiryReason) batchOfExpired.get(i + 1); + recordStore.evictExpiredEntryAndPublishExpiryEvent(key, expiryReason, backup); + callRemove(key, expireTimeByKey); + evictedCount++; + } + } finally { + batchOfExpired.clear(); } return evictedCount; } From 60b8f32a961fba8246654805d994ac2adde23075 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Tue, 13 Apr 2021 19:26:30 +0300 Subject: [PATCH 7/8] fix initial size --- .../com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index ed618ecb3e81..2df55bd0b3db 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -63,7 +63,7 @@ public class ExpirySystem { private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100; private static final int MAX_SAMPLE_AT_A_TIME = 16; private static final ThreadLocal BATCH_OF_EXPIRED - = ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME)); + = ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME << 1)); private final long expiryDelayMillis; private final long expiredKeyScanTimeoutNanos; From d8adb326c8cc0dd5b704bf4ae3221833aaa8581a Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 14 Apr 2021 09:18:22 +0300 Subject: [PATCH 8/8] try-catch --- .../impl/recordstore/expiry/ExpirySystem.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java index 2df55bd0b3db..36cc7a260153 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/expiry/ExpirySystem.java @@ -21,6 +21,7 @@ import com.hazelcast.internal.eviction.ExpiredKey; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; import com.hazelcast.internal.serialization.Data; +import com.hazelcast.internal.util.ExceptionUtil; import com.hazelcast.internal.util.MapUtil; import com.hazelcast.logging.ILogger; import com.hazelcast.map.impl.ExpirationTimeSetter; @@ -258,12 +259,17 @@ public final void evictExpiredEntries(final int percentage, final long now, fina // 2. Do scanning and evict expired keys. int scannedCount = 0; int expiredCount = 0; - long scanLoopStartNanos = System.nanoTime(); - do { - scannedCount += findExpiredKeys(now, backup); - expiredCount += evictExpiredKeys(backup); - } while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext() - && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); + try { + long scanLoopStartNanos = System.nanoTime(); + do { + scannedCount += findExpiredKeys(now, backup); + expiredCount += evictExpiredKeys(backup); + } while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext() + && (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos); + } catch (Exception e) { + BATCH_OF_EXPIRED.get().clear(); + throw ExceptionUtil.rethrow(e); + } // 3. Send expired keys to backups(only valid for max-idle-expiry) tryToSendBackupExpiryOp();