From 76a9660e0eb034d658779ba53163e8d6d604602c Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 3 Jan 2021 13:40:59 -0800 Subject: [PATCH] Return refresh future and ignore redundant refreshes A mapping of in-flight refreshes is now maintained and lazily initialized if not used. This allows the cache to ignore redundant requests for reloads, like Guava does. It also removes disablement of expiration during refresh and resolves an ABA problem if the entry is modified in a previously undectectable way. The refresh future can now be obtained from LoadingCache to chain operations against. fixes #143 fixes #193 fixes #236 fixes #282 fixes #322 fixed #373 fixes #467 --- .../caffeine/cache/BoundedLocalCache.java | 131 ++++++++++++------ .../benmanes/caffeine/cache/LoadingCache.java | 7 +- .../cache/LocalAsyncLoadingCache.java | 109 ++++++++++++--- .../benmanes/caffeine/cache/LocalCache.java | 7 + .../caffeine/cache/LocalLoadingCache.java | 99 ++++++++----- .../caffeine/cache/UnboundedLocalCache.java | 65 ++++++++- .../caffeine/cache/ExpirationTest.java | 6 +- .../caffeine/cache/LoadingCacheTest.java | 82 ++++++++--- .../caffeine/cache/MultiThreadedTest.java | 4 +- .../caffeine/cache/ReferenceTest.java | 4 +- .../caffeine/cache/RefreshAfterWriteTest.java | 43 +++++- .../benmanes/caffeine/cache/Stresser.java | 1 + .../caffeine/cache/issues/Issue193Test.java | 104 ++++++++++++++ .../caffeine/cache/testing/CacheSpec.java | 14 ++ .../cache/testing/GuavaCacheFromContext.java | 29 +++- .../guava/CaffeinatedGuavaLoadingCache.java | 1 + 16 files changed, 573 insertions(+), 133 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 36050f45ef..0996a11dd4 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -60,6 +60,7 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -221,10 +222,10 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef final Executor executor; final boolean isAsync; - // The collection views - @Nullable transient Set keySet; - @Nullable transient Collection values; - @Nullable transient Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine builder, this.cacheLoader = cacheLoader; executor = builder.getExecutor(); evictionLock = new ReentrantLock(); + refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); writer = builder.getCacheWriter(isAsync); drainBuffersTask = new PerformCleanupTask(this); @@ -289,11 +291,29 @@ public final Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); } + @Override + public Object referenceKey(K key) { + return nodeFactory.newLookupKey(key); + } + /* --------------- Stats Support --------------- */ @Override @@ -900,8 +920,9 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; RemovalCause[] actualCause = new RemovalCause[1]; + Object keyReference = node.getKeyReference(); - data.computeIfPresent(node.getKeyReference(), (k, n) -> { + data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { return n; } @@ -964,6 +985,12 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(keyReference); + } + if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1171,51 +1198,60 @@ void refreshIfNeeded(Node node, long now) { if (!refreshAfterWrite()) { return; } + K key; V oldValue; - long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = (now + ASYNC_EXPIRY); - if (((now - oldWriteTime) > refreshAfterWriteNanos()) + long writeTime = node.getWriteTime(); + Object keyReference = node.getKeyReference(); + if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) - && node.casWriteTime(oldWriteTime, refreshWriteTime)) { - try { - CompletableFuture refreshFuture; - long startTime = statsTicker().read(); + && !refreshes().containsKey(keyReference)) { + long[] startTime = new long[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] refreshFuture = new CompletableFuture[1]; + refreshes().computeIfAbsent(keyReference, k -> { + startTime[0] = statsTicker().read(); if (isAsync) { @SuppressWarnings("unchecked") CompletableFuture future = (CompletableFuture) oldValue; if (Async.isReady(future)) { @SuppressWarnings("NullAway") - CompletableFuture refresh = future.thenCompose(value -> - cacheLoader.asyncReload(key, value, executor)); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, future.join(), executor); + refreshFuture[0] = refresh; } else { // no-op if load is pending - node.casWriteTime(refreshWriteTime, oldWriteTime); - return; + return future; } } else { @SuppressWarnings("NullAway") - CompletableFuture refresh = cacheLoader.asyncReload(key, oldValue, executor); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, oldValue, executor); + refreshFuture[0] = refresh; } - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = statsTicker().read() - startTime; + return refreshFuture[0]; + }); + + if (refreshFuture[0] != null) { + refreshFuture[0].whenComplete((newValue, error) -> { + long loadTime = statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - node.casWriteTime(refreshWriteTime, oldWriteTime); + refreshes().remove(keyReference, refreshFuture[0]); statsCounter().recordLoadFailure(loadTime); return; } @SuppressWarnings("unchecked") - V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; + V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue; boolean[] discard = new boolean[1]; compute(key, (k, currentValue) -> { if (currentValue == null) { - return value; - } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) { + if (value == null) { + return null; + } else if (refreshes().get(key) == refreshFuture[0]) { + return value; + } + } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) { return value; } discard[0] = true; @@ -1230,10 +1266,9 @@ void refreshIfNeeded(Node node, long now) { } else { statsCounter().recordLoadSuccess(loadTime); } + + refreshes().remove(keyReference, refreshFuture[0]); }); - } catch (Throwable t) { - node.casWriteTime(refreshWriteTime, oldWriteTime); - logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); } } } @@ -1781,8 +1816,12 @@ public void clear() { } // Discard all entries - for (Node node : data.values()) { - removeNode(node, now); + var pending = refreshes.get(); + for (var entry : data.entrySet()) { + removeNode(entry.getValue(), now); + if (pending != null) { + pending.remove(entry.getKey()); + } } // Discard all pending reads @@ -2098,8 +2137,9 @@ public Map getAllPresent(Iterable keys) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { + data.computeIfPresent(lookupKey, (k, n) -> { synchronized (n) { oldValue[0] = n.getValue(); if (oldValue[0] == null) { @@ -2117,6 +2157,11 @@ public Map getAllPresent(Iterable keys) { }); if (cause[0] != null) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2139,8 +2184,9 @@ public boolean remove(Object key, Object value) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> { + data.computeIfPresent(lookupKey, (kR, node) -> { synchronized (node) { oldKey[0] = node.getKey(); oldValue[0] = node.getValue(); @@ -2162,7 +2208,13 @@ public boolean remove(Object key, Object value) { if (removed[0] == null) { return false; - } else if (hasRemovalListener()) { + } + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } afterWrite(new RemovalTask(removed[0])); @@ -2581,15 +2633,14 @@ public void replaceAll(BiFunction function) { if (expiresAfterWrite() || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { - if (cause[0] == null) { - if (!isComputingAsync(node)) { - tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); - setAccessTime(node, now[0]); - } - } else if (cause[0] == RemovalCause.COLLECTED) { - scheduleDrainBuffers(); + if ((cause[0] == null) && !isComputingAsync(node)) { + tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); + setAccessTime(node, now[0]); } afterRead(node, now[0], /* recordHit */ false); + if ((cause[0] != null) && cause[0].wasEvicted()) { + scheduleDrainBuffers(); + } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java index c0db010547..81c37db5a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.checkerframework.checker.nullness.qual.NonNull; @@ -101,9 +102,13 @@ public interface LoadingCache extends Cache { * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading * is asynchronous by delegating to the default executor. + *

+ * Returns an existing future without doing anything if another thread is currently loading the + * value for {@code key}. * * @param key key with which a value may be associated + * @return the future that is loading the value * @throws NullPointerException if the specified key is null */ - void refresh(@NonNull K key); + CompletableFuture refresh(@NonNull K key); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index b11a7dc500..f9e01979a9 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -129,45 +129,109 @@ public Map getAll(Iterable keys) { } @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void refresh(K key) { + public CompletableFuture refresh(K key) { requireNonNull(key); - long[] writeTime = new long[1]; - CompletableFuture oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, writeTime); + Object keyReference = asyncCache.cache().referenceKey(key); + for (;;) { + var future = tryOptimisticRefresh(key, keyReference); + if (future == null) { + future = tryComputeRefresh(key, keyReference); + } + if (future != null) { + return future; + } + } + } + + /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */ + private @Nullable CompletableFuture tryOptimisticRefresh(K key, Object keyReference) { + // If a refresh is in-flight, then return it directly. If completed and not yet removed, then + // remove to trigger a new reload. + @SuppressWarnings("unchecked") + var lastRefresh = (CompletableFuture) asyncCache.cache().refreshes().get(keyReference); + if (lastRefresh != null) { + if (Async.isReady(lastRefresh)) { + asyncCache.cache().refreshes().remove(keyReference, lastRefresh); + } else { + return lastRefresh; + } + } + + // If the entry is absent then perform a new load, else if in-flight then return it + var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, /* writeTime */ new long[1]); if ((oldValueFuture == null) || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) { - asyncCache.get(key, asyncCache.loader::asyncLoad, /* recordStats */ false); - return; + if (oldValueFuture != null) { + asyncCache.cache().remove(key, asyncCache); + } + var future = asyncCache.get(key, + asyncCache.loader::asyncLoad, /* recordStats */ false); + @SuppressWarnings("unchecked") + var prior = (CompletableFuture) asyncCache.cache() + .refreshes().putIfAbsent(keyReference, future); + return (prior == null) ? future : prior; } else if (!oldValueFuture.isDone()) { // no-op if load is pending - return; + return oldValueFuture; } - oldValueFuture.thenAccept(oldValue -> { - long now = asyncCache.cache().statsTicker().read(); - CompletableFuture refreshFuture = (oldValue == null) - ? asyncCache.loader.asyncLoad(key, asyncCache.cache().executor()) - : asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = asyncCache.cache().statsTicker().read() - now; + // Fallback to the slow path, possibly retrying + return null; + } + + /** Begins a refresh if the entry has materialized and no reload is in-flight. */ + @SuppressWarnings("FutureReturnValueIgnored") + private @Nullable CompletableFuture tryComputeRefresh(K key, Object keyReference) { + long[] startTime = new long[1]; + long[] writeTime = new long[1]; + boolean[] refreshed = new boolean[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] oldValueFuture = new CompletableFuture[1]; + var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> { + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + V oldValue = Async.getIfReady(oldValueFuture[0]); + if (oldValue == null) { + return null; + } + + refreshed[0] = true; + startTime[0] = asyncCache.cache().statsTicker().read(); + return asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); + }); + + if (future == null) { + // Retry the optimistic path + return null; + } + + @SuppressWarnings("unchecked") + var castedFuture = (CompletableFuture) future; + if (refreshed[0]) { + castedFuture.whenComplete((newValue, error) -> { + long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { - asyncCache.cache().statsCounter().recordLoadFailure(loadTime); logger.log(Level.WARNING, "Exception thrown during refresh", error); + asyncCache.cache().refreshes().remove(keyReference, castedFuture); + asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (k, currentValue) -> { + asyncCache.cache().compute(key, (ignored, currentValue) -> { if (currentValue == null) { - return (newValue == null) ? null : refreshFuture; - } else if (currentValue == oldValueFuture) { + if (newValue == null) { + return null; + } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { + return castedFuture; + } + } else if (currentValue == oldValueFuture[0]) { long expectedWriteTime = writeTime[0]; if (asyncCache.cache().hasWriteTime()) { asyncCache.cache().getIfPresentQuietly(key, writeTime); } if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : refreshFuture; + return (newValue == null) ? null : castedFuture; } } discard[0] = true; @@ -175,15 +239,18 @@ public void refresh(K key) { }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, refreshFuture, RemovalCause.REPLACED); + asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } + + asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); - }); + } + return castedFuture; } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index 9ce22216c2..206055808a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,6 +53,9 @@ interface LocalCache extends ConcurrentMap { /** Returns the {@link Executor} used by this cache. */ @NonNull Executor executor(); + /** Returns the map of in-flight refresh operations. */ + ConcurrentMap> refreshes(); + /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); @@ -64,6 +68,9 @@ interface LocalCache extends ConcurrentMap { /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); + /** Returns the reference key. */ + Object referenceKey(K key); + /** * See {@link Cache#getIfPresent(Object)}. This method differs by accepting a parameter of whether * to record the hit and miss statistics based on the success of this operation. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 5e12bdf397..ccc265f986 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -89,49 +89,78 @@ default Map loadSequentially(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - default void refresh(K key) { + default CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - long startTime = cache().statsTicker().read(); - V oldValue = cache().getIfPresentQuietly(key, writeTime); - CompletableFuture refreshFuture = (oldValue == null) - ? cacheLoader().asyncLoad(key, cache().executor()) - : cacheLoader().asyncReload(key, oldValue, cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = cache().statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().statsCounter().recordLoadFailure(loadTime); - return; + long[] startTime = new long[1]; + @SuppressWarnings("unchecked") + V[] oldValue = (V[]) new Object[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + Object keyReference = cache().referenceKey(key); + + var future = cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; } - boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - return newValue; - } else if (currentValue == oldValue) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + startTime[0] = cache().statsTicker().read(); + oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + CompletableFuture refreshFuture = (oldValue[0] == null) + ? cacheLoader().asyncLoad(key, cache().executor()) + : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = cache().statsTicker().read() - startTime[0]; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + cache().refreshes().remove(keyReference, reloading[0]); + cache().statsCounter().recordLoadFailure(loadTime); + return; + } + + boolean[] discard = new boolean[1]; + cache().compute(key, (k, currentValue) -> { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (cache().refreshes().get(keyReference) == reloading[0]) { + return newValue; + } + } else if (currentValue == oldValue[0]) { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && cache().hasRemovalListener()) { + cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + } + if (newValue == null) { + cache().statsCounter().recordLoadFailure(loadTime); + } else { + cache().statsCounter().recordLoadSuccess(loadTime); } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); - } - if (newValue == null) { - cache().statsCounter().recordLoadFailure(loadTime); - } else { - cache().statsCounter().recordLoadSuccess(loadTime); - } - }); + cache().refreshes().remove(keyReference, reloading[0]); + }); + } + + @SuppressWarnings("unchecked") + CompletableFuture castedFuture = (CompletableFuture) future; + return castedFuture; } /** Returns a mapping function that adapts to {@link CacheLoader#load}. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 7004b9c37b..81480c446a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -22,6 +22,8 @@ import java.io.InvalidObjectException; import java.io.ObjectInputStream; import java.io.Serializable; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Collection; @@ -36,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -54,6 +57,8 @@ */ @SuppressWarnings("deprecation") final class UnboundedLocalCache implements LocalCache { + static final Logger logger = System.getLogger(UnboundedLocalCache.class.getName()); + @Nullable final RemovalListener removalListener; final ConcurrentHashMap data; final StatsCounter statsCounter; @@ -62,9 +67,10 @@ final class UnboundedLocalCache implements LocalCache { final Executor executor; final Ticker ticker; - transient @Nullable Set keySet; - transient @Nullable Collection values; - transient @Nullable Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); @@ -72,6 +78,7 @@ final class UnboundedLocalCache implements LocalCache { this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); this.writer = builder.getCacheWriter(async); + this.refreshes = new AtomicReference<>(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); } @@ -81,6 +88,11 @@ public boolean hasWriteTime() { return false; } + @Override + public Object referenceKey(K key) { + return key; + } + /* --------------- Cache --------------- */ @Override @@ -154,7 +166,19 @@ public RemovalListener removalListener() { @Override public void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause) { requireNonNull(removalListener(), "Notification should be guarded with a check"); - executor.execute(() -> removalListener().onRemoval(key, value, cause)); + Runnable task = () -> { + try { + removalListener().onRemoval(key, value, cause); + } catch (Throwable t) { + logger.log(Level.WARNING, "Exception thrown by removal listener", t); + } + }; + try { + executor.execute(task); + } catch (Throwable t) { + logger.log(Level.ERROR, "Exception thrown when submitting removal listener", t); + task.run(); + } } @Override @@ -167,6 +191,19 @@ public Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -339,6 +376,11 @@ public int size() { public void clear() { if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { data.clear(); + + var pending = refreshes.get(); + if (pending != null) { + pending.clear(); + } return; } for (K key : data.keySet()) { @@ -431,6 +473,10 @@ public void putAll(Map map) { }); } + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -460,9 +506,16 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (hasRemovalListener() && removed) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + if (removed) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } + if (hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + } } + return removed; } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 7af2d07dba..74e4622e24 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -604,9 +604,9 @@ public void getAll_writerFails(LoadingCache cache, CacheContex public void refresh(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.MINUTES); Integer key = context.firstKey(); - cache.refresh(key); + cache.refresh(key).join(); - long count = (cache.estimatedSize() == 1) ? context.initialSize() : 1; + long count = context.initialSize(); verifyListeners(context, verifier -> verifier.hasOnly(count, RemovalCause.EXPIRED)); verifyWriter(context, verifier -> { verifier.deleted(key, context.original().get(key), RemovalCause.EXPIRED); @@ -623,7 +623,7 @@ public void refresh(LoadingCache cache, CacheContext context) compute = Compute.SYNC, writer = Writer.EXCEPTIONAL, removalListener = Listener.REJECTING) public void refresh_writerFails(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.HOURS); - cache.refresh(context.firstKey()); + cache.refresh(context.firstKey()).join(); context.disableRejectingCacheWriter(); context.ticker().advance(-1, TimeUnit.HOURS); assertThat(cache.asMap(), equalTo(context.original())); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 9c7c6ee324..eb5d5adad6 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -49,10 +49,10 @@ import com.github.benmanes.caffeine.cache.testing.CacheProvider; import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; -import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population; import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Writer; @@ -350,16 +350,29 @@ class Key { @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) @Test(dataProvider = "caches", expectedExceptions = NullPointerException.class) public void refresh_null(LoadingCache cache, CacheContext context) { - cache.refresh(null); + cache.refresh(null).join(); + } + + @Test(dataProvider = "caches") + @CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine) + public void refresh_dedupe(LoadingCache cache, CacheContext context) { + var key = context.original().isEmpty() ? context.absentKey() : context.firstKey(); + var future1 = cache.refresh(key); + var future2 = cache.refresh(key); + assertThat(future1, is(sameInstance(future2))); + + future1.complete(-key); + assertThat(cache.getIfPresent(key), is(-key)); } @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(implementation = Implementation.Caffeine, compute=Compute.SYNC, + @CacheSpec(implementation = Implementation.Caffeine, executor = CacheExecutor.DIRECT, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { - cache.refresh(context.firstKey()); + var future = cache.refresh(context.firstKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize() - 1)); assertThat(cache.getIfPresent(context.firstKey()), is(nullValue())); verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); @@ -371,26 +384,53 @@ public void refresh_remove(LoadingCache cache, CacheContext co removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_failure(LoadingCache cache, CacheContext context) { - // Shouldn't leak exception to caller and should retain stale entry - cache.refresh(context.absentKey()); - cache.refresh(context.firstKey()); + // Shouldn't leak exception to caller nor retain the future; should retain the stale entry + var future1 = cache.refresh(context.absentKey()); + var future2 = cache.refresh(context.firstKey()); + var future3 = cache.refresh(context.firstKey()); + assertThat(future2, is(not(sameInstance(future3)))); + assertThat(future1.isCompletedExceptionally(), is(true)); + assertThat(future2.isCompletedExceptionally(), is(true)); + assertThat(future3.isCompletedExceptionally(), is(true)); assertThat(cache.estimatedSize(), is(context.initialSize())); - verifyStats(context, verifier -> verifier.success(0).failures(2)); + verifyStats(context, verifier -> verifier.success(0).failures(3)); + } + + @CheckNoWriter + @Test(dataProvider = "caches") + @CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine, + removalListener = { Listener.DEFAULT, Listener.REJECTING }) + public void refresh_cancel(LoadingCache cache, CacheContext context) { + var key = context.original().isEmpty() ? context.absentKey() : context.firstKey(); + var future1 = cache.refresh(key); + assertThat(future1.isDone(), is(false)); + future1.cancel(true); + + var future2 = cache.refresh(key); + assertThat(future1, is(not(sameInstance(future2)))); + + future2.cancel(false); + assertThat(cache.asMap(), is(equalTo(context.original()))); } @CheckNoWriter @CacheSpec(loader = Loader.NULL) @Test(dataProvider = "caches") public void refresh_absent_null(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize())); } @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) + @CacheSpec( + maximumSize = Maximum.UNREACHABLE, + removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = Population.SINGLETON) public void refresh_absent(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + Integer key = context.absentKey(); + var future = cache.refresh(key); + assertThat(future.join(), is(not(nullValue()))); assertThat(cache.estimatedSize(), is(1 + context.initialSize())); verifyStats(context, verifier -> verifier.hits(0).misses(0).success(1).failures(0)); @@ -404,7 +444,8 @@ public void refresh_absent(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_present_null(LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(nullValue())); } int count = context.firstMiddleLastKeys().size(); verifyStats(context, verifier -> verifier.hits(0).misses(0).success(0).failures(count)); @@ -422,7 +463,8 @@ public void refresh_present_null(LoadingCache cache, CacheCont public void refresh_present_sameValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(context.original().get(key))); } int count = context.firstMiddleLastKeys().size(); verifyStats(context, verifier -> verifier.hits(0).misses(0).success(count).failures(0)); @@ -441,7 +483,9 @@ public void refresh_present_sameValue( public void refresh_present_differentValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(key)); + // records a hit assertThat(cache.get(key), is(key)); } @@ -466,10 +510,12 @@ public void refresh_conflict(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); assertThat(cache.asMap().put(key, updated), is(original)); refresh.set(true); + future.join(); + await().until(() -> context.removalNotifications().size(), is(2)); List removed = context.removalNotifications().stream() .map(RemovalNotification::getValue).collect(toList()); @@ -494,11 +540,13 @@ public void refresh_invalidate(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + future.join(); + + assertThat(cache.getIfPresent(key), is(nullValue())); await().until(() -> context.removalNotifications(), hasSize(1)); verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); verifyStats(context, verifier -> verifier.success(1).failures(0)); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java index d34821f7d3..c2cde91d09 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java @@ -98,8 +98,8 @@ public void async_concurrent_bounded( Threads.runTest(cache, asyncOperations); } - @SuppressWarnings( - {"unchecked", "rawtypes", "ReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) + @SuppressWarnings({"unchecked", "rawtypes", "ReturnValueIgnored", + "FutureReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) List, Integer>> operations = ImmutableList.of( // LoadingCache (cache, key) -> { cache.get(key); }, diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java index 652894300d..742f709e84 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java @@ -467,7 +467,7 @@ public void refresh(LoadingCache cache, CacheContext context) GcFinalization.awaitFullGc(); awaitFullCleanup(cache); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.estimatedSize(), is(1L)); assertThat(cache.getIfPresent(key), is(not(value))); @@ -489,7 +489,7 @@ public void refresh_writerFails(LoadingCache cache, CacheConte Integer key = context.firstKey(); context.clear(); GcFinalization.awaitFullGc(); - cache.refresh(key); + cache.refresh(key).join(); context.disableRejectingCacheWriter(); assertThat(cache.asMap().isEmpty(), is(false)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 05cc6a0edf..52c527a52d 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -350,12 +350,53 @@ public void invalidate(CacheContext context) { cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + var executor = (TrackingExecutor) context.executor(); + await().until(() -> executor.submitted() == executor.completed()); + + if (context.implementation() == Implementation.Guava) { + // Guava does not protect against ABA when the entry was removed by allowing a possibly + // stale value from being inserted. + assertThat(cache.getIfPresent(key), is(refreshed)); + } else { + // Maintain linearizability by discarding the refresh if completing after an explicit removal + assertThat(cache.getIfPresent(key), is(nullValue())); + } + await().until(() -> context.removalNotifications(), hasSize(1)); verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); verifyStats(context, verifier -> verifier.success(1).failures(0)); } + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, + loader = Loader.ASYNC_INCOMPLETE, refreshAfterWrite = Expire.ONE_MINUTE) + public void refresh(LoadingCache cache, CacheContext context) { + cache.put(context.absentKey(), context.absentValue()); + var executor = (TrackingExecutor) context.executor(); + int submitted; + + // trigger an automatic refresh + submitted = executor.submitted(); + context.ticker().advance(2, TimeUnit.MINUTES); + cache.getIfPresent(context.absentKey()); + assertThat(executor.submitted(), is(submitted + 1)); + + // return in-flight future + var future1 = cache.refresh(context.absentKey()); + assertThat(executor.submitted(), is(submitted + 1)); + future1.complete(-context.absentValue()); + + // trigger a new automatic refresh + submitted = executor.submitted(); + context.ticker().advance(2, TimeUnit.MINUTES); + cache.getIfPresent(context.absentKey()); + assertThat(executor.submitted(), is(submitted + 1)); + + var future2 = cache.refresh(context.absentKey()); + assertThat(future2, is(not(sameInstance(future1)))); + future2.cancel(true); + } + /* --------------- Policy --------------- */ @Test(dataProvider = "caches") diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index bd55c04390..89ae2b128b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -90,6 +90,7 @@ public Stresser() { status(); } + @SuppressWarnings("FutureReturnValueIgnored") public void run() throws InterruptedException { ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java new file mode 100644 index 0000000000..60f4369d52 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.testing.FakeTicker; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.testing.TestingExecutors; + +/** + * Issue #193: Invalidate before Refresh completes still stores value + *

+ * When a refresh starts before an invalidate and completes afterwards, the entry is inserted into + * the cache. This breaks linearizability assumptions, as the invalidation may be to ensure that + * the cache does not hold stale data that refresh will have observed in its load. This undesirable + * behavior is also present in Guava, so the stricter handling is an intentional deviation. + * + * @author boschb (Robert Bosch) + */ +public final class Issue193Test { + private final AtomicLong counter = new AtomicLong(0); + private final FakeTicker ticker = new FakeTicker(); + + private ListenableFutureTask loadingTask; + + private final AsyncCacheLoader loader = + (key, exec) -> { + // Fools the cache into thinking there is a future that's not immediately ready. + // (The Cache has optimizations for this that we want to avoid) + loadingTask = ListenableFutureTask.create(counter::getAndIncrement); + var f = new CompletableFuture(); + loadingTask.addListener(() -> { + f.complete(Futures.getUnchecked(loadingTask)); + }, exec); + return f; + }; + + private final String key = Issue193Test.class.getSimpleName(); + + /** This ensures that any outstanding async loading is completed as well */ + private long loadGet(AsyncLoadingCache cache, String key) + throws InterruptedException, ExecutionException { + CompletableFuture future = cache.get(key); + if (!loadingTask.isDone()) { + loadingTask.run(); + } + return future.get(); + } + + @Test + public void invalidateDuringRefreshRemovalCheck() throws Exception { + List removed = new ArrayList<>(); + AsyncLoadingCache cache = + Caffeine.newBuilder() + .ticker(ticker::read) + .executor(TestingExecutors.sameThreadScheduledExecutor()) + .removalListener((key, value, reason) -> removed.add(value)) + .refreshAfterWrite(10, TimeUnit.NANOSECONDS) + .buildAsync(loader); + + // Load so there is an initial value. + assertThat(loadGet(cache, key), is(0L)); + + ticker.advance(11); // Refresh should fire on next access + assertThat(cache.synchronous().getIfPresent(key), is(0L)); // Old value + + cache.synchronous().invalidate(key); // Invalidate key entirely + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // No value in cache (good) + loadingTask.run(); // Completes refresh + + // FIXME: java.lang.AssertionError: Not true that <1> is null + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // Value in cache (bad) + + // FIXME: Maybe? This is what I wanted to actually test :) + assertThat(removed, is(List.of(0L, 1L))); // 1L was sent to removalListener anyways + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java index 8c13ef9025..f68cff2fd9 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java @@ -532,6 +532,20 @@ enum Loader implements CacheLoader { @Override public Map loadAll(Iterable keys) { throw new IllegalStateException(); } + }, + ASYNC_INCOMPLETE { + @Override public Integer load(Integer key) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture asyncLoad(Integer key, Executor executor) { + executor.execute(() -> {}); + return new CompletableFuture<>(); + } + @Override public CompletableFuture asyncReload( + Integer key, Integer oldValue, Executor executor) { + executor.execute(() -> {}); + return new CompletableFuture<>(); + } }; private final boolean bulk; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java index 1e32afd4bc..029b1423ed 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -65,6 +66,7 @@ */ @SuppressWarnings("PreferJavaTimeOverload") public final class GuavaCacheFromContext { + private static final ThreadLocal error = new ThreadLocal<>(); private GuavaCacheFromContext() {} @@ -489,8 +491,19 @@ public Map getAll(Iterable keys) { } @Override - public void refresh(K key) { + public CompletableFuture refresh(K key) { + error.set(null); cache.refresh(key); + + var e = error.get(); + if (e == null) { + return CompletableFuture.completedFuture(cache.asMap().get(key)); + } else if (e instanceof CacheMissException) { + return CompletableFuture.completedFuture(null); + } + + error.remove(); + return CompletableFuture.failedFuture(e); } } @@ -542,11 +555,17 @@ static class SingleLoader extends CacheLoader implements Serializabl @Override public V load(K key) throws Exception { - V value = delegate.load(key); - if (value == null) { - throw new CacheMissException(); + try { + error.set(null); + V value = delegate.load(key); + if (value == null) { + throw new CacheMissException(); + } + return value; + } catch (Exception e) { + error.set(e); + throw e; } - return value; } } diff --git a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java index 3b6fc2b14e..5377b8f4dc 100644 --- a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java +++ b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java @@ -117,6 +117,7 @@ public V apply(@NonNull K key) { } @Override + @SuppressWarnings("FutureReturnValueIgnored") public void refresh(K key) { cache.refresh(key); }