diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index b91e4c0ad6..1a96fcc857 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -60,6 +60,7 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -221,10 +222,10 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef final Executor executor; final boolean isAsync; - // The collection views - @Nullable transient Set keySet; - @Nullable transient Collection values; - @Nullable transient Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine builder, this.cacheLoader = cacheLoader; executor = builder.getExecutor(); evictionLock = new ReentrantLock(); + refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); writer = builder.getCacheWriter(isAsync); drainBuffersTask = new PerformCleanupTask(this); @@ -289,11 +291,29 @@ public final Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); } + @Override + public Object referenceKey(K key) { + return nodeFactory.newLookupKey(key); + } + /* --------------- Stats Support --------------- */ @Override @@ -908,8 +928,9 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; RemovalCause[] actualCause = new RemovalCause[1]; + Object keyReference = node.getKeyReference(); - data.computeIfPresent(node.getKeyReference(), (k, n) -> { + data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { return n; } @@ -972,6 +993,12 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(keyReference); + } + if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1178,51 +1205,60 @@ void refreshIfNeeded(Node node, long now) { if (!refreshAfterWrite()) { return; } + K key; V oldValue; - long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = (now + ASYNC_EXPIRY); - if (((now - oldWriteTime) > refreshAfterWriteNanos()) + long writeTime = node.getWriteTime(); + Object keyReference = node.getKeyReference(); + if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) - && node.casWriteTime(oldWriteTime, refreshWriteTime)) { - try { - CompletableFuture refreshFuture; - long startTime = statsTicker().read(); + && !refreshes().containsKey(keyReference)) { + long[] startTime = new long[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] refreshFuture = new CompletableFuture[1]; + refreshes().computeIfAbsent(keyReference, k -> { + startTime[0] = statsTicker().read(); if (isAsync) { @SuppressWarnings("unchecked") CompletableFuture future = (CompletableFuture) oldValue; if (Async.isReady(future)) { @SuppressWarnings("NullAway") - CompletableFuture refresh = future.thenCompose(value -> - cacheLoader.asyncReload(key, value, executor)); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, future.join(), executor); + refreshFuture[0] = refresh; } else { // no-op if load is pending - node.casWriteTime(refreshWriteTime, oldWriteTime); - return; + return future; } } else { @SuppressWarnings("NullAway") - CompletableFuture refresh = cacheLoader.asyncReload(key, oldValue, executor); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, oldValue, executor); + refreshFuture[0] = refresh; } - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = statsTicker().read() - startTime; + return refreshFuture[0]; + }); + + if (refreshFuture[0] != null) { + refreshFuture[0].whenComplete((newValue, error) -> { + long loadTime = statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - node.casWriteTime(refreshWriteTime, oldWriteTime); + refreshes().remove(keyReference, refreshFuture[0]); statsCounter().recordLoadFailure(loadTime); return; } @SuppressWarnings("unchecked") - V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; + V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue; boolean[] discard = new boolean[1]; compute(key, (k, currentValue) -> { if (currentValue == null) { - return value; - } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) { + if (value == null) { + return null; + } else if (refreshes().get(key) == refreshFuture[0]) { + return value; + } + } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) { return value; } discard[0] = true; @@ -1237,10 +1273,9 @@ void refreshIfNeeded(Node node, long now) { } else { statsCounter().recordLoadSuccess(loadTime); } + + refreshes().remove(keyReference, refreshFuture[0]); }); - } catch (Throwable t) { - node.casWriteTime(refreshWriteTime, oldWriteTime); - logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); } } } @@ -1811,8 +1846,12 @@ public void clear() { } // Discard all entries - for (Node node : data.values()) { - removeNode(node, now); + var pending = refreshes.get(); + for (var entry : data.entrySet()) { + removeNode(entry.getValue(), now); + if (pending != null) { + pending.remove(entry.getKey()); + } } // Discard all pending reads @@ -2128,8 +2167,9 @@ public Map getAllPresent(Iterable keys) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { + data.computeIfPresent(lookupKey, (k, n) -> { synchronized (n) { oldValue[0] = n.getValue(); if (oldValue[0] == null) { @@ -2147,6 +2187,11 @@ public Map getAllPresent(Iterable keys) { }); if (cause[0] != null) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2169,8 +2214,9 @@ public boolean remove(Object key, Object value) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> { + data.computeIfPresent(lookupKey, (kR, node) -> { synchronized (node) { oldKey[0] = node.getKey(); oldValue[0] = node.getValue(); @@ -2192,7 +2238,13 @@ public boolean remove(Object key, Object value) { if (removed[0] == null) { return false; - } else if (hasRemovalListener()) { + } + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } afterWrite(new RemovalTask(removed[0])); @@ -2611,15 +2663,14 @@ public void replaceAll(BiFunction function) { if (expiresAfterWrite() || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { - if (cause[0] == null) { - if (!isComputingAsync(node)) { - tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); - setAccessTime(node, now[0]); - } - } else if (cause[0] == RemovalCause.COLLECTED) { - scheduleDrainBuffers(); + if ((cause[0] == null) && !isComputingAsync(node)) { + tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); + setAccessTime(node, now[0]); } afterRead(node, now[0], /* recordHit */ false); + if ((cause[0] != null) && cause[0].wasEvicted()) { + scheduleDrainBuffers(); + } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java index c0db010547..81c37db5a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.checkerframework.checker.nullness.qual.NonNull; @@ -101,9 +102,13 @@ public interface LoadingCache extends Cache { * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading * is asynchronous by delegating to the default executor. + *

+ * Returns an existing future without doing anything if another thread is currently loading the + * value for {@code key}. * * @param key key with which a value may be associated + * @return the future that is loading the value * @throws NullPointerException if the specified key is null */ - void refresh(@NonNull K key); + CompletableFuture refresh(@NonNull K key); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index b11a7dc500..f9e01979a9 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -129,45 +129,109 @@ public Map getAll(Iterable keys) { } @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void refresh(K key) { + public CompletableFuture refresh(K key) { requireNonNull(key); - long[] writeTime = new long[1]; - CompletableFuture oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, writeTime); + Object keyReference = asyncCache.cache().referenceKey(key); + for (;;) { + var future = tryOptimisticRefresh(key, keyReference); + if (future == null) { + future = tryComputeRefresh(key, keyReference); + } + if (future != null) { + return future; + } + } + } + + /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */ + private @Nullable CompletableFuture tryOptimisticRefresh(K key, Object keyReference) { + // If a refresh is in-flight, then return it directly. If completed and not yet removed, then + // remove to trigger a new reload. + @SuppressWarnings("unchecked") + var lastRefresh = (CompletableFuture) asyncCache.cache().refreshes().get(keyReference); + if (lastRefresh != null) { + if (Async.isReady(lastRefresh)) { + asyncCache.cache().refreshes().remove(keyReference, lastRefresh); + } else { + return lastRefresh; + } + } + + // If the entry is absent then perform a new load, else if in-flight then return it + var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, /* writeTime */ new long[1]); if ((oldValueFuture == null) || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) { - asyncCache.get(key, asyncCache.loader::asyncLoad, /* recordStats */ false); - return; + if (oldValueFuture != null) { + asyncCache.cache().remove(key, asyncCache); + } + var future = asyncCache.get(key, + asyncCache.loader::asyncLoad, /* recordStats */ false); + @SuppressWarnings("unchecked") + var prior = (CompletableFuture) asyncCache.cache() + .refreshes().putIfAbsent(keyReference, future); + return (prior == null) ? future : prior; } else if (!oldValueFuture.isDone()) { // no-op if load is pending - return; + return oldValueFuture; } - oldValueFuture.thenAccept(oldValue -> { - long now = asyncCache.cache().statsTicker().read(); - CompletableFuture refreshFuture = (oldValue == null) - ? asyncCache.loader.asyncLoad(key, asyncCache.cache().executor()) - : asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = asyncCache.cache().statsTicker().read() - now; + // Fallback to the slow path, possibly retrying + return null; + } + + /** Begins a refresh if the entry has materialized and no reload is in-flight. */ + @SuppressWarnings("FutureReturnValueIgnored") + private @Nullable CompletableFuture tryComputeRefresh(K key, Object keyReference) { + long[] startTime = new long[1]; + long[] writeTime = new long[1]; + boolean[] refreshed = new boolean[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] oldValueFuture = new CompletableFuture[1]; + var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> { + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + V oldValue = Async.getIfReady(oldValueFuture[0]); + if (oldValue == null) { + return null; + } + + refreshed[0] = true; + startTime[0] = asyncCache.cache().statsTicker().read(); + return asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); + }); + + if (future == null) { + // Retry the optimistic path + return null; + } + + @SuppressWarnings("unchecked") + var castedFuture = (CompletableFuture) future; + if (refreshed[0]) { + castedFuture.whenComplete((newValue, error) -> { + long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { - asyncCache.cache().statsCounter().recordLoadFailure(loadTime); logger.log(Level.WARNING, "Exception thrown during refresh", error); + asyncCache.cache().refreshes().remove(keyReference, castedFuture); + asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (k, currentValue) -> { + asyncCache.cache().compute(key, (ignored, currentValue) -> { if (currentValue == null) { - return (newValue == null) ? null : refreshFuture; - } else if (currentValue == oldValueFuture) { + if (newValue == null) { + return null; + } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { + return castedFuture; + } + } else if (currentValue == oldValueFuture[0]) { long expectedWriteTime = writeTime[0]; if (asyncCache.cache().hasWriteTime()) { asyncCache.cache().getIfPresentQuietly(key, writeTime); } if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : refreshFuture; + return (newValue == null) ? null : castedFuture; } } discard[0] = true; @@ -175,15 +239,18 @@ public void refresh(K key) { }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, refreshFuture, RemovalCause.REPLACED); + asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } + + asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); - }); + } + return castedFuture; } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index 9ce22216c2..206055808a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,6 +53,9 @@ interface LocalCache extends ConcurrentMap { /** Returns the {@link Executor} used by this cache. */ @NonNull Executor executor(); + /** Returns the map of in-flight refresh operations. */ + ConcurrentMap> refreshes(); + /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); @@ -64,6 +68,9 @@ interface LocalCache extends ConcurrentMap { /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); + /** Returns the reference key. */ + Object referenceKey(K key); + /** * See {@link Cache#getIfPresent(Object)}. This method differs by accepting a parameter of whether * to record the hit and miss statistics based on the success of this operation. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 5e12bdf397..ccc265f986 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -89,49 +89,78 @@ default Map loadSequentially(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - default void refresh(K key) { + default CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - long startTime = cache().statsTicker().read(); - V oldValue = cache().getIfPresentQuietly(key, writeTime); - CompletableFuture refreshFuture = (oldValue == null) - ? cacheLoader().asyncLoad(key, cache().executor()) - : cacheLoader().asyncReload(key, oldValue, cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = cache().statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().statsCounter().recordLoadFailure(loadTime); - return; + long[] startTime = new long[1]; + @SuppressWarnings("unchecked") + V[] oldValue = (V[]) new Object[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + Object keyReference = cache().referenceKey(key); + + var future = cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; } - boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - return newValue; - } else if (currentValue == oldValue) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + startTime[0] = cache().statsTicker().read(); + oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + CompletableFuture refreshFuture = (oldValue[0] == null) + ? cacheLoader().asyncLoad(key, cache().executor()) + : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = cache().statsTicker().read() - startTime[0]; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + cache().refreshes().remove(keyReference, reloading[0]); + cache().statsCounter().recordLoadFailure(loadTime); + return; + } + + boolean[] discard = new boolean[1]; + cache().compute(key, (k, currentValue) -> { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (cache().refreshes().get(keyReference) == reloading[0]) { + return newValue; + } + } else if (currentValue == oldValue[0]) { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && cache().hasRemovalListener()) { + cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + } + if (newValue == null) { + cache().statsCounter().recordLoadFailure(loadTime); + } else { + cache().statsCounter().recordLoadSuccess(loadTime); } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); - } - if (newValue == null) { - cache().statsCounter().recordLoadFailure(loadTime); - } else { - cache().statsCounter().recordLoadSuccess(loadTime); - } - }); + cache().refreshes().remove(keyReference, reloading[0]); + }); + } + + @SuppressWarnings("unchecked") + CompletableFuture castedFuture = (CompletableFuture) future; + return castedFuture; } /** Returns a mapping function that adapts to {@link CacheLoader#load}. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 7004b9c37b..81480c446a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -22,6 +22,8 @@ import java.io.InvalidObjectException; import java.io.ObjectInputStream; import java.io.Serializable; +import java.lang.System.Logger; +import java.lang.System.Logger.Level; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Collection; @@ -36,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -54,6 +57,8 @@ */ @SuppressWarnings("deprecation") final class UnboundedLocalCache implements LocalCache { + static final Logger logger = System.getLogger(UnboundedLocalCache.class.getName()); + @Nullable final RemovalListener removalListener; final ConcurrentHashMap data; final StatsCounter statsCounter; @@ -62,9 +67,10 @@ final class UnboundedLocalCache implements LocalCache { final Executor executor; final Ticker ticker; - transient @Nullable Set keySet; - transient @Nullable Collection values; - transient @Nullable Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); @@ -72,6 +78,7 @@ final class UnboundedLocalCache implements LocalCache { this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); this.writer = builder.getCacheWriter(async); + this.refreshes = new AtomicReference<>(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); } @@ -81,6 +88,11 @@ public boolean hasWriteTime() { return false; } + @Override + public Object referenceKey(K key) { + return key; + } + /* --------------- Cache --------------- */ @Override @@ -154,7 +166,19 @@ public RemovalListener removalListener() { @Override public void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause) { requireNonNull(removalListener(), "Notification should be guarded with a check"); - executor.execute(() -> removalListener().onRemoval(key, value, cause)); + Runnable task = () -> { + try { + removalListener().onRemoval(key, value, cause); + } catch (Throwable t) { + logger.log(Level.WARNING, "Exception thrown by removal listener", t); + } + }; + try { + executor.execute(task); + } catch (Throwable t) { + logger.log(Level.ERROR, "Exception thrown when submitting removal listener", t); + task.run(); + } } @Override @@ -167,6 +191,19 @@ public Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -339,6 +376,11 @@ public int size() { public void clear() { if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { data.clear(); + + var pending = refreshes.get(); + if (pending != null) { + pending.clear(); + } return; } for (K key : data.keySet()) { @@ -431,6 +473,10 @@ public void putAll(Map map) { }); } + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -460,9 +506,16 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (hasRemovalListener() && removed) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + if (removed) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } + if (hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + } } + return removed; } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 114f18e4ff..0e7a225fc4 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -604,9 +604,9 @@ public void getAll_writerFails(LoadingCache cache, CacheContex public void refresh(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.MINUTES); Integer key = context.firstKey(); - cache.refresh(key); + cache.refresh(key).join(); - long count = (cache.estimatedSize() == 1) ? context.initialSize() : 1; + long count = context.initialSize(); verifyListeners(context, verifier -> verifier.hasOnly(count, RemovalCause.EXPIRED)); verifyWriter(context, verifier -> { verifier.deleted(key, context.original().get(key), RemovalCause.EXPIRED); @@ -623,7 +623,7 @@ public void refresh(LoadingCache cache, CacheContext context) compute = Compute.SYNC, writer = Writer.EXCEPTIONAL, removalListener = Listener.REJECTING) public void refresh_writerFails(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.HOURS); - cache.refresh(context.firstKey()); + cache.refresh(context.firstKey()).join(); context.disableRejectingCacheWriter(); context.ticker().advance(-1, TimeUnit.HOURS); assertThat(cache.asMap(), equalTo(context.original())); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 4a69124f2c..ac4e54d4b7 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -48,10 +48,10 @@ import com.github.benmanes.caffeine.cache.testing.CacheProvider; import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; -import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population; import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Writer; @@ -349,16 +349,29 @@ class Key { @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) @Test(dataProvider = "caches", expectedExceptions = NullPointerException.class) public void refresh_null(LoadingCache cache, CacheContext context) { - cache.refresh(null); + cache.refresh(null).join(); + } + + @Test(dataProvider = "caches") + @CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine) + public void refresh_dedupe(LoadingCache cache, CacheContext context) { + var key = context.original().isEmpty() ? context.absentKey() : context.firstKey(); + var future1 = cache.refresh(key); + var future2 = cache.refresh(key); + assertThat(future1, is(sameInstance(future2))); + + future1.complete(-key); + assertThat(cache.getIfPresent(key), is(-key)); } @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(implementation = Implementation.Caffeine, compute=Compute.SYNC, + @CacheSpec(implementation = Implementation.Caffeine, executor = CacheExecutor.DIRECT, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { - cache.refresh(context.firstKey()); + var future = cache.refresh(context.firstKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize() - 1)); assertThat(cache.getIfPresent(context.firstKey()), is(nullValue())); verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); @@ -370,26 +383,53 @@ public void refresh_remove(LoadingCache cache, CacheContext co removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_failure(LoadingCache cache, CacheContext context) { - // Shouldn't leak exception to caller and should retain stale entry - cache.refresh(context.absentKey()); - cache.refresh(context.firstKey()); + // Shouldn't leak exception to caller nor retain the future; should retain the stale entry + var future1 = cache.refresh(context.absentKey()); + var future2 = cache.refresh(context.firstKey()); + var future3 = cache.refresh(context.firstKey()); + assertThat(future2, is(not(sameInstance(future3)))); + assertThat(future1.isCompletedExceptionally(), is(true)); + assertThat(future2.isCompletedExceptionally(), is(true)); + assertThat(future3.isCompletedExceptionally(), is(true)); assertThat(cache.estimatedSize(), is(context.initialSize())); - verifyStats(context, verifier -> verifier.success(0).failures(2)); + verifyStats(context, verifier -> verifier.success(0).failures(3)); + } + + @CheckNoWriter + @Test(dataProvider = "caches") + @CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine, + removalListener = { Listener.DEFAULT, Listener.REJECTING }) + public void refresh_cancel(LoadingCache cache, CacheContext context) { + var key = context.original().isEmpty() ? context.absentKey() : context.firstKey(); + var future1 = cache.refresh(key); + assertThat(future1.isDone(), is(false)); + future1.cancel(true); + + var future2 = cache.refresh(key); + assertThat(future1, is(not(sameInstance(future2)))); + + future2.cancel(false); + assertThat(cache.asMap(), is(equalTo(context.original()))); } @CheckNoWriter @CacheSpec(loader = Loader.NULL) @Test(dataProvider = "caches") public void refresh_absent_null(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize())); } @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) + @CacheSpec( + maximumSize = Maximum.UNREACHABLE, + removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = Population.SINGLETON) public void refresh_absent(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + Integer key = context.absentKey(); + var future = cache.refresh(key); + assertThat(future.join(), is(not(nullValue()))); assertThat(cache.estimatedSize(), is(1 + context.initialSize())); verifyStats(context, verifier -> verifier.hits(0).misses(0).success(1).failures(0)); @@ -403,7 +443,8 @@ public void refresh_absent(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_present_null(LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(nullValue())); } int count = context.firstMiddleLastKeys().size(); verifyStats(context, verifier -> verifier.hits(0).misses(0).success(0).failures(count)); @@ -421,7 +462,8 @@ public void refresh_present_null(LoadingCache cache, CacheCont public void refresh_present_sameValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(context.original().get(key))); } int count = context.firstMiddleLastKeys().size(); verifyStats(context, verifier -> verifier.hits(0).misses(0).success(count).failures(0)); @@ -440,7 +482,9 @@ public void refresh_present_sameValue( public void refresh_present_differentValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(key)); + // records a hit assertThat(cache.get(key), is(key)); } @@ -465,10 +509,12 @@ public void refresh_conflict(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); assertThat(cache.asMap().put(key, updated), is(original)); refresh.set(true); + future.join(); + await().until(() -> context.removalNotifications().size(), is(2)); List removed = context.removalNotifications().stream() .map(RemovalNotification::getValue).collect(toList()); @@ -493,11 +539,13 @@ public void refresh_invalidate(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + future.join(); + + assertThat(cache.getIfPresent(key), is(nullValue())); verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); verifyStats(context, verifier -> verifier.success(1).failures(0)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java index d34821f7d3..c2cde91d09 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java @@ -98,8 +98,8 @@ public void async_concurrent_bounded( Threads.runTest(cache, asyncOperations); } - @SuppressWarnings( - {"unchecked", "rawtypes", "ReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) + @SuppressWarnings({"unchecked", "rawtypes", "ReturnValueIgnored", + "FutureReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) List, Integer>> operations = ImmutableList.of( // LoadingCache (cache, key) -> { cache.get(key); }, diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java index 3daf4b06ce..79d1f1c85c 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java @@ -467,7 +467,7 @@ public void refresh(LoadingCache cache, CacheContext context) GcFinalization.awaitFullGc(); awaitFullCleanup(cache); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.estimatedSize(), is(1L)); assertThat(cache.getIfPresent(key), is(not(value))); @@ -489,7 +489,7 @@ public void refresh_writerFails(LoadingCache cache, CacheConte Integer key = context.firstKey(); context.clear(); GcFinalization.awaitFullGc(); - cache.refresh(key); + cache.refresh(key).join(); context.disableRejectingCacheWriter(); assertThat(cache.asMap().isEmpty(), is(false)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 48d8f953b7..2f82a0fa70 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -349,11 +349,52 @@ public void invalidate(CacheContext context) { cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); - verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); + var executor = (TrackingExecutor) context.executor(); + await().until(() -> executor.submitted() == executor.completed()); + + if (context.implementation() == Implementation.Guava) { + // Guava does not protect against ABA when the entry was removed by allowing a possibly + // stale value from being inserted. + assertThat(cache.getIfPresent(key), is(refreshed)); + } else { + // Maintain linearizability by discarding the refresh if completing after an explicit removal + assertThat(cache.getIfPresent(key), is(nullValue())); + } + + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPLICIT)); verifyStats(context, verifier -> verifier.success(1).failures(0)); } + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, + loader = Loader.ASYNC_INCOMPLETE, refreshAfterWrite = Expire.ONE_MINUTE) + public void refresh(LoadingCache cache, CacheContext context) { + cache.put(context.absentKey(), context.absentValue()); + var executor = (TrackingExecutor) context.executor(); + int submitted; + + // trigger an automatic refresh + submitted = executor.submitted(); + context.ticker().advance(2, TimeUnit.MINUTES); + cache.getIfPresent(context.absentKey()); + assertThat(executor.submitted(), is(submitted + 1)); + + // return in-flight future + var future1 = cache.refresh(context.absentKey()); + assertThat(executor.submitted(), is(submitted + 1)); + future1.complete(-context.absentValue()); + + // trigger a new automatic refresh + submitted = executor.submitted(); + context.ticker().advance(2, TimeUnit.MINUTES); + cache.getIfPresent(context.absentKey()); + assertThat(executor.submitted(), is(submitted + 1)); + + var future2 = cache.refresh(context.absentKey()); + assertThat(future2, is(not(sameInstance(future1)))); + future2.cancel(true); + } + /* --------------- Policy --------------- */ @Test(dataProvider = "caches") diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index bd55c04390..89ae2b128b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -90,6 +90,7 @@ public Stresser() { status(); } + @SuppressWarnings("FutureReturnValueIgnored") public void run() throws InterruptedException { ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java new file mode 100644 index 0000000000..60f4369d52 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.testing.FakeTicker; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.testing.TestingExecutors; + +/** + * Issue #193: Invalidate before Refresh completes still stores value + *

+ * When a refresh starts before an invalidate and completes afterwards, the entry is inserted into + * the cache. This breaks linearizability assumptions, as the invalidation may be to ensure that + * the cache does not hold stale data that refresh will have observed in its load. This undesirable + * behavior is also present in Guava, so the stricter handling is an intentional deviation. + * + * @author boschb (Robert Bosch) + */ +public final class Issue193Test { + private final AtomicLong counter = new AtomicLong(0); + private final FakeTicker ticker = new FakeTicker(); + + private ListenableFutureTask loadingTask; + + private final AsyncCacheLoader loader = + (key, exec) -> { + // Fools the cache into thinking there is a future that's not immediately ready. + // (The Cache has optimizations for this that we want to avoid) + loadingTask = ListenableFutureTask.create(counter::getAndIncrement); + var f = new CompletableFuture(); + loadingTask.addListener(() -> { + f.complete(Futures.getUnchecked(loadingTask)); + }, exec); + return f; + }; + + private final String key = Issue193Test.class.getSimpleName(); + + /** This ensures that any outstanding async loading is completed as well */ + private long loadGet(AsyncLoadingCache cache, String key) + throws InterruptedException, ExecutionException { + CompletableFuture future = cache.get(key); + if (!loadingTask.isDone()) { + loadingTask.run(); + } + return future.get(); + } + + @Test + public void invalidateDuringRefreshRemovalCheck() throws Exception { + List removed = new ArrayList<>(); + AsyncLoadingCache cache = + Caffeine.newBuilder() + .ticker(ticker::read) + .executor(TestingExecutors.sameThreadScheduledExecutor()) + .removalListener((key, value, reason) -> removed.add(value)) + .refreshAfterWrite(10, TimeUnit.NANOSECONDS) + .buildAsync(loader); + + // Load so there is an initial value. + assertThat(loadGet(cache, key), is(0L)); + + ticker.advance(11); // Refresh should fire on next access + assertThat(cache.synchronous().getIfPresent(key), is(0L)); // Old value + + cache.synchronous().invalidate(key); // Invalidate key entirely + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // No value in cache (good) + loadingTask.run(); // Completes refresh + + // FIXME: java.lang.AssertionError: Not true that <1> is null + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // Value in cache (bad) + + // FIXME: Maybe? This is what I wanted to actually test :) + assertThat(removed, is(List.of(0L, 1L))); // 1L was sent to removalListener anyways + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java index dab68c7b42..0ca206ec26 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java @@ -532,6 +532,20 @@ enum Loader implements CacheLoader { @Override public Map loadAll(Iterable keys) { throw new IllegalStateException(); } + }, + ASYNC_INCOMPLETE { + @Override public Integer load(Integer key) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture asyncLoad(Integer key, Executor executor) { + executor.execute(() -> {}); + return new CompletableFuture<>(); + } + @Override public CompletableFuture asyncReload( + Integer key, Integer oldValue, Executor executor) { + executor.execute(() -> {}); + return new CompletableFuture<>(); + } }; private final boolean bulk; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java index 1e32afd4bc..029b1423ed 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -65,6 +66,7 @@ */ @SuppressWarnings("PreferJavaTimeOverload") public final class GuavaCacheFromContext { + private static final ThreadLocal error = new ThreadLocal<>(); private GuavaCacheFromContext() {} @@ -489,8 +491,19 @@ public Map getAll(Iterable keys) { } @Override - public void refresh(K key) { + public CompletableFuture refresh(K key) { + error.set(null); cache.refresh(key); + + var e = error.get(); + if (e == null) { + return CompletableFuture.completedFuture(cache.asMap().get(key)); + } else if (e instanceof CacheMissException) { + return CompletableFuture.completedFuture(null); + } + + error.remove(); + return CompletableFuture.failedFuture(e); } } @@ -542,11 +555,17 @@ static class SingleLoader extends CacheLoader implements Serializabl @Override public V load(K key) throws Exception { - V value = delegate.load(key); - if (value == null) { - throw new CacheMissException(); + try { + error.set(null); + V value = delegate.load(key); + if (value == null) { + throw new CacheMissException(); + } + return value; + } catch (Exception e) { + error.set(e); + throw e; } - return value; } } diff --git a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java index 3b6fc2b14e..5377b8f4dc 100644 --- a/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java +++ b/guava/src/main/java/com/github/benmanes/caffeine/guava/CaffeinatedGuavaLoadingCache.java @@ -117,6 +117,7 @@ public V apply(@NonNull K key) { } @Override + @SuppressWarnings("FutureReturnValueIgnored") public void refresh(K key) { cache.refresh(key); }