diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java index 9298667655..54774d7a56 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/node/AddValue.java @@ -26,6 +26,7 @@ import javax.lang.model.element.Modifier; +import com.squareup.javapoet.CodeBlock; import com.squareup.javapoet.FieldSpec; import com.squareup.javapoet.MethodSpec; @@ -47,7 +48,7 @@ protected void execute() { context.nodeSubtype .addField(newFieldOffset(context.className, "value")) .addField(newValueField()) - .addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.LAZY)) + .addMethod(makeGetValue()) .addMethod(newGetRef("value")) .addMethod(makeSetValue()) .addMethod(makeContainsValue()); @@ -60,6 +61,29 @@ private FieldSpec newValueField() { return fieldSpec.build(); } + /** Creates the getValue method. */ + private MethodSpec makeGetValue() { + MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue") + .addModifiers(context.publicFinalModifiers()) + .returns(vTypeVar); + if (valueStrength() == Strength.STRONG) { + getter.addStatement("return value"); + return getter.build(); + } + + CodeBlock code = CodeBlock.builder() + .beginControlFlow("for (;;)") + .addStatement("$1T ref = ($1T) $2T.UNSAFE.getObject(this, $3N)", + Reference.class, UNSAFE_ACCESS, offsetName("value")) + .addStatement("V referent = ref.get()") + .beginControlFlow("if ((referent != null) || (ref == value))") + .addStatement("return referent") + .endControlFlow() + .endControlFlow() + .build(); + return getter.addCode(code).build(); + } + /** Creates the setValue method. 
*/ private MethodSpec makeSetValue() { MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue") @@ -71,9 +95,10 @@ private MethodSpec makeSetValue() { setter.addStatement("$T.UNSAFE.putObject(this, $N, $N)", UNSAFE_ACCESS, offsetName("value"), "value"); } else { - setter.addStatement("(($T) getValueReference()).clear()", Reference.class); + setter.addStatement("$1T ref = ($1T) getValueReference()", Reference.class); setter.addStatement("$T.UNSAFE.putObject(this, $N, new $T($L, $N, referenceQueue))", UNSAFE_ACCESS, offsetName("value"), valueReferenceType(), "getKeyReference()", "value"); + setter.addStatement("ref.clear()"); } return setter.build(); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index c0e50da3b4..e5c2d65237 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -822,10 +822,10 @@ void expireAfterAccessEntries(AccessOrderDeque> accessOrderDeque, lon long duration = expiresAfterAccessNanos(); for (;;) { Node node = accessOrderDeque.peekFirst(); - if ((node == null) || ((now - node.getAccessTime()) < duration)) { + if ((node == null) || ((now - node.getAccessTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { return; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -837,11 +837,11 @@ void expireAfterWriteEntries(long now) { } long duration = expiresAfterWriteNanos(); for (;;) { - final Node node = writeOrderDeque().peekFirst(); - if ((node == null) || ((now - node.getWriteTime()) < duration)) { + Node node = writeOrderDeque().peekFirst(); + if ((node == null) || ((now - node.getWriteTime()) < duration) + || !evictEntry(node, RemovalCause.EXPIRED, now)) { break; } - evictEntry(node, RemovalCause.EXPIRED, now); } } @@ -894,8 +894,8 @@ boolean hasExpired(Node node, 
long now) { } /** - * Attempts to evict the entry based on the given removal cause. A removal due to expiration or - * size may be ignored if the entry was updated and is no longer eligible for eviction. + * Attempts to evict the entry based on the given removal cause. A removal may be ignored + * if the entry was updated and is no longer eligible for eviction. * * @param node the entry to evict * @param cause the reason to evict @@ -919,7 +919,15 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { synchronized (n) { value[0] = n.getValue(); - actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause; + if ((key == null) || (value[0] == null)) { + actualCause[0] = RemovalCause.COLLECTED; + } else if (cause == RemovalCause.COLLECTED) { + resurrect[0] = true; + return n; + } else { + actualCause[0] = cause; + } + if (actualCause[0] == RemovalCause.EXPIRED) { boolean expired = false; if (expiresAfterAccess()) { diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 9d3926ec32..c0b175db0d 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -45,6 +45,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.lang.Thread.State; +import java.lang.ref.Reference; +import java.time.Duration; +import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -52,6 +56,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import org.mockito.Mockito; @@ -454,6 +459,239 @@ public void 
clear_update() { await().untilAtomic(removedValues, is(oldValue + newValue)); } + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT}, + removalListener = Listener.CONSUMING) + public void evict_resurrect_collected(Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + Integer oldValue = Integer.valueOf(2); + Integer newValue = Integer.valueOf(3); + BoundedLocalCache localCache = asBoundedLocalCache(cache); + + cache.put(key, oldValue); + Node node = localCache.data.get( + localCache.nodeFactory.newReferenceKey(key, localCache.keyReferenceQueue())); + @SuppressWarnings("unchecked") + Reference ref = (Reference) node.getValueReference(); + ref.enqueue(); + ref.clear(); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + cache.asMap().compute(key, (k, v) -> { + assertThat(v, is(nullValue())); + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + + return newValue; + }); + await().untilTrue(done); + + assertThat(node.getValue(), is(newValue)); + assertThat(context.removalNotifications(), is(equalTo(ImmutableList.of( + new RemovalNotification<>(key, null, RemovalCause.COLLECTED))))); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE, + weigher = CacheWeigher.COLLECTION) + public void evict_resurrect_weight(Cache> cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, ImmutableList.of(key)); + + 
AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().eviction().get().setMaximum(0); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + + return ImmutableList.of(); + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(ImmutableList.of())); + verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, mustExpireWithAnyOf = {AFTER_ACCESS, AFTER_WRITE}, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}) + public void evict_resurrect_expireAfter(Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofHours(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + return -key; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(-key)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = 
Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterAccess = Expire.FOREVER) + public void evict_resurrect_expireAfterAccess( + Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.FOREVER) + public void evict_resurrect_expireAfterWrite( + Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofMinutes(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + 
await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1)); + return v; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expireAfterWrite = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterWrite_entry( + Cache cache, CacheContext context) { + Integer key = Integer.valueOf(1); + cache.put(key, key); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + context.ticker().advance(Duration.ofHours(1)); + cache.asMap().compute(key, (k, v) -> { + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + return -key; + }); + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(-key)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, + population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE) + public void evict_resurrect_expireAfterVar(Cache cache, CacheContext context) { + BoundedLocalCache localCache = asBoundedLocalCache(cache); + Integer key = Integer.valueOf(1); + cache.put(key, key); + Node node = localCache.data.get( + localCache.nodeFactory.newReferenceKey(key, localCache.keyReferenceQueue())); + + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new 
AtomicBoolean(); + AtomicReference evictor = new AtomicReference<>(); + synchronized (node) { + context.ticker().advance(Duration.ofHours(1)); + ConcurrentTestHarness.execute(() -> { + evictor.set(Thread.currentThread()); + started.set(true); + cache.cleanUp(); + done.set(true); + }); + + await().untilTrue(started); + EnumSet threadState = EnumSet.of(State.BLOCKED, State.WAITING); + await().until(() -> threadState.contains(evictor.get().getState())); + cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1)); + } + await().untilTrue(done); + + assertThat(cache.getIfPresent(key), is(key)); + verifyRemovalListener(context, verifier -> verifier.noInteractions()); + } + @Test(dataProvider = "caches") @CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine, population = Population.FULL, maximumSize = Maximum.FULL) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java new file mode 100644 index 0000000000..3175d389be --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue568Test.java @@ -0,0 +1,128 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.github.benmanes.caffeine.cache.issues; + +import java.lang.ref.Reference; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; + +/** + * Issue #568: Incorrect handling of weak/soft reference caching. + * + * @author jhorvitz@google.com (Justin Horvitz) + */ +@Test(groups = "isolated") +public class Issue568Test { + + /** + * When an entry is updated then a concurrent reader should observe either the old or new value. + * This operation replaces the {@link Reference} instance stored on the entry and the old referent + * becomes eligible for garbage collection. A reader holding the stale Reference may therefore + * return a null value, which is more likely due to the cache proactively clearing the referent to + * assist the garbage collector. 
+ */ + @Test + public void intermittentNull() throws InterruptedException { + Cache cache = Caffeine.newBuilder().weakValues().build(); + + String key = "key"; + String val = "val"; + cache.put("key", "val"); + + AtomicReference error = new AtomicReference<>(); + List threads = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + int name = i; + Thread thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (Math.random() < .5) { + cache.put(key, val); + } else if (cache.getIfPresent(key) == null) { + error.compareAndSet(null, new IllegalStateException( + "Thread " + name + " observed null on iteration " + j)); + break; + } + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } + + /** + * When an entry is eligible for removal due to its value being garbage collected, then during the + * eviction's atomic map operation this eligibility must be verified. If concurrently the entry + * was resurrected and a new value set, then the cache writer has already dispatched the removal + * notification and established a live mapping. If the evictor does not detect that the cause is + * no longer valid, then it would incorrectly discard the mapping with a removal notification + * containing a non-null key, non-null value, and collected removal cause. 
+ */ + @Test + public void resurrect() throws InterruptedException { + AtomicReference error = new AtomicReference<>(); + Cache cache = Caffeine.newBuilder() + .weakValues() + .removalListener((k, v, cause) -> { + if (cause == RemovalCause.COLLECTED && (v != null)) { + error.compareAndSet(null, new IllegalStateException("Evicted a live value: " + v)); + } + }).build(); + + String key = "key"; + cache.put(key, new Object()); + + AtomicBoolean missing = new AtomicBoolean(); + List threads = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + Thread thread = new Thread(() -> { + for (int j = 0; j < 1000; j++) { + if (error.get() != null) { + break; + } + if (Math.random() < .01) { + System.gc(); + cache.cleanUp(); + } else if ((cache.getIfPresent(key) == null) && !missing.getAndSet(true)) { + cache.put(key, new Object()); + missing.set(false); + } + } + }); + threads.add(thread); + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + if (error.get() != null) { + throw error.get(); + } + } +} diff --git a/checksum.xml b/checksum.xml index ce1b3fa771..fdb1f25ff5 100644 --- a/checksum.xml +++ b/checksum.xml @@ -8,6 +8,7 @@ + @@ -361,6 +362,9 @@ 3C76A1066218F9C54B83D167E34B563C09BA2F72DB330F5843B2324DD2E40EE1F7E7887416B9C7A8DC03C07223E42216B76569F5D3962B911F922A6963C0C917 + + 9AEC8CAD28C018AEA65F3C2C57FD0D4B82C00A8F6C778F6DBAA61C961AF3A7C369655F5A5E93B360FAE297ADFA3205BC40EAF16847B6EDB56F01DF9AD655A18C + 1A47AAF2442159C1CBD22521F31C74B4C71C4168AF5B22D04B4691FDD286E90F02B2792DEDAD3EEEC12B5034ADA1A9EE751C975B5A169AE0B33EE800A8D96E7F @@ -406,6 +410,15 @@ 7245456641F2BF87960CF859D31827EBF937739932C59E135635579199122A5763F437A19DED6B9CCAAF9E84FD698A675F60427138EF35C10BF40BBC3E7B68DA + + A4C943D4945189F4EB883D764E63CF5C5F8F2A17EA1BBFE6C3F05A4B28E1E011D0358246618A39EC705E7CEE60D30ABFDD82A36A621BC7A3F5455E47D0EDAF2B + + + 7F18348A8EDC88BB4B0D6EB5412BC4F6D79D1E5843F28CA72F717BE50BF0BF6A19AE1C3416E5EAB40CC1FD27C697CB0F9B5E10A51CFA574093EF0A4F57CB93CF + 
+ + 5EA9CA94A3682E090E28895ECAAE1E020C48DD249EE5040FB6EBE4B01DA027B86F94450E30692253DFA787371D4B4286FE257CEBF02E184AC149A746952D669C + 779F0D784A11834392C65DA375CF5F9612FD89B0540C665BCE1009EFAA7C35642E38D381AF7362703378306C95D24669B02CA27A2CCAFB574173BF9FA273F625