Warn if writes are stalled due to blocked eviction (fixes #672)
Eviction occurs under an exclusive lock, which is typically held for
very short periods to update the policy and possibly remove a victim
entry. Writes are applied to the hash table and the policy updates are
buffered, allowing the writer threads to schedule their work, tryLock
to update the policy, and move on if busy. If the write buffer becomes
full then, to avoid a memory leak, the writers must block to assist,
creating back pressure if the write rate exceeds the eviction rate.
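
A minimal sketch of that write-buffering pattern, with invented names
and none of the real implementation's details:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: the class, fields, and buffer size are hypothetical
class WriteBufferSketch {
  final ArrayBlockingQueue<Runnable> writeBuffer = new ArrayBlockingQueue<>(1024);
  final ReentrantLock evictionLock = new ReentrantLock();

  void afterWrite(Runnable policyUpdate) {
    if (writeBuffer.offer(policyUpdate)) {
      tryToDrain(); // opportunistic: move on if another thread holds the lock
    } else {
      evictionLock.lock(); // full buffer: block to assist, creating back pressure
      try {
        policyUpdate.run();
        drain();
      } finally {
        evictionLock.unlock();
      }
    }
  }

  void tryToDrain() {
    if (evictionLock.tryLock()) {
      try {
        drain();
      } finally {
        evictionLock.unlock();
      }
    }
  }

  void drain() {
    for (Runnable task; (task = writeBuffer.poll()) != null; ) {
      task.run(); // update the policy and possibly remove a victim entry
    }
  }
}
```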

The eviction's removal of the victim from the hash table may cause
unexpected blocking. A map computation performs its work under the
same lock that guards the entry, which makes the operation atomic and
requires that other writes to the entry wait until it completes.
Typically this is quick, as caches are read-heavy and the victim entry
is unlikely to be computed on. However, since the locking in
ConcurrentHashMap is based on the hash bin, not the entry, a hash
collision can cause writes to different keys to delay each other. A
slow, long-running computation then blocks eviction even though the
entries differ. When this happens the write buffer fills up, other
writes are blocked, and no write activity occurs until the eviction
is allowed to proceed again.
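
A toy illustration of that bin-level blocking; the Key class here is
hypothetical and forces every instance into the same hash bin:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class BinCollisionSketch {
  // A constant hash code places every key in the same ConcurrentHashMap bin,
  // while equality still distinguishes the names
  record Key(String name) {
    @Override public int hashCode() { return 0; }
  }

  public static void main(String[] args) throws InterruptedException {
    var map = new ConcurrentHashMap<Key, String>();
    new Thread(() -> map.compute(new Key("slow"), (key, value) -> {
      try {
        TimeUnit.MINUTES.sleep(1); // a long-running computation holds the bin lock
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return "done";
    })).start();

    TimeUnit.MILLISECONDS.sleep(100); // let the computation acquire the bin lock
    map.put(new Key("other"), "stalled"); // blocks despite writing a different key
  }
}
```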

That scenario goes against the advice of this library and the JavaDoc
in ConcurrentHashMap, which encourage short and fast computations. That
is milliseconds to seconds in practice, not minutes or hours. Instead
we offer AsyncCache to decouple the computation from the map, which
sacrifices linearizability for more efficient processing.
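
A minimal sketch of that decoupling, where expensiveComputation stands
in for the application's slow task:

```java
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.CompletableFuture;

public class AsyncDecoupling {
  public static void main(String[] args) {
    AsyncCache<String, String> cache = Caffeine.newBuilder()
        .maximumSize(10_000)
        .buildAsync();

    // The future is installed in the map immediately and the computation runs
    // on the executor, so no hash bin stays locked while it is in flight
    CompletableFuture<String> future =
        cache.get("key", AsyncDecoupling::expensiveComputation);
    future.thenAccept(System.out::println);
  }

  static String expensiveComputation(String key) {
    return key.toUpperCase(); // placeholder for a slow, long-running task
  }
}
```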

Of course, few users will be aware of these implementation details
early enough to make that decision. Sadly, some will find out only
when they observe production problems. To assist, we now log a warning
in the hope that it highlights the problem earlier, aids debugging,
and hints at the required fix.
ben-manes committed Feb 28, 2022
1 parent 2b96b7e commit 0620efd
Showing 11 changed files with 136 additions and 44 deletions.
@@ -218,6 +218,8 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1);
/** The maximum duration before an entry expires. */
static final long MAXIMUM_EXPIRY = (Long.MAX_VALUE >> 1); // 150 years
/** The duration to wait on the eviction lock before warning of a possible misuse. */
static final long WARN_AFTER_LOCK_WAIT_NANOS = TimeUnit.SECONDS.toNanos(30);
/** The handle for the in-flight refresh operations. */
static final VarHandle REFRESHES;

@@ -1469,15 +1471,43 @@ void afterWrite(Runnable task) {
scheduleDrainBuffers();
}

// The maintenance task may be scheduled but not running. This might occur due to all of the
// executor's threads being busy (perhaps writing into this cache), the write rate greatly
// exceeds the consuming rate, priority inversion, or if the executor silently discarded the
// maintenance task. In these scenarios then the writing threads cannot make progress and
// instead writers provide assistance by performing this work directly.
// In scenarios where the writing threads cannot make progress then they attempt to provide
// assistance by performing the eviction work directly. This can resolve cases where the
// maintenance task is scheduled but not running. That might occur due to all of the executor's
// threads being busy (perhaps writing into this cache), the write rate greatly exceeds the
// consuming rate, priority inversion, or if the executor silently discarded the maintenance
// task. Unfortunately this cannot resolve when the eviction is blocked waiting on a long
running computation due to an eviction listener, the victim being computed on by another write,
// or the victim residing in the same hash bin as a computing entry. In those cases a warning is
// logged to encourage the application to decouple these computations from the map operations.
lock();
try {
performCleanUp(task);
maintenance(task);
} catch (RuntimeException e) {
logger.log(Level.ERROR, "Exception thrown when performing the maintenance task", e);
} finally {
evictionLock.unlock();
}
}

/** Acquires the eviction lock. */
void lock() {
long remainingNanos = WARN_AFTER_LOCK_WAIT_NANOS;
long end = System.nanoTime() + remainingNanos;
for (;;) {
try {
if (evictionLock.tryLock(remainingNanos, TimeUnit.NANOSECONDS)) {
return;
}
// The wait elapsed, so warn once and then block until the lock is acquired
logger.log(Level.WARNING, "The cache is experiencing excessive wait times for acquiring "
+ "the eviction lock. This may indicate that a long-running computation has halted "
+ "eviction when trying to remove the victim entry. Consider using AsyncCache to "
+ "decouple the computation from the map operation.", new TimeoutException());
evictionLock.lock();
return;
} catch (InterruptedException e) {
// Retry with the time remaining before the warning threshold; if it is
// negative then tryLock fails fast and falls through to the warning
remainingNanos = end - System.nanoTime();
}
}
}

@@ -21,6 +21,7 @@
import static com.github.benmanes.caffeine.cache.BLCHeader.DrainStatusRef.REQUIRED;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.EXPIRE_WRITE_TOLERANCE;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.PERCENT_MAIN_PROTECTED;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.WARN_AFTER_LOCK_WAIT_NANOS;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.WRITE_BUFFER_MAX;
import static com.github.benmanes.caffeine.cache.RemovalCause.COLLECTED;
import static com.github.benmanes.caffeine.cache.RemovalCause.EXPIRED;
@@ -41,6 +42,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static uk.org.lidalia.slf4jext.ConventionalLevelHierarchy.WARN_LEVELS;

import java.lang.Thread.State;
import java.lang.ref.Reference;
@@ -74,15 +76,19 @@
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.InitialCapacity;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Stats;
import com.github.benmanes.caffeine.cache.testing.CacheValidationListener;
import com.github.benmanes.caffeine.testing.ConcurrentTestHarness;
import com.github.benmanes.caffeine.testing.Int;
import com.github.valfirst.slf4jtest.TestLogger;
import com.github.valfirst.slf4jtest.TestLoggerFactory;
import com.google.common.collect.Iterables;
import com.google.common.testing.GcFinalization;
import com.google.common.util.concurrent.Uninterruptibles;

/**
* The test cases for the implementation details of {@link BoundedLocalCache}.
@@ -1056,6 +1062,52 @@ public void put_expireTolerance_expiry(BoundedLocalCache<Int, Int> cache, CacheContext context) {
assertThat(cache.writeBuffer.producerIndex).isEqualTo(8);
}

@Test(dataProvider = "caches", groups = "slow")
@CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY,
refreshAfterWrite = Expire.DISABLED, expireAfterAccess = Expire.DISABLED,
expireAfterWrite = Expire.DISABLED, expiry = CacheExpiry.DISABLED,
maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.DEFAULT,
compute = Compute.SYNC, loader = Loader.DISABLED, stats = Stats.DISABLED,
removalListener = Listener.DEFAULT, evictionListener = Listener.DEFAULT,
keys = ReferenceType.STRONG, values = ReferenceType.STRONG)
public void put_warnIfEvictionBlocked(BoundedLocalCache<Int, Int> cache, CacheContext context) {
var testLogger = new AtomicReference<TestLogger>();
var thread = new AtomicReference<Thread>();
var done = new AtomicBoolean();
// Hold the eviction lock so that the writer thread stalls while waiting for it
cache.evictionLock.lock();
try {
ConcurrentTestHarness.execute(() -> {
var logger = TestLoggerFactory.getTestLogger(BoundedLocalCache.class);
logger.setEnabledLevels(WARN_LEVELS);
thread.set(Thread.currentThread());
testLogger.set(logger);

for (int i = 0; true; i++) {
if (done.get()) {
return;
}
cache.put(Int.valueOf(i), Int.valueOf(i));
}
});

var halfWaitTime = Duration.ofNanos(WARN_AFTER_LOCK_WAIT_NANOS / 2);
await().until(cache.evictionLock::hasQueuedThreads);
// An interrupt should not reset the countdown to the warning
thread.get().interrupt();

// Halfway to the threshold the writer is still queued and nothing is logged
Uninterruptibles.sleepUninterruptibly(halfWaitTime);
assertThat(cache.evictionLock.hasQueuedThreads()).isTrue();
assertThat(testLogger.get().getAllLoggingEvents()).isEmpty();

// After the full threshold elapses the warning should have been emitted
Uninterruptibles.sleepUninterruptibly(halfWaitTime);
await().until(() -> !testLogger.get().getAllLoggingEvents().isEmpty());

assertThat(cache.evictionLock.hasQueuedThreads()).isTrue();
} finally {
done.set(true);
cache.evictionLock.unlock();
}
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, population = Population.EMPTY,
scheduler = CacheScheduler.MOCKITO, expiryTime = Expire.ONE_MINUTE,
@@ -710,7 +710,7 @@ public void refresh_cancel_noLog(CacheContext context) {
LoadingCache<Int, Int> cache = context.isAsync()
? context.buildAsync(cacheLoader).synchronous()
: context.build(cacheLoader);
TestLoggerFactory.getAllTestLoggers().values().stream()
TestLoggerFactory.getAllTestLoggers().values()
.forEach(logger -> logger.setEnabledLevels(INFO_LEVELS));

cache.refresh(context.absentKey());
@@ -734,7 +734,7 @@ public void refresh_timeout_noLog(CacheContext context) {
LoadingCache<Int, Int> cache = context.isAsync()
? context.buildAsync(cacheLoader).synchronous()
: context.build(cacheLoader);
TestLoggerFactory.getAllTestLoggers().values().stream()
TestLoggerFactory.getAllTestLoggers().values()
.forEach(logger -> logger.setEnabledLevels(INFO_LEVELS));

cache.refresh(context.absentKey());
@@ -749,7 +749,7 @@ public void refresh_error_log(CacheContext context) throws Exception {
LoadingCache<Int, Int> cache = context.isAsync()
? context.buildAsync(cacheLoader).synchronous()
: context.build(cacheLoader);
TestLoggerFactory.getAllTestLoggers().values().stream()
TestLoggerFactory.getAllTestLoggers().values()
.forEach(logger -> logger.setEnabledLevels(INFO_LEVELS));

cache.refresh(context.absentKey());
@@ -271,7 +271,7 @@ public void refreshIfNeeded_cancel_noLog(CacheContext context) {
? context.buildAsync(cacheLoader).synchronous()
: context.build(cacheLoader);
cache.put(context.absentKey(), context.absentValue());
TestLoggerFactory.getAllTestLoggers().values().stream()
TestLoggerFactory.getAllTestLoggers().values()
.forEach(logger -> logger.setEnabledLevels(INFO_LEVELS));
context.ticker().advance(2, TimeUnit.MINUTES);

@@ -299,7 +299,7 @@ public void refreshIfNeeded_timeout_noLog(CacheContext context) {
? context.buildAsync(cacheLoader).synchronous()
: context.build(cacheLoader);
cache.put(context.absentKey(), context.absentValue());
TestLoggerFactory.getAllTestLoggers().values().stream()
TestLoggerFactory.getAllTestLoggers().values()
.forEach(logger -> logger.setEnabledLevels(INFO_LEVELS));
context.ticker().advance(2, TimeUnit.MINUTES);

@@ -317,7 +317,7 @@ public void refreshIfNeeded_error_log(CacheContext context) {
? context.buildAsync(cacheLoader).synchronous()
: context.build(cacheLoader);
cache.put(context.absentKey(), context.absentValue());
TestLoggerFactory.getAllTestLoggers().values().stream()
TestLoggerFactory.getAllTestLoggers().values()
.forEach(logger -> logger.setEnabledLevels(INFO_LEVELS));
context.ticker().advance(2, TimeUnit.MINUTES);

@@ -103,7 +103,7 @@ public final class CacheContext {
final Loader loader;
final Stats stats;

final boolean isAsyncLoading;
final boolean isAsyncLoader;

CacheBuilder<Object, Object> guava;
Caffeine<Object, Object> caffeine;
@@ -125,9 +125,8 @@ public CacheContext(InitialCapacity initialCapacity, Stats stats, CacheWeigher weigher,
Maximum maximumSize, CacheExpiry expiryType, Expire afterAccess, Expire afterWrite,
Expire refresh, ReferenceType keyStrength, ReferenceType valueStrength,
CacheExecutor cacheExecutor, CacheScheduler cacheScheduler, Listener removalListenerType,
Listener evictionListenerType, Population population, boolean isLoading,
boolean isAsyncLoading, Compute compute, Loader loader, Implementation implementation,
CacheSpec cacheSpec) {
Listener evictionListenerType, Population population, boolean isAsyncLoader, Compute compute,
Loader loader, Implementation implementation, CacheSpec cacheSpec) {
this.initialCapacity = requireNonNull(initialCapacity);
this.stats = requireNonNull(stats);
this.weigher = requireNonNull(weigher);
@@ -146,8 +145,8 @@ public CacheContext(InitialCapacity initialCapacity, Stats stats, CacheWeigher weigher,
this.evictionListenerType = evictionListenerType;
this.evictionListener = evictionListenerType.create();
this.population = requireNonNull(population);
this.loader = isLoading ? requireNonNull(loader) : null;
this.isAsyncLoading = isAsyncLoading;
this.loader = requireNonNull(loader);
this.isAsyncLoader = isAsyncLoader;
this.ticker = new SerializableFakeTicker();
this.implementation = requireNonNull(implementation);
this.original = new LinkedHashMap<>();
@@ -345,11 +344,11 @@ public boolean isSoftValues() {
}

public boolean isLoading() {
return (loader != null);
return (loader != Loader.DISABLED);
}

public boolean isAsyncLoading() {
return isAsyncLoading;
public boolean isAsyncLoader() {
return isAsyncLoader;
}

public Loader loader() {
@@ -494,7 +493,7 @@ public String toString() {
.add("valueStrength", valueStrength)
.add("compute", compute)
.add("loader", loader)
.add("isAsyncLoading", isAsyncLoading)
.add("isAsyncLoader", isAsyncLoader)
.add("cacheExecutor", cacheExecutor)
.add("cacheScheduler", cacheScheduler)
.add("removalListener", removalListenerType)
@@ -15,8 +15,6 @@
*/
package com.github.benmanes.caffeine.cache.testing;

import static com.google.common.collect.ImmutableSet.toImmutableSet;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -90,7 +88,8 @@ public static void initialize(CacheContext context) {

/** Returns the Cartesian set of the possible cache configurations. */
private Set<List<Object>> combinations() {
var asyncLoading = ImmutableSet.of(true, false);
var asyncLoader = ImmutableSet.of(true, false);
var loaders = ImmutableSet.copyOf(cacheSpec.loader());
var keys = filterTypes(options.keys(), cacheSpec.keys());
var values = filterTypes(options.values(), cacheSpec.values());
var statistics = filterTypes(options.stats(), cacheSpec.stats());
@@ -101,15 +100,17 @@ private Set<List<Object>> combinations() {
values = values.contains(ReferenceType.STRONG)
? ImmutableSet.of(ReferenceType.STRONG)
: ImmutableSet.of();
computations = Sets.filter(computations, Compute.ASYNC::equals);
computations = Sets.intersection(computations, Set.of(Compute.ASYNC));
}
if (!isGuavaCompatible || isAsyncOnly || computations.equals(ImmutableSet.of(Compute.ASYNC))) {
implementations = implementations.stream()
.filter(implementation -> implementation != Implementation.Guava)
.collect(toImmutableSet());
implementations = Sets.difference(implementations, Set.of(Implementation.Guava));
}
if (computations.equals(ImmutableSet.of(Compute.SYNC))) {
asyncLoading = ImmutableSet.of(false);
asyncLoader = ImmutableSet.of(false);
}

if (isLoadingOnly) {
loaders = Sets.difference(loaders, Set.of(Loader.DISABLED)).immutableCopy();
}

if (computations.isEmpty() || implementations.isEmpty()
@@ -132,17 +133,16 @@ private Set<List<Object>> combinations() {
ImmutableSet.copyOf(cacheSpec.removalListener()),
ImmutableSet.copyOf(cacheSpec.evictionListener()),
ImmutableSet.copyOf(cacheSpec.population()),
ImmutableSet.of(true, isLoadingOnly),
ImmutableSet.copyOf(asyncLoading),
ImmutableSet.copyOf(asyncLoader),
ImmutableSet.copyOf(computations),
ImmutableSet.copyOf(cacheSpec.loader()),
ImmutableSet.copyOf(loaders),
ImmutableSet.copyOf(implementations));
}

/** Returns the set of options filtered if a specific type is specified. */
private static <T> Set<T> filterTypes(Optional<T> type, T[] options) {
return type.isPresent()
? type.filter(List.of(options)::contains).stream().collect(toImmutableSet())
? Sets.intersection(Set.of(options), Set.of(type.orElseThrow()))
: ImmutableSet.copyOf(options);
}

@@ -166,7 +166,6 @@ private CacheContext newCacheContext(List<Object> combination) {
(Listener) combination.get(index++),
(Population) combination.get(index++),
(Boolean) combination.get(index++),
(Boolean) combination.get(index++),
(Compute) combination.get(index++),
(Loader) combination.get(index++),
(Implementation) combination.get(index++),
@@ -177,7 +176,7 @@ private boolean isCompatible(CacheContext context) {
private boolean isCompatible(CacheContext context) {
boolean asyncIncompatible = context.isAsync()
&& (!context.isCaffeine() || !context.isStrongValues());
boolean asyncLoaderIncompatible = context.isAsyncLoading()
boolean asyncLoaderIncompatible = context.isAsyncLoader()
&& (!context.isAsync() || !context.isLoading());
boolean refreshIncompatible = context.refreshes() && !context.isLoading();
boolean weigherIncompatible = context.isUnbounded() && context.isWeighted();
@@ -411,11 +411,18 @@ public <K, V> RemovalListener<K, V> create() {
/* --------------- CacheLoader --------------- */

Loader[] loader() default {
Loader.DISABLED,
Loader.NEGATIVE,
};

/** The {@link CacheLoader} for constructing the {@link LoadingCache}. */
enum Loader implements CacheLoader<Int, Int> {
/** A flag indicating that a loader should not be configured. */
DISABLED {
@Override public Int load(Int key) {
throw new AssertionError();
}
},
/** A loader that always returns null (no mapping). */
NULL {
@Override public Int load(Int key) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.InitialCapacity;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;

@@ -99,14 +100,14 @@ public static <K, V> Cache<K, V> newCaffeineCache(CacheContext context) {
builder.evictionListener(context.evictionListener());
}
if (context.isAsync()) {
if (context.loader() == null) {
if (context.loader() == Loader.DISABLED) {
context.asyncCache = builder.buildAsync();
} else {
context.asyncCache = builder.buildAsync(
context.isAsyncLoading() ? context.loader().async() : context.loader());
context.isAsyncLoader() ? context.loader().async() : context.loader());
}
context.cache = context.asyncCache.synchronous();
} else if (context.loader() == null) {
} else if (context.loader() == Loader.DISABLED) {
context.cache = builder.build();
} else {
context.cache = builder.build(context.loader());
Expand Down
Expand Up @@ -47,6 +47,7 @@
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.InitialCapacity;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum;
import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType;
import com.github.benmanes.caffeine.testing.Int;
@@ -123,7 +124,7 @@ public static <K, V> Cache<K, V> newGuavaCache(CacheContext context) {
builder.removalListener(new GuavaRemovalListener<>(
translateZeroExpire, context.removalListener()));
}
if (context.loader() == null) {
if (context.loader() == Loader.DISABLED) {
context.cache = new GuavaCache<>(builder.<Int, Int>build(), context);
} else if (context.loader().isBulk()) {
var loader = new BulkLoader<Int, Int>(context.loader());
