Skip to content

Commit

Permalink
use thred local queue for expiry
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Apr 9, 2021
1 parent 776d62d commit 438b979
Showing 1 changed file with 55 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,18 @@
import com.hazelcast.map.impl.ExpirationTimeSetter;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.eviction.Evictor;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;

import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

Expand All @@ -62,6 +61,10 @@ public class ExpirySystem {
DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS);
private static final int ONE_HUNDRED_PERCENT = 100;
private static final int MIN_SCANNABLE_ENTRY_COUNT = 100;
// MAX_SAMPLE_AT_A_TIME and SAMPLING_QUEUE is used for background expiry task.
private static final int MAX_SCAN_AT_A_TIME = 16;
private static final ThreadLocal<Queue> EXPIRY_QUEUE
= ThreadLocal.withInitial(() -> new ArrayDeque(MAX_SCAN_AT_A_TIME));

private final long expiryDelayMillis;
private final long expiredKeyScanTimeoutNanos;
Expand All @@ -74,7 +77,9 @@ public class ExpirySystem {
private final InvalidationQueue<ExpiredKey> expiredKeys = new InvalidationQueue<>();

private Iterator<Map.Entry<Data, ExpiryMetadata>> cachedExpirationIterator;
private Map<Data, ExpiryMetadata> expireTimeByKey;
// This is volatile since it can be initialized at runtime lazily and
// can be accessed by query threads besides partition ones.
private volatile Map<Data, ExpiryMetadata> expireTimeByKey;

public ExpirySystem(RecordStore recordStore,
MapContainer mapContainer,
Expand Down Expand Up @@ -252,13 +257,35 @@ public void evictExpiredEntries(int percentage, long now, boolean backup) {
}

// Find max scannable key count
int maxScannableCount;
int expirableKeysMapSize = expireTimeByKey.size();
int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT);
int maxScannableKeyCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage);
if (expirableKeysMapSize <= MIN_SCANNABLE_ENTRY_COUNT) {
maxScannableCount = expirableKeysMapSize;
} else {
int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT);
maxScannableCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage);
}

scanAndEvictExpiredKeys(maxScannableKeyCount, now, backup);
// Do scanning and evict expired keys.
int scannedCount = 0;
int expiredCount = 0;
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredSamples(backup);
} while ((scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext())
|| (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);

// Send expired keys to backups(only valid for max-idle-expiry)
tryToSendBackupExpiryOp();

if (logger.isFinestEnabled()) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount));
}
}

/**
Expand All @@ -268,57 +295,45 @@ private Iterator<Map.Entry<Data, ExpiryMetadata>> getOrInitCachedIterator() {
if (cachedExpirationIterator == null || !cachedExpirationIterator.hasNext()) {
cachedExpirationIterator = initIteratorOf(expireTimeByKey);
}

return cachedExpirationIterator;
}

private void scanAndEvictExpiredKeys(int maxScannableKeyCount, long now, boolean backup) {
// Scan to find expired keys.
long scanLoopStartNanos = System.nanoTime();
List expiredKeyExpiryReasonList = new ArrayList<>();
int scannedKeyCount = 0;
int expiredKeyCount = 0;
do {
Map.Entry<Data, ExpiryMetadata> entry = getOrInitCachedIterator().next();
scannedKeyCount++;
private int findExpiredKeys(long now, boolean backup) {
Queue expiryQueue = EXPIRY_QUEUE.get();

int scannedCount = 0;
Iterator<Map.Entry<Data, ExpiryMetadata>> cachedIterator = getOrInitCachedIterator();
while (cachedIterator.hasNext()) {
Map.Entry<Data, ExpiryMetadata> entry = cachedIterator.next();
Data key = entry.getKey();
ExpiryMetadata expiryMetadata = entry.getValue();

ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup);
if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) {
// add key and expiryReason to list to evict them later
expiredKeyExpiryReasonList.add(key);
expiredKeyExpiryReasonList.add(expiryReason);
expiryQueue.add(key);
expiryQueue.add(expiryReason);
// remove expired key from expirySystem
callIterRemove(cachedExpirationIterator);
expiredKeyCount++;
callIterRemove(cachedIterator);
}

// - If timed out while looping, break this loop to free
// partition thread.
// - Scan at least Evictor.SAMPLE_COUNT keys. During
// eviction we also check this number of keys.
if (scannedKeyCount % Evictor.SAMPLE_COUNT == 0
&& (System.nanoTime() - scanLoopStartNanos) >= expiredKeyScanTimeoutNanos) {
if (++scannedCount == MAX_SCAN_AT_A_TIME) {
break;
}
}
return scannedCount;
}

} while (scannedKeyCount < maxScannableKeyCount && getOrInitCachedIterator().hasNext());
private int evictExpiredSamples(boolean backup) {
int evictedCount = 0;

// Evict expired keys
for (int i = 0; i < expiredKeyExpiryReasonList.size(); i += 2) {
Data key = (Data) expiredKeyExpiryReasonList.get(i);
ExpiryReason reason = (ExpiryReason) expiredKeyExpiryReasonList.get(i + 1);
Queue sampledPairs = EXPIRY_QUEUE.get();
Data key;
while ((key = (Data) sampledPairs.poll()) != null) {
ExpiryReason reason = (ExpiryReason) sampledPairs.poll();
recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup);
}

if (logger.isFinestEnabled()) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableKeyCount, scannedKeyCount, expiredKeyCount));
}
return evictedCount;
}

// this method is overridden
Expand Down

0 comments on commit 438b979

Please sign in to comment.