Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Apr 9, 2021
1 parent 438b979 commit 2a666b4
Showing 1 changed file with 44 additions and 34 deletions.
Expand Up @@ -60,11 +60,10 @@ public class ExpirySystem {
= new HazelcastProperty(PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS,
DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS);
private static final int ONE_HUNDRED_PERCENT = 100;
private static final int MIN_SCANNABLE_ENTRY_COUNT = 100;
// MAX_SAMPLE_AT_A_TIME and SAMPLING_QUEUE is used for background expiry task.
private static final int MAX_SCAN_AT_A_TIME = 16;
private static final ThreadLocal<Queue> EXPIRY_QUEUE
= ThreadLocal.withInitial(() -> new ArrayDeque(MAX_SCAN_AT_A_TIME));
private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100;
private static final int MAX_NUMBER_OF_KEYS_TO_SCAN_AT_A_TIME = 16;
private static final ThreadLocal<Queue> BATCH_OF_EXPIRED
= ThreadLocal.withInitial(() -> new ArrayDeque(MAX_NUMBER_OF_KEYS_TO_SCAN_AT_A_TIME));

private final long expiryDelayMillis;
private final long expiredKeyScanTimeoutNanos;
Expand Down Expand Up @@ -250,44 +249,55 @@ public ExpiryMetadata getExpiredMetadata(Data key) {
}

@SuppressWarnings("checkstyle:magicnumber")
public void evictExpiredEntries(int percentage, long now, boolean backup) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
public void evictExpiredEntries(final int percentage, final long now, final boolean backup) {
// 1. Find how many keys we can scan at max.
final int maxScannableCount = findMaxScannableCount(percentage);
if (maxScannableCount == 0) {
// no expirable entry exists.
return;
}

// Find max scannable key count
int maxScannableCount;
int expirableKeysMapSize = expireTimeByKey.size();
if (expirableKeysMapSize <= MIN_SCANNABLE_ENTRY_COUNT) {
maxScannableCount = expirableKeysMapSize;
} else {
int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT);
maxScannableCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage);
}

// Do scanning and evict expired keys.
// 2. Do scanning and evict expired keys.
int scannedCount = 0;
int expiredCount = 0;
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredSamples(backup);
expiredCount += evictExpiredKeys(backup);
} while ((scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext())
|| (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);

// Send expired keys to backups(only valid for max-idle-expiry)
// 3. Send expired keys to backups(only valid for max-idle-expiry)
tryToSendBackupExpiryOp();

if (logger.isFinestEnabled()) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount));
logProgress(maxScannableCount, scannedCount, expiredCount);
}
}

private void logProgress(int maxScannableCount, int scannedCount, int expiredCount) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount));
}

private int findMaxScannableCount(int percentage) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
return 0;
}

int numberOfExpirableKeys = expireTimeByKey.size();
if (numberOfExpirableKeys <= MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN) {
return numberOfExpirableKeys;
}

int percentageOfExpirableKeys = (int) (1D * numberOfExpirableKeys * percentage / ONE_HUNDRED_PERCENT);
return Math.max(MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN, percentageOfExpirableKeys);
}

/**
* Get cachedExpirationIterator or init it if it has no next entry.
*/
Expand All @@ -299,7 +309,7 @@ private Iterator<Map.Entry<Data, ExpiryMetadata>> getOrInitCachedIterator() {
}

private int findExpiredKeys(long now, boolean backup) {
Queue expiryQueue = EXPIRY_QUEUE.get();
Queue batchOfExpired = BATCH_OF_EXPIRED.get();

int scannedCount = 0;
Iterator<Map.Entry<Data, ExpiryMetadata>> cachedIterator = getOrInitCachedIterator();
Expand All @@ -311,26 +321,26 @@ private int findExpiredKeys(long now, boolean backup) {
ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup);
if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) {
// add key and expiryReason to list to evict them later
expiryQueue.add(key);
expiryQueue.add(expiryReason);
batchOfExpired.add(key);
batchOfExpired.add(expiryReason);
// remove expired key from expirySystem
callIterRemove(cachedIterator);
}

if (++scannedCount == MAX_SCAN_AT_A_TIME) {
if (++scannedCount == MAX_NUMBER_OF_KEYS_TO_SCAN_AT_A_TIME) {
break;
}
}
return scannedCount;
}

private int evictExpiredSamples(boolean backup) {
private int evictExpiredKeys(boolean backup) {
int evictedCount = 0;

Queue sampledPairs = EXPIRY_QUEUE.get();
Queue batchOfExpired = BATCH_OF_EXPIRED.get();
Data key;
while ((key = (Data) sampledPairs.poll()) != null) {
ExpiryReason reason = (ExpiryReason) sampledPairs.poll();
while ((key = (Data) batchOfExpired.poll()) != null) {
ExpiryReason reason = (ExpiryReason) batchOfExpired.poll();
recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup);
}
return evictedCount;
Expand Down

0 comments on commit 2a666b4

Please sign in to comment.