Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control temporary allocations for background expiry task #18507

Merged
merged 8 commits into from May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,9 +28,9 @@
import com.hazelcast.spi.properties.HazelcastProperty;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -252,7 +252,7 @@ private boolean notInProcessableTimeWindow(T container, long now) {

private List<T> addContainerTo(List<T> containersToProcess, T container) {
if (containersToProcess == null) {
containersToProcess = new ArrayList<T>();
containersToProcess = new LinkedList<>();
}

containersToProcess.add(container);
Expand Down
Expand Up @@ -16,16 +16,16 @@

package com.hazelcast.internal.eviction;

import java.util.function.BiFunction;
import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue;
import com.hazelcast.internal.util.CollectionUtil;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.OperationService;
import com.hazelcast.internal.util.CollectionUtil;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.function.BiFunction;

/**
* Helper class to create and send backup expiration operations.
Expand Down Expand Up @@ -59,7 +59,7 @@ static <S> ToBackupSender<S> newToBackupSender(String serviceName,
}

private static Collection<ExpiredKey> pollExpiredKeys(Queue<ExpiredKey> expiredKeys) {
Collection<ExpiredKey> polledKeys = new ArrayList<ExpiredKey>(expiredKeys.size());
Collection<ExpiredKey> polledKeys = new LinkedList<>();

do {
ExpiredKey expiredKey = expiredKeys.poll();
Expand Down
Expand Up @@ -511,7 +511,7 @@ public Object evict(Data key, boolean backup) {
value = record.getValue();
mapDataStore.flush(key, value, backup);
mutationObserver.onEvictRecord(key, record);
key = removeKeyFromExpirySystem(key);
removeKeyFromExpirySystem(key);
storage.removeRecord(key, record);
if (!backup) {
mapServiceContext.interceptRemove(interceptorRegistry, value);
Expand All @@ -520,10 +520,8 @@ public Object evict(Data key, boolean backup) {
return value;
}

private Data removeKeyFromExpirySystem(Data key) {
Data backingDataKeyFormat = storage.toBackingDataKeyFormat(key);
expirySystem.removeKeyFromExpirySystem(backingDataKeyFormat);
return backingDataKeyFormat;
private void removeKeyFromExpirySystem(Data key) {
expirySystem.removeKeyFromExpirySystem(key);
}

@Override
Expand Down
Expand Up @@ -21,12 +21,12 @@
import com.hazelcast.internal.eviction.ExpiredKey;
import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.ExceptionUtil;
import com.hazelcast.internal.util.MapUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.ExpirationTimeSetter;
import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.MapServiceContext;
import com.hazelcast.map.impl.eviction.Evictor;
import com.hazelcast.map.impl.recordstore.RecordStore;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.properties.ClusterProperty;
Expand Down Expand Up @@ -61,7 +61,10 @@ public class ExpirySystem {
= new HazelcastProperty(PROP_EXPIRED_KEY_SCAN_TIMEOUT_NANOS,
DEFAULT_EXPIRED_KEY_SCAN_TIMEOUT_NANOS, NANOSECONDS);
private static final int ONE_HUNDRED_PERCENT = 100;
private static final int MIN_SCANNABLE_ENTRY_COUNT = 100;
private static final int MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN = 100;
private static final int MAX_SAMPLE_AT_A_TIME = 16;
private static final ThreadLocal<List> BATCH_OF_EXPIRED
= ThreadLocal.withInitial(() -> new ArrayList<>(MAX_SAMPLE_AT_A_TIME << 1));

private final long expiryDelayMillis;
private final long expiredKeyScanTimeoutNanos;
Expand All @@ -74,7 +77,9 @@ public class ExpirySystem {
private final InvalidationQueue<ExpiredKey> expiredKeys = new InvalidationQueue<>();

private Iterator<Map.Entry<Data, ExpiryMetadata>> cachedExpirationIterator;
private Map<Data, ExpiryMetadata> expireTimeByKey;
// This is volatile since it can be initialized at runtime lazily and
// can be accessed by query threads besides partition ones.
private volatile Map<Data, ExpiryMetadata> expireTimeByKey;

public ExpirySystem(RecordStore recordStore,
MapContainer mapContainer,
Expand All @@ -91,7 +96,7 @@ public ExpirySystem(RecordStore recordStore,
this.expiredKeyScanTimeoutNanos = nodeEngine.getProperties().getNanos(EXPIRED_KEY_SCAN_TIMEOUT_NANOS);
}

public boolean isEmpty() {
public final boolean isEmpty() {
return MapUtil.isNullOrEmpty(expireTimeByKey);
}

Expand Down Expand Up @@ -129,7 +134,7 @@ protected ExpiryMetadata createExpiryMetadata(long ttlMillis, long maxIdleMillis
return new ExpiryMetadataImpl(ttlMillis, maxIdleMillis, expirationTime);
}

public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) {
public final void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime, long now) {
if (expiryTime <= 0) {
MapConfig mapConfig = mapContainer.getMapConfig();
long ttlMillis = pickTTLMillis(ttl, mapConfig);
Expand All @@ -143,10 +148,8 @@ public void addKeyIfExpirable(Data key, long ttl, long maxIdle, long expiryTime,

private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long expirationTime) {
if (expirationTime == Long.MAX_VALUE) {
Map<Data, ExpiryMetadata> map = getOrCreateExpireTimeByKeyMap(false);
if (!map.isEmpty()) {
Data nativeKey = recordStore.getStorage().toBackingDataKeyFormat(key);
callRemove(nativeKey, expireTimeByKey);
if (!isEmpty()) {
callRemove(key, expireTimeByKey);
}
return;
}
Expand All @@ -166,28 +169,28 @@ private void addExpirableKey(Data key, long ttlMillis, long maxIdleMillis, long
mapServiceContext.getExpirationManager().scheduleExpirationTask();
}

public long calculateExpirationTime(long ttl, long maxIdle, long now) {
public final long calculateExpirationTime(long ttl, long maxIdle, long now) {
MapConfig mapConfig = mapContainer.getMapConfig();
long ttlMillis = pickTTLMillis(ttl, mapConfig);
long maxIdleMillis = pickMaxIdleMillis(maxIdle, mapConfig);
return ExpirationTimeSetter.calculateExpirationTime(ttlMillis, maxIdleMillis, now);
}

public void removeKeyFromExpirySystem(Data key) {
public final void removeKeyFromExpirySystem(Data key) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
if (isEmpty()) {
return;
}
callRemove(key, expireTimeByKey);
}

public void extendExpiryTime(Data dataKey, long now) {
public final void extendExpiryTime(Data dataKey, long now) {
if (isEmpty()) {
return;
}

Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
if (isEmpty()) {
return;
}

Expand All @@ -202,9 +205,9 @@ public void extendExpiryTime(Data dataKey, long now) {
expiryMetadata.setExpirationTime(expirationTime);
}

public ExpiryReason hasExpired(Data key, long now, boolean backup) {
public final ExpiryReason hasExpired(Data key, long now, boolean backup) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
if (isEmpty()) {
return ExpiryReason.NOT_EXPIRED;
}
ExpiryMetadata expiryMetadata = getExpiryMetadataForExpiryCheck(key, expireTimeByKey);
Expand Down Expand Up @@ -234,31 +237,69 @@ private ExpiryReason hasExpired(ExpiryMetadata expiryMetadata, long now, boolean
return expiryReason;
}

public InvalidationQueue<ExpiredKey> getExpiredKeys() {
public final InvalidationQueue<ExpiredKey> getExpiredKeys() {
return expiredKeys;
}

@Nonnull
public ExpiryMetadata getExpiredMetadata(Data key) {
public final ExpiryMetadata getExpiredMetadata(Data key) {
ExpiryMetadata expiryMetadata = getOrCreateExpireTimeByKeyMap(false).get(key);
return expiryMetadata != null ? expiryMetadata : ExpiryMetadata.NULL;
}

@SuppressWarnings("checkstyle:magicnumber")
public void evictExpiredEntries(int percentage, long now, boolean backup) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (expireTimeByKey.isEmpty()) {
public final void evictExpiredEntries(final int percentage, final long now, final boolean backup) {
// 1. Find how many keys we can scan at max.
final int maxScannableCount = findMaxScannableCount(percentage);
if (maxScannableCount == 0) {
// no expirable entry exists.
return;
}

// Find max scannable key count
int expirableKeysMapSize = expireTimeByKey.size();
int keyCountInPercentage = (int) (1D * expirableKeysMapSize * percentage / ONE_HUNDRED_PERCENT);
int maxScannableKeyCount = Math.max(MIN_SCANNABLE_ENTRY_COUNT, keyCountInPercentage);

scanAndEvictExpiredKeys(maxScannableKeyCount, now, backup);
// 2. Do scanning and evict expired keys.
int scannedCount = 0;
int expiredCount = 0;
try {
long scanLoopStartNanos = System.nanoTime();
do {
scannedCount += findExpiredKeys(now, backup);
expiredCount += evictExpiredKeys(backup);
} while (scannedCount < maxScannableCount && getOrInitCachedIterator().hasNext()
&& (System.nanoTime() - scanLoopStartNanos) < expiredKeyScanTimeoutNanos);
} catch (Exception e) {
BATCH_OF_EXPIRED.get().clear();
throw ExceptionUtil.rethrow(e);
}

// 3. Send expired keys to backups(only valid for max-idle-expiry)
tryToSendBackupExpiryOp();

if (logger.isFinestEnabled()) {
logProgress(maxScannableCount, scannedCount, expiredCount);
}
}

private void logProgress(int maxScannableCount, int scannedCount, int expiredCount) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableCount, scannedCount, expiredCount));
}

private int findMaxScannableCount(int percentage) {
Map<Data, ExpiryMetadata> expireTimeByKey = getOrCreateExpireTimeByKeyMap(false);
if (isEmpty()) {
return 0;
}

int numberOfExpirableKeys = expireTimeByKey.size();
if (numberOfExpirableKeys <= MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN) {
return numberOfExpirableKeys;
}

int percentageOfExpirableKeys = (int) (1D * numberOfExpirableKeys * percentage / ONE_HUNDRED_PERCENT);
return Math.max(MIN_TOTAL_NUMBER_OF_KEYS_TO_SCAN, percentageOfExpirableKeys);
}

/**
Expand All @@ -268,57 +309,45 @@ private Iterator<Map.Entry<Data, ExpiryMetadata>> getOrInitCachedIterator() {
if (cachedExpirationIterator == null || !cachedExpirationIterator.hasNext()) {
cachedExpirationIterator = initIteratorOf(expireTimeByKey);
}

return cachedExpirationIterator;
}

private void scanAndEvictExpiredKeys(int maxScannableKeyCount, long now, boolean backup) {
// Scan to find expired keys.
long scanLoopStartNanos = System.nanoTime();
List expiredKeyExpiryReasonList = new ArrayList<>();
int scannedKeyCount = 0;
int expiredKeyCount = 0;
do {
Map.Entry<Data, ExpiryMetadata> entry = getOrInitCachedIterator().next();
scannedKeyCount++;
private int findExpiredKeys(long now, boolean backup) {
List batchOfExpired = BATCH_OF_EXPIRED.get();

int scannedCount = 0;
Iterator<Map.Entry<Data, ExpiryMetadata>> cachedIterator = getOrInitCachedIterator();
while (scannedCount++ < MAX_SAMPLE_AT_A_TIME && cachedIterator.hasNext()) {
Map.Entry<Data, ExpiryMetadata> entry = cachedIterator.next();
Data key = entry.getKey();
ExpiryMetadata expiryMetadata = entry.getValue();

ExpiryReason expiryReason = hasExpired(expiryMetadata, now, backup);
if (expiryReason != ExpiryReason.NOT_EXPIRED && !recordStore.isLocked(key)) {
// add key and expiryReason to list to evict them later
expiredKeyExpiryReasonList.add(key);
expiredKeyExpiryReasonList.add(expiryReason);
// remove expired key from expirySystem
callIterRemove(cachedExpirationIterator);
expiredKeyCount++;
}

// - If timed out while looping, break this loop to free
// partition thread.
// - Scan at least Evictor.SAMPLE_COUNT keys. During
// eviction we also check this number of keys.
if (scannedKeyCount % Evictor.SAMPLE_COUNT == 0
&& (System.nanoTime() - scanLoopStartNanos) >= expiredKeyScanTimeoutNanos) {
break;
batchOfExpired.add(key);
batchOfExpired.add(expiryReason);
}

} while (scannedKeyCount < maxScannableKeyCount && getOrInitCachedIterator().hasNext());

// Evict expired keys
for (int i = 0; i < expiredKeyExpiryReasonList.size(); i += 2) {
Data key = (Data) expiredKeyExpiryReasonList.get(i);
ExpiryReason reason = (ExpiryReason) expiredKeyExpiryReasonList.get(i + 1);
recordStore.evictExpiredEntryAndPublishExpiryEvent(key, reason, backup);
}
return scannedCount;
}

if (logger.isFinestEnabled()) {
logger.finest(String.format("mapName: %s, partitionId: %d, partitionSize: %d, "
+ "remainingKeyCountToExpire: %d, maxScannableKeyCount: %d, "
+ "scannedKeyCount: %d, expiredKeyCount: %d"
, recordStore.getName(), recordStore.getPartitionId(), recordStore.size()
, expireTimeByKey.size(), maxScannableKeyCount, scannedKeyCount, expiredKeyCount));
private int evictExpiredKeys(boolean backup) {
int evictedCount = 0;

List batchOfExpired = BATCH_OF_EXPIRED.get();
try {
for (int i = 0; i < batchOfExpired.size(); i += 2) {
Data key = (Data) batchOfExpired.get(i);
ExpiryReason expiryReason = (ExpiryReason) batchOfExpired.get(i + 1);
recordStore.evictExpiredEntryAndPublishExpiryEvent(key, expiryReason, backup);
callRemove(key, expireTimeByKey);
evictedCount++;
}
} finally {
batchOfExpired.clear();
}
return evictedCount;
}

// this method is overridden
Expand All @@ -327,11 +356,6 @@ protected ExpiryMetadata getExpiryMetadataForExpiryCheck(Data key,
return expireTimeByKey.get(key);
}

// this method is overridden
protected void callIterRemove(Iterator<Map.Entry<Data, ExpiryMetadata>> expirationIterator) {
expirationIterator.remove();
}

// this method is overridden
protected Iterator<Map.Entry<Data, ExpiryMetadata>> initIteratorOf(Map<Data, ExpiryMetadata> expireTimeByKey) {
return expireTimeByKey.entrySet().iterator();
Expand All @@ -347,7 +371,7 @@ public void destroy() {
getOrCreateExpireTimeByKeyMap(false).clear();
}

public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
public final void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
if (mapContainer.getTotalBackupCount() == 0) {
return;
}
Expand All @@ -359,7 +383,7 @@ public void accumulateOrSendExpiredKey(Data dataKey, long valueHashCode) {
clearExpiredRecordsTask.tryToSendBackupExpiryOp(recordStore, true);
}

public void tryToSendBackupExpiryOp() {
public final void tryToSendBackupExpiryOp() {
if (mapContainer.getTotalBackupCount() == 0) {
return;
}
Expand Down