diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 59c557ce23..0a73697373 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -60,6 +60,7 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -220,10 +221,10 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef final Executor executor; final boolean isAsync; - // The collection views - @Nullable transient Set keySet; - @Nullable transient Collection values; - @Nullable transient Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine builder, executor = builder.getExecutor(); writer = builder.getCacheWriter(); evictionLock = new ReentrantLock(); + refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); drainBuffersTask = new PerformCleanupTask(this); nodeFactory = NodeFactory.newFactory(builder, isAsync); @@ -288,11 +290,29 @@ public final Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); } + @Override + public Object referenceKey(K key) { + return nodeFactory.newLookupKey(key); + } + /* --------------- Stats Support --------------- */ @Override @@ -899,8 +919,9 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; RemovalCause[] actualCause = new RemovalCause[1]; + Object keyReference = node.getKeyReference(); - data.computeIfPresent(node.getKeyReference(), (k, n) -> { + data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { return n; } @@ -965,6 +986,12 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(keyReference); + } + if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1172,51 +1199,60 @@ void refreshIfNeeded(Node node, long now) { if (!refreshAfterWrite()) { return; } + K key; V oldValue; - long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = (now + ASYNC_EXPIRY); - if (((now - oldWriteTime) > refreshAfterWriteNanos()) + long writeTime = node.getWriteTime(); + Object keyReference = node.getKeyReference(); + if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) - && node.casWriteTime(oldWriteTime, refreshWriteTime)) { - try { - CompletableFuture refreshFuture; - long startTime = statsTicker().read(); + && !refreshes().containsKey(keyReference)) { + long[] startTime = new long[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] refreshFuture = new CompletableFuture[1]; + refreshes().computeIfAbsent(keyReference, k -> { + startTime[0] = statsTicker().read(); if (isAsync) { @SuppressWarnings("unchecked") CompletableFuture future = (CompletableFuture) oldValue; if (Async.isReady(future)) { @SuppressWarnings("NullAway") - CompletableFuture refresh = future.thenCompose(value -> - cacheLoader.asyncReload(key, value, executor)); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, future.join(), executor); + refreshFuture[0] = refresh; } else { // no-op if load is pending - node.casWriteTime(refreshWriteTime, oldWriteTime); - return; + return future; } } else { @SuppressWarnings("NullAway") - CompletableFuture refresh = cacheLoader.asyncReload(key, oldValue, executor); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, oldValue, executor); + refreshFuture[0] = refresh; } - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = statsTicker().read() - startTime; + return refreshFuture[0]; + }); + + if (refreshFuture[0] != null) { + refreshFuture[0].whenComplete((newValue, error) -> { + long loadTime = statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - node.casWriteTime(refreshWriteTime, oldWriteTime); + refreshes().remove(keyReference, refreshFuture[0]); statsCounter().recordLoadFailure(loadTime); return; } @SuppressWarnings("unchecked") - V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; + V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue; boolean[] discard = new boolean[1]; compute(key, (k, currentValue) -> { if (currentValue == null) { - return value; - } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) { + if (value == null) { + return null; + } else if (refreshes().get(key) == refreshFuture[0]) { + return value; + } + } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) { return value; } discard[0] = true; @@ -1231,10 +1267,9 @@ void refreshIfNeeded(Node node, long now) { } else { statsCounter().recordLoadSuccess(loadTime); } + + refreshes().remove(keyReference, refreshFuture[0]); }); - } catch (Throwable t) { - node.casWriteTime(refreshWriteTime, oldWriteTime); - logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); } } } @@ -1782,8 +1817,12 @@ public void clear() { } // Discard all entries - for (Node node : data.values()) { - removeNode(node, now); + var pending = refreshes.get(); + for (var entry : data.entrySet()) { + removeNode(entry.getValue(), now); + if (pending != null) { + pending.remove(entry.getKey()); + } } // Discard all pending reads @@ -2099,8 +2138,9 @@ public Map getAllPresent(Iterable keys) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { + data.computeIfPresent(lookupKey, (k, n) -> { synchronized (n) { oldValue[0] = n.getValue(); if (oldValue[0] == null) { @@ -2118,6 +2158,11 @@ public Map getAllPresent(Iterable keys) { }); if (cause[0] != null) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2140,8 +2185,9 @@ public boolean remove(Object key, Object value) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> { + data.computeIfPresent(lookupKey, (kR, node) -> { synchronized (node) { oldKey[0] = node.getKey(); oldValue[0] = node.getValue(); @@ -2163,7 +2209,13 @@ public boolean remove(Object key, Object value) { if (removed[0] == null) { return false; - } else if (hasRemovalListener()) { + } + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } afterWrite(new RemovalTask(removed[0])); @@ -2582,15 +2634,14 @@ public void replaceAll(BiFunction function) { if (expiresAfterWrite() || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { - if (cause[0] == null) { - if (!isComputingAsync(node)) { - tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); - setAccessTime(node, now[0]); - } - } else if (cause[0] == RemovalCause.COLLECTED) { - scheduleDrainBuffers(); + if ((cause[0] == null) && !isComputingAsync(node)) { + tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); + setAccessTime(node, now[0]); } afterRead(node, now[0], /* recordHit */ false); + if ((cause[0] != null) && cause[0].wasEvicted()) { + scheduleDrainBuffers(); + } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java index c0db010547..81c37db5a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.checkerframework.checker.nullness.qual.NonNull; @@ -101,9 +102,13 @@ public interface LoadingCache extends Cache { * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading * is asynchronous by delegating to the default executor. + *

+ * Returns an existing future without doing anything if another thread is currently loading the + * value for {@code key}. * * @param key key with which a value may be associated + * @return the future that is loading the value * @throws NullPointerException if the specified key is null */ - void refresh(@NonNull K key); + CompletableFuture refresh(@NonNull K key); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index b11a7dc500..f9e01979a9 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -129,45 +129,109 @@ public Map getAll(Iterable keys) { } @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void refresh(K key) { + public CompletableFuture refresh(K key) { requireNonNull(key); - long[] writeTime = new long[1]; - CompletableFuture oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, writeTime); + Object keyReference = asyncCache.cache().referenceKey(key); + for (;;) { + var future = tryOptimisticRefresh(key, keyReference); + if (future == null) { + future = tryComputeRefresh(key, keyReference); + } + if (future != null) { + return future; + } + } + } + + /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */ + private @Nullable CompletableFuture tryOptimisticRefresh(K key, Object keyReference) { + // If a refresh is in-flight, then return it directly. If completed and not yet removed, then + // remove to trigger a new reload. + @SuppressWarnings("unchecked") + var lastRefresh = (CompletableFuture) asyncCache.cache().refreshes().get(keyReference); + if (lastRefresh != null) { + if (Async.isReady(lastRefresh)) { + asyncCache.cache().refreshes().remove(keyReference, lastRefresh); + } else { + return lastRefresh; + } + } + + // If the entry is absent then perform a new load, else if in-flight then return it + var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, /* writeTime */ new long[1]); if ((oldValueFuture == null) || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) { - asyncCache.get(key, asyncCache.loader::asyncLoad, /* recordStats */ false); - return; + if (oldValueFuture != null) { + asyncCache.cache().remove(key, asyncCache); + } + var future = asyncCache.get(key, + asyncCache.loader::asyncLoad, /* recordStats */ false); + @SuppressWarnings("unchecked") + var prior = (CompletableFuture) asyncCache.cache() + .refreshes().putIfAbsent(keyReference, future); + return (prior == null) ? future : prior; } else if (!oldValueFuture.isDone()) { // no-op if load is pending - return; + return oldValueFuture; } - oldValueFuture.thenAccept(oldValue -> { - long now = asyncCache.cache().statsTicker().read(); - CompletableFuture refreshFuture = (oldValue == null) - ? asyncCache.loader.asyncLoad(key, asyncCache.cache().executor()) - : asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = asyncCache.cache().statsTicker().read() - now; + // Fallback to the slow path, possibly retrying + return null; + } + + /** Begins a refresh if the entry has materialized and no reload is in-flight. */ + @SuppressWarnings("FutureReturnValueIgnored") + private @Nullable CompletableFuture tryComputeRefresh(K key, Object keyReference) { + long[] startTime = new long[1]; + long[] writeTime = new long[1]; + boolean[] refreshed = new boolean[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] oldValueFuture = new CompletableFuture[1]; + var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> { + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + V oldValue = Async.getIfReady(oldValueFuture[0]); + if (oldValue == null) { + return null; + } + + refreshed[0] = true; + startTime[0] = asyncCache.cache().statsTicker().read(); + return asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); + }); + + if (future == null) { + // Retry the optimistic path + return null; + } + + @SuppressWarnings("unchecked") + var castedFuture = (CompletableFuture) future; + if (refreshed[0]) { + castedFuture.whenComplete((newValue, error) -> { + long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { - asyncCache.cache().statsCounter().recordLoadFailure(loadTime); logger.log(Level.WARNING, "Exception thrown during refresh", error); + asyncCache.cache().refreshes().remove(keyReference, castedFuture); + asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (k, currentValue) -> { + asyncCache.cache().compute(key, (ignored, currentValue) -> { if (currentValue == null) { - return (newValue == null) ? null : refreshFuture; - } else if (currentValue == oldValueFuture) { + if (newValue == null) { + return null; + } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { + return castedFuture; + } + } else if (currentValue == oldValueFuture[0]) { long expectedWriteTime = writeTime[0]; if (asyncCache.cache().hasWriteTime()) { asyncCache.cache().getIfPresentQuietly(key, writeTime); } if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : refreshFuture; + return (newValue == null) ? null : castedFuture; } } discard[0] = true; @@ -175,15 +239,18 @@ public void refresh(K key) { }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, refreshFuture, RemovalCause.REPLACED); + asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } + + asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); - }); + } + return castedFuture; } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index 9ce22216c2..206055808a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,6 +53,9 @@ interface LocalCache extends ConcurrentMap { /** Returns the {@link Executor} used by this cache. */ @NonNull Executor executor(); + /** Returns the map of in-flight refresh operations. */ + ConcurrentMap> refreshes(); + /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); @@ -64,6 +68,9 @@ interface LocalCache extends ConcurrentMap { /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); + /** Returns the reference key. */ + Object referenceKey(K key); + /** * See {@link Cache#getIfPresent(Object)}. This method differs by accepting a parameter of whether * to record the hit and miss statistics based on the success of this operation. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 5e12bdf397..ccc265f986 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -89,49 +89,78 @@ default Map loadSequentially(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - default void refresh(K key) { + default CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - long startTime = cache().statsTicker().read(); - V oldValue = cache().getIfPresentQuietly(key, writeTime); - CompletableFuture refreshFuture = (oldValue == null) - ? cacheLoader().asyncLoad(key, cache().executor()) - : cacheLoader().asyncReload(key, oldValue, cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = cache().statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().statsCounter().recordLoadFailure(loadTime); - return; + long[] startTime = new long[1]; + @SuppressWarnings("unchecked") + V[] oldValue = (V[]) new Object[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + Object keyReference = cache().referenceKey(key); + + var future = cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; } - boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - return newValue; - } else if (currentValue == oldValue) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + startTime[0] = cache().statsTicker().read(); + oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + CompletableFuture refreshFuture = (oldValue[0] == null) + ? cacheLoader().asyncLoad(key, cache().executor()) + : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = cache().statsTicker().read() - startTime[0]; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + cache().refreshes().remove(keyReference, reloading[0]); + cache().statsCounter().recordLoadFailure(loadTime); + return; + } + + boolean[] discard = new boolean[1]; + cache().compute(key, (k, currentValue) -> { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (cache().refreshes().get(keyReference) == reloading[0]) { + return newValue; + } + } else if (currentValue == oldValue[0]) { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && cache().hasRemovalListener()) { + cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + } + if (newValue == null) { + cache().statsCounter().recordLoadFailure(loadTime); + } else { + cache().statsCounter().recordLoadSuccess(loadTime); } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); - } - if (newValue == null) { - cache().statsCounter().recordLoadFailure(loadTime); - } else { - cache().statsCounter().recordLoadSuccess(loadTime); - } - }); + cache().refreshes().remove(keyReference, reloading[0]); + }); + } + + @SuppressWarnings("unchecked") + CompletableFuture castedFuture = (CompletableFuture) future; + return castedFuture; } /** Returns a mapping function that adapts to {@link CacheLoader#load}. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 70e12f2fb7..6974bd5770 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,15 +62,17 @@ final class UnboundedLocalCache implements LocalCache { final Executor executor; final Ticker ticker; - transient @Nullable Set keySet; - transient @Nullable Collection values; - transient @Nullable Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); this.statsCounter = builder.getStatsCounterSupplier().get(); this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); + this.refreshes = new AtomicReference<>(); this.writer = builder.getCacheWriter(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); @@ -80,6 +83,11 @@ public boolean hasWriteTime() { return false; } + @Override + public Object referenceKey(K key) { + return key; + } + /* --------------- Cache --------------- */ @Override @@ -166,6 +174,19 @@ public Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -338,6 +359,11 @@ public int size() { public void clear() { if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { data.clear(); + + var pending = refreshes.get(); + if (pending != null) { + pending.clear(); + } return; } for (K key : data.keySet()) { @@ -430,6 +456,10 @@ public void putAll(Map map) { }); } + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -459,9 +489,16 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (hasRemovalListener() && removed) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + if (removed) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } + if (hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + } } + return removed; } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 8852b361f2..f701dfcb76 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -602,9 +602,9 @@ public void getAll_writerFails(LoadingCache cache, CacheContex public void refresh(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.MINUTES); Integer key = context.firstKey(); - cache.refresh(key); + cache.refresh(key).join(); - long count = (cache.estimatedSize() == 1) ? context.initialSize() : 1; + long count = context.initialSize(); assertThat(cache, hasRemovalNotifications(context, count, RemovalCause.EXPIRED)); verifyWriter(context, (verifier, writer) -> { verifier.deleted(key, context.original().get(key), RemovalCause.EXPIRED); @@ -621,7 +621,7 @@ public void refresh(LoadingCache cache, CacheContext context) compute = Compute.SYNC, writer = Writer.EXCEPTIONAL, removalListener = Listener.REJECTING) public void refresh_writerFails(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.HOURS); - cache.refresh(context.firstKey()); + cache.refresh(context.firstKey()).join(); context.disableRejectingCacheWriter(); context.ticker().advance(-1, TimeUnit.HOURS); assertThat(cache.asMap(), equalTo(context.original())); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 920ddf41d9..016aee3c8b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -56,6 +56,7 @@ import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population; import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Writer; @@ -361,7 +362,7 @@ class Key { @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) @Test(dataProvider = "caches", expectedExceptions = NullPointerException.class) public void refresh_null(LoadingCache cache, CacheContext context) { - cache.refresh(null); + cache.refresh(null).join(); } @CheckNoWriter @@ -370,7 +371,8 @@ public void refresh_null(LoadingCache cache, CacheContext cont executor = CacheExecutor.DIRECT, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { - cache.refresh(context.firstKey()); + var future = cache.refresh(context.firstKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize() - 1)); assertThat(cache.getIfPresent(context.firstKey()), is(nullValue())); assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); @@ -382,9 +384,11 @@ public void refresh_remove(LoadingCache cache, CacheContext co removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_failure(LoadingCache cache, CacheContext context) { - // Shouldn't leak exception to caller and should retain stale entry - cache.refresh(context.absentKey()); - cache.refresh(context.firstKey()); + // Shouldn't leak exception to caller and should retain the stale entry + var future1 = cache.refresh(context.absentKey()); + var future2 = cache.refresh(context.firstKey()); + assertThat(future1.isCompletedExceptionally(), is(true)); + assertThat(future2.isCompletedExceptionally(), is(true)); assertThat(cache.estimatedSize(), is(context.initialSize())); assertThat(context, both(hasLoadSuccessCount(0)).and(hasLoadFailureCount(2))); } @@ -393,15 +397,20 @@ public void refresh_failure(LoadingCache cache, CacheContext c @CacheSpec(loader = Loader.NULL) @Test(dataProvider = "caches") public void refresh_absent_null(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize())); } @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) + @CacheSpec( + maximumSize = Maximum.UNREACHABLE, + removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = Population.SINGLETON) public void refresh_absent(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + Integer key = context.absentKey(); + var future = cache.refresh(key); + assertThat(future.join(), is(not(nullValue()))); assertThat(cache.estimatedSize(), is(1 + context.initialSize())); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); @@ -416,7 +425,8 @@ public void refresh_absent(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_present_null(LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(nullValue())); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -435,7 +445,8 @@ public void refresh_present_null(LoadingCache cache, CacheCont public void refresh_present_sameValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(context.original().get(key))); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -455,7 +466,9 @@ public void refresh_present_sameValue( public void refresh_present_differentValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(key)); + // records a hit assertThat(cache.get(key), is(key)); } @@ -481,10 +494,12 @@ public void refresh_conflict(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); assertThat(cache.asMap().put(key, updated), is(original)); refresh.set(true); + future.join(); + await().until(() -> context.consumedNotifications().size(), is(2)); List removed = context.consumedNotifications().stream() .map(RemovalNotification::getValue).collect(toList()); @@ -509,11 +524,13 @@ public void refresh_invalidate(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + future.join(); + + assertThat(cache.getIfPresent(key), is(nullValue())); await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java index 121899daaf..9c7e2f97b2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java @@ -94,8 +94,8 @@ public void async_concurrent_bounded( Threads.runTest(cache, asyncOperations); } - @SuppressWarnings( - {"unchecked", "rawtypes", "ReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) + @SuppressWarnings({"unchecked", "rawtypes", "ReturnValueIgnored", + "FutureReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) List, Integer>> operations = ImmutableList.of( // LoadingCache (cache, key) -> { cache.get(key); }, diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java index 5dbddf0042..a37b752383 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java @@ -465,7 +465,7 @@ public void refresh(LoadingCache cache, CacheContext context) GcFinalization.awaitFullGc(); awaitFullCleanup(cache); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.estimatedSize(), is(1L)); assertThat(cache.getIfPresent(key), is(not(value))); @@ -487,7 +487,7 @@ public void refresh_writerFails(LoadingCache cache, CacheConte Integer key = context.firstKey(); context.clear(); GcFinalization.awaitFullGc(); - cache.refresh(key); + cache.refresh(key).join(); context.disableRejectingCacheWriter(); assertThat(cache.asMap().isEmpty(), is(false)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 3c774539d2..9866683068 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -351,7 +351,18 @@ public void invalidate(CacheContext context) { cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + var executor = (TrackingExecutor) context.executor(); + await().until(() -> executor.submitted() == executor.completed()); + + if (context.implementation() == Implementation.Guava) { + // Guava does not protect against ABA when the entry was removed by allowing a possibly + // stale value from being inserted. + assertThat(cache.getIfPresent(key), is(refreshed)); + } else { + // Maintain linearizability by discarding the refresh if completing after an explicit removal + assertThat(cache.getIfPresent(key), is(nullValue())); + } + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index bd55c04390..89ae2b128b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -90,6 +90,7 @@ public Stresser() { status(); } + @SuppressWarnings("FutureReturnValueIgnored") public void run() throws InterruptedException { ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java new file mode 100644 index 0000000000..60f4369d52 --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java @@ -0,0 +1,104 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.testing.FakeTicker; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.testing.TestingExecutors; + +/** + * Issue #193: Invalidate before Refresh completes still stores value + *

+ * When a refresh starts before an invalidate and completes afterwards, the entry is inserted into + * the cache. This breaks linearizability assumptions, as the invalidation may be to ensure that + * the cache does not hold stale data that refresh will have observed in its load. This undesirable + * behavior is also present in Guava, so the stricter handling is an intentional deviation. + * + * @author boschb (Robert Bosch) + */ +public final class Issue193Test { + private final AtomicLong counter = new AtomicLong(0); + private final FakeTicker ticker = new FakeTicker(); + + private ListenableFutureTask loadingTask; + + private final AsyncCacheLoader loader = + (key, exec) -> { + // Fools the cache into thinking there is a future that's not immediately ready. + // (The Cache has optimizations for this that we want to avoid) + loadingTask = ListenableFutureTask.create(counter::getAndIncrement); + var f = new CompletableFuture(); + loadingTask.addListener(() -> { + f.complete(Futures.getUnchecked(loadingTask)); + }, exec); + return f; + }; + + private final String key = Issue193Test.class.getSimpleName(); + + /** This ensures that any outstanding async loading is completed as well */ + private long loadGet(AsyncLoadingCache cache, String key) + throws InterruptedException, ExecutionException { + CompletableFuture future = cache.get(key); + if (!loadingTask.isDone()) { + loadingTask.run(); + } + return future.get(); + } + + @Test + public void invalidateDuringRefreshRemovalCheck() throws Exception { + List removed = new ArrayList<>(); + AsyncLoadingCache cache = + Caffeine.newBuilder() + .ticker(ticker::read) + .executor(TestingExecutors.sameThreadScheduledExecutor()) + .removalListener((key, value, reason) -> removed.add(value)) + .refreshAfterWrite(10, TimeUnit.NANOSECONDS) + .buildAsync(loader); + + // Load so there is an initial value. + assertThat(loadGet(cache, key), is(0L)); + + ticker.advance(11); // Refresh should fire on next access + assertThat(cache.synchronous().getIfPresent(key), is(0L)); // Old value + + cache.synchronous().invalidate(key); // Invalidate key entirely + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // No value in cache (good) + loadingTask.run(); // Completes refresh + + // FIXME: java.lang.AssertionError: Not true that <1> is null + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // Value in cache (bad) + + // FIXME: Maybe? This is what I wanted to actually test :) + assertThat(removed, is(List.of(0L, 1L))); // 1L was sent to removalListener anyways + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java index 886e75f19d..135c52c6a7 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -65,6 +66,7 @@ */ @SuppressWarnings("PreferJavaTimeOverload") public final class GuavaCacheFromContext { + private static final ThreadLocal error = new ThreadLocal<>(); private GuavaCacheFromContext() {} @@ -489,8 +491,19 @@ public Map getAll(Iterable keys) { } @Override - public void refresh(K key) { + public CompletableFuture refresh(K key) { + error.set(null); cache.refresh(key); + + var e = error.get(); + if (e == null) { + return CompletableFuture.completedFuture(cache.asMap().get(key)); + } else if (e instanceof CacheMissException) { + return CompletableFuture.completedFuture(null); + } + + error.remove(); + return CompletableFuture.failedFuture(e); } } @@ -542,11 +555,17 @@ static class SingleLoader extends CacheLoader implements Serializabl @Override public V load(K key) throws Exception { - V value = delegate.load(key); - if (value == null) { - throw new CacheMissException(); + try { + error.set(null); + V value = delegate.load(key); + if (value == null) { + throw new CacheMissException(); + } + return value; + } catch (Exception e) { + error.set(e); + throw e; } - return value; } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java index cc4ea9bbd0..82fa17ac80 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java @@ -39,6 +39,8 @@ private enum StatsType { final long count; final StatsType type; + + boolean describe; DescriptionBuilder desc; private HasStats(StatsType type, long count) { @@ -67,8 +69,14 @@ protected boolean matchesSafely(CacheContext context, Description description) { } } - CacheStats stats = context.stats(); desc = new DescriptionBuilder(description); + boolean matches = matches(context); + describe = true; + return matches; + } + + private boolean matches(CacheContext context) throws AssertionError { + CacheStats stats = context.stats(); switch (type) { case HIT: return awaitStatistic(context, stats::hitCount); @@ -95,6 +103,9 @@ private boolean awaitStatistic(CacheContext context, Callable statistic) { await().pollInSameThread().until(statistic, is(count)); return true; } + if (describe) { + is(count).describeMismatch(statistic.call(), desc.getDescription()); + } return false; } catch (Exception e) { return desc.expectThat(type.name(), statistic, is(count)).matches();