Skip to content

Commit

Permalink
Improve robustness in racy scenarios (fixes #568)
Browse files Browse the repository at this point in the history
1. When an entry is updated then a concurrent reader should either
observe the old or new value. This operation replaces the j.l.Reference
instance stored on the entry and the old referent becomes eligible for
garbage collection. A reader holding the stale Reference may therefore
return a null value, which is more likely due to the cache proactively
clearing the referent to assist the garbage collector.

When a null value is read then the an extra volatile read is used to
validate that the Reference instance is still held by the entry. This
retry loop has negligible cost.

2. When an entry is eligible for removal due to its value being garbage
collected, during the eviction's atomic map operation this eligibility
must be verified. If concurrently the entry was resurrected and a new
value set, then the cache writer has already dispatched the removal
notification and established a live mapping. If the evictor does not
detect that the cause is no longer valid, then it would incorrectly
discard the mapping with a removal notification containing a non-null
key, non-null value, and collected removal cause.

Like expiration and size policies, the reference eviction policy will
now validate and no-op if the entry is no longer eligible.

3. When the fixed expiration setting is dynamically adjusted, an
expired entry may be resurrected as no longer eligible for removoal.
While the map operation detected this case, stemming from the entry
itself being updated and its lifetime reset, the outer eviction loop
could retry indefinitely. A stale read of the fixed duration caused
the loop to retry the ineligible entry, but instead it can terminatee
as it scans a queue ordered by the expiration timestamp.

Co-authored-by: jhorvitz@google.com
  • Loading branch information
ben-manes committed Jul 1, 2021
1 parent f279485 commit 542c308
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 12 deletions.
Expand Up @@ -24,6 +24,7 @@
import javax.lang.model.element.Modifier;

import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;

Expand All @@ -44,7 +45,7 @@ protected boolean applies() {
protected void execute() {
context.nodeSubtype
.addField(newValueField())
.addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.PLAIN))
.addMethod(makeGetValue())
.addMethod(newGetRef("value"))
.addMethod(makeSetValue())
.addMethod(makeContainsValue());
Expand All @@ -60,6 +61,30 @@ private FieldSpec newValueField() {
return fieldSpec.build();
}

/** Creates the getValue method. */
private MethodSpec makeGetValue() {
MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue")
.addModifiers(context.publicFinalModifiers())
.returns(vTypeVar);
if (valueStrength() == Strength.STRONG) {
getter.addStatement("return ($T) $L.get(this)", vTypeVar, varHandleName("value"));
return getter.build();
}

CodeBlock code = CodeBlock.builder()
.beginControlFlow("for (;;)")
.addStatement("$1T<V> ref = ($1T<V>) $2L.get(this)",
Reference.class, varHandleName("value"))
.addStatement("V value = ref.get()")
.beginControlFlow(
"if ((value != null) || (ref == $L.getVolatile(this)))", varHandleName("value"))
.addStatement("return value")
.endControlFlow()
.endControlFlow()
.build();
return getter.addCode(code).build();
}

/** Creates the setValue method. */
private MethodSpec makeSetValue() {
MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue")
Expand All @@ -70,9 +95,10 @@ private MethodSpec makeSetValue() {
if (isStrongValues()) {
setter.addStatement("$L.set(this, $N)", varHandleName("value"), "value");
} else {
setter.addStatement("(($T<V>) getValueReference()).clear()", Reference.class);
setter.addStatement("$1T<V> ref = ($1T<V>) getValueReference()", Reference.class);
setter.addStatement("$L.set(this, new $T($L, $N, referenceQueue))",
varHandleName("value"), valueReferenceType(), "getKeyReference()", "value");
setter.addStatement("ref.clear()");
}

return setter.build();
Expand Down
Expand Up @@ -870,10 +870,10 @@ void expireAfterAccessEntries(AccessOrderDeque<Node<K, V>> accessOrderDeque, lon
long duration = expiresAfterAccessNanos();
for (;;) {
Node<K, V> node = accessOrderDeque.peekFirst();
if ((node == null) || ((now - node.getAccessTime()) < duration)) {
if ((node == null) || ((now - node.getAccessTime()) < duration)
|| !evictEntry(node, RemovalCause.EXPIRED, now)) {
return;
}
evictEntry(node, RemovalCause.EXPIRED, now);
}
}

Expand All @@ -885,11 +885,11 @@ void expireAfterWriteEntries(long now) {
}
long duration = expiresAfterWriteNanos();
for (;;) {
final Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration)) {
Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration)
|| !evictEntry(node, RemovalCause.EXPIRED, now)) {
break;
}
evictEntry(node, RemovalCause.EXPIRED, now);
}
}

Expand Down Expand Up @@ -942,8 +942,8 @@ boolean hasExpired(Node<K, V> node, long now) {
}

/**
* Attempts to evict the entry based on the given removal cause. A removal due to expiration or
* size may be ignored if the entry was updated and is no longer eligible for eviction.
* Attempts to evict the entry based on the given removal cause. A removal due to may be ignored
* if the entry was updated and is no longer eligible for eviction.
*
* @param node the entry to evict
* @param cause the reason to evict
Expand All @@ -958,8 +958,8 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
V[] value = (V[]) new Object[1];
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
RemovalCause[] actualCause = new RemovalCause[1];
Object keyReference = node.getKeyReference();
RemovalCause[] actualCause = new RemovalCause[1];

data.computeIfPresent(keyReference, (k, n) -> {
if (n != node) {
Expand All @@ -968,7 +968,15 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
synchronized (n) {
value[0] = n.getValue();

actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause;
if ((key == null) || (value[0] == null)) {
actualCause[0] = RemovalCause.COLLECTED;
} else if (cause == RemovalCause.COLLECTED) {
resurrect[0] = true;
return n;
} else {
actualCause[0] = cause;
}

if (actualCause[0] == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) {
Expand Down
Expand Up @@ -44,6 +44,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.Thread.State;
import java.lang.ref.Reference;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -409,6 +413,141 @@ public void evict_update_entryTooBig_protected(Cache<Int, Int> cache, CacheConte
Int.valueOf(1), Int.valueOf(20), RemovalCause.SIZE)));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT},
removalListener = Listener.CONSUMING)
public void evict_resurrect_collected(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
Int oldValue = Int.valueOf(2);
Int newValue = Int.valueOf(2);
var localCache = asBoundedLocalCache(cache);

cache.put(key, oldValue);
var node = localCache.data.get(localCache.referenceKey(key));
@SuppressWarnings("unchecked")
var ref = (Reference<Int>) node.getValueReference();
ref.enqueue();

var evictor = new Thread(localCache::cleanUp);
evictor.setDaemon(true);

localCache.compute(key, (k, v) -> {
assertThat(v, is(nullValue()));

evictor.start();
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.getState()));

return newValue;
});
await().until(() -> evictor.getState() == State.TERMINATED);

assertThat(node.getValue(), is(newValue));
assertThat(context.removalNotifications(), is(equalTo(List.of(
new RemovalNotification<>(key, null, RemovalCause.COLLECTED)))));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE,
weigher = CacheWeigher.COLLECTION)
public void evict_resurrect_weight(Cache<Int, List<Int>> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, List.of(key));

var evictor = new Thread(() -> cache.policy().eviction().get().setMaximum(0));
evictor.setDaemon(true);

cache.asMap().compute(key, (k, v) -> {
evictor.start();
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.getState()));

return List.of();
});
await().until(() -> evictor.getState() == State.TERMINATED);

assertThat(cache.getIfPresent(key), is(List.of()));
verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterAccess = Expire.FOREVER)
public void evict_resurrect_expireAfterAccess(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, key);

var evictor = new Thread(() ->
cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO));
evictor.setDaemon(true);

context.ticker().advance(Duration.ofMinutes(1));
cache.asMap().compute(key, (k, v) -> {
evictor.start();
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.getState()));
cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1));
return v;
});
await().until(() -> evictor.getState() == State.TERMINATED);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterWrite = Expire.FOREVER)
public void evict_resurrect_expireAfterWrite(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, key);

var evictor = new Thread(() ->
cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO));
evictor.setDaemon(true);

context.ticker().advance(Duration.ofMinutes(1));
cache.asMap().compute(key, (k, v) -> {
evictor.start();
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.getState()));
cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1));
return v;
});
await().until(() -> evictor.getState() == State.TERMINATED);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE)
public void evict_resurrect_expireAfterVar(Cache<Int, Int> cache, CacheContext context) {
var localCache = asBoundedLocalCache(cache);
Int key = Int.valueOf(1);
cache.put(key, key);
var node = localCache.data.get(localCache.referenceKey(key));

var evictor = new Thread(cache::cleanUp);
evictor.setDaemon(true);

synchronized (node) {
context.ticker().advance(Duration.ofHours(1));

evictor.start();
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.getState()));
cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1));
}
await().until(() -> evictor.getState() == State.TERMINATED);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.FULL, maximumSize = Maximum.FULL)
Expand Down

0 comments on commit 542c308

Please sign in to comment.