From 98617023b9f493ee2d220b1e700d2a64d34b78c1 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Wed, 30 Jun 2021 21:57:44 -0700 Subject: [PATCH] Improve robustness in racy scenarios (fixes #568) 1. When an entry is updated then a concurrent reader should observe either the old or new value. This operation replaces the j.l.Reference instance stored on the entry and the old referent becomes eligible for garbage collection. A reader holding the stale Reference may therefore return a null value, which is more likely due to the cache proactively clearing the referent to assist the garbage collector. When a null value is read then an extra volatile read is used to validate that the Reference instance is still held by the entry. This retry loop has negligible cost. 2. When an entry is eligible for removal due to its value being garbage collected, then during the eviction's atomic map operation this eligibility must be verified. If concurrently the entry was resurrected and a new value set, then the cache writer has already dispatched the removal notification and established a live mapping. If the evictor does not detect that the cause is no longer valid, then it would incorrectly discard the mapping with a removal notification containing a non-null key, non-null value, and collected removal cause. Like expiration and size policies, the reference eviction policy will now validate and no-op if the entry is no longer eligible. 3. When the fixed expiration setting is dynamically adjusted, an expired entry may be resurrected as no longer eligible for removal. While the map operation detected this case, stemming from the entry itself being updated and its lifetime reset, the outer eviction loop could retry indefinitely due to a stale read of the fixed duration. This caused the loop to retry the ineligible entry, but instead it can terminate when eviction fails because it scans a queue ordered by the expiration timestamp. Co-authored-by: Justin Horvitz --- .../caffeine/cache/node/AddValue.java | 30 ++- .../caffeine/cache/BoundedLocalCache.java | 26 +- .../caffeine/cache/BoundedLocalCacheTest.java | 230 ++++++++++++++++++ .../caffeine/cache/issues/Issue568Test.java | 127 ++++++++++ checksum.xml | 3 + gradle/dependencies.gradle | 2 +- 6 files changed, 406 insertions(+), 12 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java index ab271ed2e0..f7a7df2c73 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java @@ -24,6 +24,7 @@ import javax.lang.model.element.Modifier; import com.squareup.javapoet.ClassName; +import com.squareup.javapoet.CodeBlock; import com.squareup.javapoet.FieldSpec; import com.squareup.javapoet.MethodSpec; @@ -44,7 +45,7 @@ protected boolean applies() { protected void execute() { context.nodeSubtype .addField(newValueField()) - .addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.PLAIN)) + .addMethod(makeGetValue()) .addMethod(newGetRef("value")) .addMethod(makeSetValue()) .addMethod(makeContainsValue()); @@ -60,6 +61,30 @@ private FieldSpec newValueField() { return fieldSpec.build(); } + /** Creates the getValue method. 
*/ + private MethodSpec makeGetValue() { + MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue") + .addModifiers(context.publicFinalModifiers()) + .returns(vTypeVar); + if (valueStrength() == Strength.STRONG) { + getter.addStatement("return ($T) $L.get(this)", vTypeVar, varHandleName("value")); + return getter.build(); + } + + CodeBlock code = CodeBlock.builder() + .beginControlFlow("for (;;)") + .addStatement("$1T ref = ($1T) $2L.get(this)", + Reference.class, varHandleName("value")) + .addStatement("V value = ref.get()") + .beginControlFlow( + "if ((value != null) || (ref == $L.getVolatile(this)))", varHandleName("value")) + .addStatement("return value") + .endControlFlow() + .endControlFlow() + .build(); + return getter.addCode(code).build(); + } + /** Creates the setValue method. */ private MethodSpec makeSetValue() { MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue") @@ -70,9 +95,10 @@ private MethodSpec makeSetValue() { if (isStrongValues()) { setter.addStatement("$L.set(this, $N)", varHandleName("value"), "value"); } else { - setter.addStatement("(($T) getValueReference()).clear()", Reference.class); + setter.addStatement("$1T ref = ($1T) getValueReference()", Reference.class); setter.addStatement("$L.set(this, new $T($L, $N, referenceQueue))", varHandleName("value"), valueReferenceType(), "getKeyReference()", "value"); + setter.addStatement("ref.clear()"); } return setter.build(); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index e6a9e2b094..6cfa68d50f 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -870,10 +870,10 @@ void expireAfterAccessEntries(AccessOrderDeque> accessOrderDeque, lon long duration = expiresAfterAccessNanos(); for (;;) { Node node = accessOrderDeque.peekFirst(); - if ((node == null) || ((now - node.getAccessTime()) < duration)) { + if ((node == null) || ((now - node.getAccessTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { return; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -885,11 +885,11 @@ void expireAfterWriteEntries(long now) { } long duration = expiresAfterWriteNanos(); for (;;) { - final Node node = writeOrderDeque().peekFirst(); - if ((node == null) || ((now - node.getWriteTime()) < duration)) { + Node node = writeOrderDeque().peekFirst(); + if ((node == null) || ((now - node.getWriteTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { break; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -942,8 +942,8 @@ boolean hasExpired(Node node, long now) { } /** - * Attempts to evict the entry based on the given removal cause. A removal due to expiration or - * size may be ignored if the entry was updated and is no longer eligible for eviction. + * Attempts to evict the entry based on the given removal cause. A removal due to may be ignored + * if the entry was updated and is no longer eligible for eviction. 
* * @param node the entry to evict * @param cause the reason to evict @@ -958,8 +958,8 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { V[] value = (V[]) new Object[1]; boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; - RemovalCause[] actualCause = new RemovalCause[1]; Object keyReference = node.getKeyReference(); + RemovalCause[] actualCause = new RemovalCause[1]; data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { @@ -968,7 +968,15 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { synchronized (n) { value[0] = n.getValue(); - actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause; + if ((key == null) || (value[0] == null)) { + actualCause[0] = RemovalCause.COLLECTED; + } else if (cause == RemovalCause.COLLECTED) { + resurrect[0] = true; + return n; + } else { + actualCause[0] = cause; + } + if (actualCause[0] == RemovalCause.EXPIRED) { boolean expired = false; if (expiresAfterAccess()) { diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 70a5e757d4..c26fb44b5e 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -44,6 +44,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.lang.Thread.State; +import java.lang.ref.Reference; +import java.time.Duration; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -409,6 +413,232 @@ public void evict_update_entryTooBig_protected(Cache cache, CacheConte Int.valueOf(1), Int.valueOf(20), RemovalCause.SIZE))); } + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT}, + removalListener = Listener.CONSUMING) + public void evict_resurrect_collected(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + Int oldValue = Int.valueOf(2); + Int newValue = Int.valueOf(3); + var localCache = asBoundedLocalCache(cache); + + cache.put(key, oldValue); + var node = localCache.data.get(localCache.referenceKey(key)); + @SuppressWarnings("unchecked") + var ref = (Reference) node.getValueReference(); + ref.enqueue(); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + cache.asMap().compute(key, (k, v) -> { + assertThat(v, is(nullValue())); + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + + return newValue; + }); + await().untilTrue(done); + + assertThat(node.getValue(), is(newValue)); + assertThat(context.removalNotifications(), is(equalTo(List.of( + new RemovalNotification<>(key, null, RemovalCause.COLLECTED))))); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE, + weigher = CacheWeigher.COLLECTION) + public void evict_resurrect_weight(Cache> cache, CacheContext context) { + Int key = Int.valueOf(1); + 
cache.put(key, List.of(key)); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().eviction().get().setMaximum(0); + done.set(true); + }); + + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + + return List.of(); + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(List.of())); + verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, mustExpireWithAnyOf = {AFTER_ACCESS, AFTER_WRITE}, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}) + public void evict_resurrect_expireAfter(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, key); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + context.ticker().advance(Duration.ofHours(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + return key.negate(); + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key.negate())); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterAccess = Expire.FOREVER) + public void evict_resurrect_expireAfterAccess(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, key); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO); + done.set(true); + }); + + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.FOREVER) + public void evict_resurrect_expireAfterWrite(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, key); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + 
ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO); + done.set(true); + }); + + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterWrite_entry(Cache cache, CacheContext context) { + Int key = Int.valueOf(1); + cache.put(key, key); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + context.ticker().advance(Duration.ofHours(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + return key.negate(); + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key.negate())); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterVar(Cache cache, CacheContext context) { + var localCache = asBoundedLocalCache(cache); + Int key = Int.valueOf(1); + cache.put(key, key); + var node = localCache.data.get(localCache.referenceKey(key)); + + var started = new AtomicBoolean(); + var done = new AtomicBoolean(); + var evictor = new AtomicReference(); + synchronized (node) { + context.ticker().advance(Duration.ofHours(1)); + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + var threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1)); + } + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } @Test(dataProvider = "caches") @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, population = Population.FULL, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java new file mode 100644 index 0000000000..a47810d23b --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import java.lang.ref.Reference; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; + +/** + * Issue #568: Incorrect handling of weak/soft reference caching. + * + * @author jhorvitz@google.com (Justin Horvitz) + */ +@Test(groups = "isolated") +public class Issue568Test { + + /** + * When an entry is updated then a concurrent reader should observe either the old or new value. + * This operation replaces the {@link Reference} instance stored on the entry and the old referent + * becomes eligible for garbage collection. A reader holding the stale Reference may therefore + * return a null value, which is more likely due to the cache proactively clearing the referent to + * assist the garbage collector. + */ + @Test + public void intermittentNull() throws InterruptedException { + Cache cache = Caffeine.newBuilder().weakValues().build(); + + String key = "key"; + String val = "val"; + cache.put("key", "val"); + + var error = new AtomicReference(); + var threads = new ArrayList(); + for (int i = 0; i < 10; i++) { + int name = i; + var thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (Math.random() < .5) { + cache.put(key, val); + } else if (cache.getIfPresent(key) == null) { + error.compareAndSet(null, new IllegalStateException( + "Thread " + name + " observed null on iteration " + j)); + break; + } + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } + + /** + * When an entry is eligible for removal due to its value being garbage collected, then during the + * eviction's atomic map operation this eligibility must be verified. If concurrently the entry + * was resurrected and a new value set, then the cache writer has already dispatched the removal + * notification and established a live mapping. If the evictor does not detect that the cause is + * no longer valid, then it would incorrectly discard the mapping with a removal notification + * containing a non-null key, non-null value, and collected removal cause. 
+ */ + @Test + public void resurrect() throws InterruptedException { + var error = new AtomicReference(); + Cache cache = Caffeine.newBuilder() + .weakValues() + .removalListener((k, v, cause) -> { + if (cause == RemovalCause.COLLECTED && (v != null)) { + error.compareAndSet(null, new IllegalStateException("Evicted a live value: " + v)); + } + }).build(); + + String key = "key"; + cache.put(key, new Object()); + + var missing = new AtomicBoolean(); + var threads = new ArrayList(); + for (int i = 0; i < 100; i++) { + var thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (error.get() != null) { + break; + } + if (Math.random() < .01) { + System.gc(); + cache.cleanUp(); + } else if ((cache.getIfPresent(key) == null) && !missing.getAndSet(true)) { + cache.put(key, new Object()); + missing.set(false); + } + } + }); + threads.add(thread); + thread.start(); + } + + for (var thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } +} diff --git a/checksum.xml b/checksum.xml index cbbe8b47a4..949e1e0563 100644 --- a/checksum.xml +++ b/checksum.xml @@ -483,6 +483,9 @@ 7F18348A8EDC88BB4B0D6EB5412BC4F6D79D1E5843F28CA72F717BE50BF0BF6A19AE1C3416E5EAB40CC1FD27C697CB0F9B5E10A51CFA574093EF0A4F57CB93CF + + 5EA9CA94A3682E090E28895ECAAE1E020C48DD249EE5040FB6EBE4B01DA027B86F94450E30692253DFA787371D4B4286FE257CEBF02E184AC149A746952D669C + 779F0D784A11834392C65DA375CF5F9612FD89B0540C665BCE1009EFAA7C35642E38D381AF7362703378306C95D24669B02CA27A2CCAFB574173BF9FA273F625 diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index c194f51d58..f9051f8b8c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -85,7 +85,7 @@ ext { bnd: '5.3.0', checkstyle: '8.44', coveralls: '2.12.0', - errorprone: '2.0.1', + errorprone: '2.0.2', findsecbugs: '1.11.0', jacoco: '0.8.6', jmh: '0.6.5',