Skip to content

Commit

Permalink
Control temporary allocations for background expiry task (#18507)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed May 4, 2021
1 parent de203ad commit 6c305d7
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 83 deletions.
Expand Up @@ -28,9 +28,9 @@
import com.hazelcast.spi.properties.HazelcastProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -252,7 +252,7 @@ private boolean notInProcessableTimeWindow(T container, long now) {

private List<T> addContainerTo(List<T> containersToProcess, T container) {
if (containersToProcess == null) {
containersToProcess = new ArrayList<T>();
containersToProcess = new LinkedList<>();
}

containersToProcess.add(container);
Expand Down
Expand Up @@ -16,16 +16,16 @@

package com.hazelcast.internal.eviction;

import java.util.function.BiFunction;
import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue;
import com.hazelcast.internal.util.CollectionUtil;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.OperationService;
import com.hazelcast.internal.util.CollectionUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.BiFunction;

/**
* Helper class to create and send backup expiration operations.
Expand Down Expand Up @@ -59,7 +59,7 @@ static <S> ToBackupSender<S> newToBackupSender(String serviceName,
}

private static Collection<ExpiredKey> pollExpiredKeys(Queue<ExpiredKey> expiredKeys) {
Collection<ExpiredKey> polledKeys = new ArrayList<ExpiredKey>(expiredKeys.size());
Collection<ExpiredKey> polledKeys = new LinkedList<>();

do {
ExpiredKey expiredKey = expiredKeys.poll();
Expand Down
Expand Up @@ -511,7 +511,7 @@ public Object evict(Data key, boolean backup) {
value = record.getValue();
mapDataStore.flush(key, value, backup);
mutationObserver.onEvictRecord(key, record);
key = removeKeyFromExpirySystem(key);
removeKeyFromExpirySystem(key);
storage.removeRecord(key, record);
if (!backup) {
mapServiceContext.interceptRemove(interceptorRegistry, value);
Expand All @@ -520,10 +520,8 @@ public Object evict(Data key, boolean backup) {
return value;
}

private Data removeKeyFromExpirySystem(Data key) {
Data backingDataKeyFormat = storage.toBackingDataKeyFormat(key);
expirySystem.removeKeyFromExpirySystem(backingDataKeyFormat);
return backingDataKeyFormat;
private void removeKeyFromExpirySystem(Data key) {
expirySystem.removeKeyFromExpirySystem(key);
}

@Override
Expand Down
Expand Up @@ -21,12 +21,12 @@
import com.hazelcast.internal.eviction.ExpiredKey;
import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.ExpirationTimeSetter;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.eviction.Evictor;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.properties.ClusterProperty;
Expand Down Expand Up @@ -61,7 +61,10 @@ public class ExpirySystem {
= new HazelcastProperty(PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS,
DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS);
private static final int ONE_HUNDRED_PERCENT = 100;
private static final int MIN_SCANNABLE_ENTRY_COUNT = 100;
private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100;
private static final int MAX_SAMPLE_AT_A_TIME = 16;
private static final ThreadLocal<List> BATCH_OF_EXPIRED
= ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME << 1));

private final long expiryDelayMillis;
private final long expiredKeyScanTimeoutNanos;
Expand All @@ -74,7 +77,9 @@ public class ExpirySystem {
private final InvalidationQueue<ExpiredKey> expiredKeys = new InvalidationQueue<>();

private Iterator<Map.Entry<Data, ExpiryMetadata>> cachedExpirationIterator;
private Map<Data, ExpiryMetadata> expireTimeByKey;
// This is volatile since it can be initialized at runtime lazily and
// can be accessed by query threads besides partition ones.
private volatile Map<Data, ExpiryMetadata> expireTimeByKey;

public ExpirySystem(RecordStore recordStore,
MapContainer mapContainer,
Expand All @@ -91,7 +96,7 @@ public ExpirySystem(RecordStore recordStore,
this.expiredKeyScanTimeoutNanos = nodeEngine.getProperties().getNanos(EXPIRED_KEY_SCAN_TIMEOUT_NANOS);
}

public boolean isEmpty() {
public final boolean isEmpty() {
return MapUtil.isNullOrEmpty(expireTimeByKey);
}

Expand Down Expand Up @@ -129,7 +134,7 @@ protected ExpiryMetadata createExpiryMetadata(long ttlMillis, long maxIdleMillis
return new ExpiryMetadataImpl(ttlMillis, maxIdleMillis, expirationTime);
}

public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) {
public final void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) {
if (expiryTime <= 0) {
MapConfig mapConfig = mapContainer.getMapConfig();
long ttlMillis = pickTTLMillis(ttl, mapConfig);
Expand All @@ -143,10 +148,8 @@ public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime,

private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long expirationTime) {
if (expirationTime == Long.MAX_VALUE) {
Map<Data, ExpiryMetadata> map = getOrCreateExpireTimeByKeyMap(false);
if (!map.isEmpty()) {
Data nativeKey = recordStore.getStorage().toBackingDataKeyFormat(key);
callRemove(nativeKey, expireTimeByKey);
if (!isEmpty()) {
callRemove(key, expireTimeByKey);
}
return;
}
Expand All @@ -166,28 +169,28 @@ private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long
mapServiceContext.getExpirationManager().scheduleExpirationTask();
}

public long calculateExpirationTime(long ttl, long maxIdle, long now) {
public final long calculateExpirationTime(long ttl, long maxIdle, long now) {
MapConfig mapConfig = mapContainer.getMapConfig();
long ttlMillis = pickTTLMillis(ttl, mapConfig);
long maxIdleMillis = pickMaxIdleMillis(maxIdle, mapConfig);
return ExpirationTimeSetter.calculateExpirationTime(ttlMillis, maxIdleMillis, now);
}

public void removeKeyFromExpirySystem(Data key) {
public final void removeKeyFromExpirySystem(Data key) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
if (isEmpty()) {
return;
}
callRemove(key, expireTimeByKey);
}

public void extendExpiryTime(Data dataKey, long now) {
public final void extendExpiryTime(Data dataKey, long now) {
if (isEmpty()) {
return;
}

Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
if (isEmpty()) {
return;
}

Expand All @@ -202,9 +205,9 @@ public void extendExpiryTime(Data dataKey, long now) {
expiryMetadata.setExpirationTime(expirationTime);
}

public ExpiryReason hasExpired(Data key, long now, boolean backup) {
public final ExpiryReason hasExpired(Data key, long now, boolean backup) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
if (isEmpty()) {
return ExpiryReason.NOT_EXPIRED;
}
ExpiryMetadata expiryMetadata = getExpiryMetadataForExpiryCheck(key, expireTimeByKey);
Expand Down Expand Up @@ -234,31 +237,69 @@ private ExpiryReason hasExpired(ExpiryMetadata expiryMetadata, long now, boolean
return expiryReason;
}

public InvalidationQueue<ExpiredKey> getExpiredKeys() {
public final InvalidationQueue<ExpiredKey> getExpiredKeys() {
return expiredKeys;
}

@Nonnull
public ExpiryMetadata getExpiredMetadata(Data key) {
public final ExpiryMetadata getExpiredMetadata(Data key) {
ExpiryMetadata expiryMetadata = getOrCreateExpireTimeByKeyMap(false).get(key);
return expiryMetadata != null ? expiryMetadata : ExpiryMetadata.NULL;
}

@SuppressWarnings("checkstyle:magicnumber")
public void evictExpiredEntries(int percentage, long now, boolean backup) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) {
// 1. Find how many keys we can scan at max.
final int maxScannableCount = findMaxScannableCount(percentage);
if (maxScannableCount == 0) {
// no expirable entry exists.
return;
}

// Find max scannable key count
int expirableKeysMapSize = expireTimeByKey.size();
int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT);
int maxScannableKeyCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage);

scanAndEvictExpiredKeys(maxScannableKeyCount, now, backup);
// 2. Do scanning and evict expired keys.
int scannedCount = 0;
int expiredCount = 0;
try {
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredKeys(backup);
} while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()
&& (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);
} catch (Exception e) {
BATCH_OF_EXPIRED.get().clear();
throw ExceptionUtil.rethrow(e);
}

// 3. Send expired keys to backups(only valid for max-idle-expiry)
tryToSendBackupExpiryOp();

if (logger.isFinestEnabled()) {
logProgress(maxScannableCount, scannedCount, expiredCount);
}
}

private void logProgress(int maxScannableCount, int scannedCount, int expiredCount) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount));
}

private int findMaxScannableCount(int percentage) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (isEmpty()) {
return 0;
}

int numberOfExpirableKeys = expireTimeByKey.size();
if (numberOfExpirableKeys <= MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN) {
return numberOfExpirableKeys;
}

int percentageOfExpirableKeys = (int) (1D * numberOfExpirableKeys * percentage / ONE_HUNDRED_PERCENT);
return Math.max(MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN, percentageOfExpirableKeys);
}

/**
Expand All @@ -268,57 +309,45 @@ private Iterator<Map.Entry<Data, ExpiryMetadata>> getOrInitCachedIterator() {
if (cachedExpirationIterator == null || !cachedExpirationIterator.hasNext()) {
cachedExpirationIterator = initIteratorOf(expireTimeByKey);
}

return cachedExpirationIterator;
}

private void scanAndEvictExpiredKeys(int maxScannableKeyCount, long now, boolean backup) {
// Scan to find expired keys.
long scanLoopStartNanos = System.nanoTime();
List expiredKeyExpiryReasonList = new ArrayList<>();
int scannedKeyCount = 0;
int expiredKeyCount = 0;
do {
Map.Entry<Data, ExpiryMetadata> entry = getOrInitCachedIterator().next();
scannedKeyCount++;
private int findExpiredKeys(long now, boolean backup) {
List batchOfExpired = BATCH_OF_EXPIRED.get();

int scannedCount = 0;
Iterator<Map.Entry<Data, ExpiryMetadata>> cachedIterator = getOrInitCachedIterator();
while (scannedCount++ < MAX_SAMPLE_AT_A_TIME && cachedIterator.hasNext()) {
Map.Entry<Data, ExpiryMetadata> entry = cachedIterator.next();
Data key = entry.getKey();
ExpiryMetadata expiryMetadata = entry.getValue();

ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup);
if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) {
// add key and expiryReason to list to evict them later
expiredKeyExpiryReasonList.add(key);
expiredKeyExpiryReasonList.add(expiryReason);
// remove expired key from expirySystem
callIterRemove(cachedExpirationIterator);
expiredKeyCount++;
}

// - If timed out while looping, break this loop to free
// partition thread.
// - Scan at least Evictor.SAMPLE_COUNT keys. During
// eviction we also check this number of keys.
if (scannedKeyCount % Evictor.SAMPLE_COUNT == 0
&& (System.nanoTime() - scanLoopStartNanos) >= expiredKeyScanTimeoutNanos) {
break;
batchOfExpired.add(key);
batchOfExpired.add(expiryReason);
}

} while (scannedKeyCount < maxScannableKeyCount && getOrInitCachedIterator().hasNext());

// Evict expired keys
for (int i = 0; i < expiredKeyExpiryReasonList.size(); i += 2) {
Data key = (Data) expiredKeyExpiryReasonList.get(i);
ExpiryReason reason = (ExpiryReason) expiredKeyExpiryReasonList.get(i + 1);
recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup);
}
return scannedCount;
}

if (logger.isFinestEnabled()) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableKeyCount, scannedKeyCount, expiredKeyCount));
private int evictExpiredKeys(boolean backup) {
int evictedCount = 0;

List batchOfExpired = BATCH_OF_EXPIRED.get();
try {
for (int i = 0; i < batchOfExpired.size(); i += 2) {
Data key = (Data) batchOfExpired.get(i);
ExpiryReason expiryReason = (ExpiryReason) batchOfExpired.get(i + 1);
recordStore.evictExpiredEntryAndPublishExpiryEvent(key, expiryReason, backup);
callRemove(key, expireTimeByKey);
evictedCount++;
}
} finally {
batchOfExpired.clear();
}
return evictedCount;
}

// this method is overridden
Expand All @@ -327,11 +356,6 @@ protected ExpiryMetadata getExpiryMetadataForExpiryCheck(Data key,
return expireTimeByKey.get(key);
}

// this method is overridden
protected void callIterRemove(Iterator<Map.Entry<Data, ExpiryMetadata>> expirationIterator) {
expirationIterator.remove();
}

// this method is overridden
protected Iterator<Map.Entry<Data, ExpiryMetadata>> initIteratorOf(Map<Data, ExpiryMetadata> expireTimeByKey) {
return expireTimeByKey.entrySet().iterator();
Expand All @@ -347,7 +371,7 @@ public void destroy() {
getOrCreateExpireTimeByKeyMap(false).clear();
}

public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
public final void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
if (mapContainer.getTotalBackupCount() == 0) {
return;
}
Expand All @@ -359,7 +383,7 @@ public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
clearExpiredRecordsTask.tryToSendBackupExpiryOp(recordStore, true);
}

public void tryToSendBackupExpiryOp() {
public final void tryToSendBackupExpiryOp() {
if (mapContainer.getTotalBackupCount() == 0) {
return;
}
Expand Down

0 comments on commit 6c305d7

Please sign in to comment.