From 843f624b55b305ac054e97e1d003f613a9472224 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sat, 2 Jan 2021 13:54:53 -0800 Subject: [PATCH] Return refresh future and ignore redundant refreshes A mapping of in-flight refreshes is now maintained and lazily initialized if not used. This allows the cache to ignore redundant requests for reloads, like Guava does. It also removes disablement of expiration during refresh and resolves an ABA problem if the entry is modified in an undectectable way. The refresh future can now be obtained from LoadingCache to chain operations against. TODO: unit tests for these changes fixes #143 fixes #193 fixes #236 fixes #282 fixes #322 fixed #373 fixes #467 --- .../caffeine/cache/BoundedLocalCache.java | 188 ++++++++++++------ .../benmanes/caffeine/cache/LoadingCache.java | 7 +- .../cache/LocalAsyncLoadingCache.java | 72 +++++-- .../benmanes/caffeine/cache/LocalCache.java | 7 + .../caffeine/cache/LocalLoadingCache.java | 98 +++++---- .../caffeine/cache/UnboundedLocalCache.java | 27 ++- .../caffeine/cache/ExpirationTest.java | 4 +- .../caffeine/cache/LoadingCacheTest.java | 31 ++- .../caffeine/cache/MultiThreadedTest.java | 4 +- .../caffeine/cache/ReferenceTest.java | 4 +- .../benmanes/caffeine/cache/Stresser.java | 1 + .../cache/testing/GuavaCacheFromContext.java | 4 +- 12 files changed, 303 insertions(+), 144 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 59c557ce23..5ea426092e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -60,6 +60,7 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -220,10 +221,10 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef final Executor executor; final boolean isAsync; - // The collection views - @Nullable transient Set keySet; - @Nullable transient Collection values; - @Nullable transient Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine builder, executor = builder.getExecutor(); writer = builder.getCacheWriter(); evictionLock = new ReentrantLock(); + refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); drainBuffersTask = new PerformCleanupTask(this); nodeFactory = NodeFactory.newFactory(builder, isAsync); @@ -288,11 +290,29 @@ public final Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); } + @Override + public Object referenceKey(K key) { + return nodeFactory.newLookupKey(key); + } + /* --------------- Stats Support --------------- */ @Override @@ -899,8 +919,9 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; RemovalCause[] actualCause = new RemovalCause[1]; + Object keyReference = node.getKeyReference(); - data.computeIfPresent(node.getKeyReference(), (k, n) -> { + data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { return n; } @@ -965,6 +986,12 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(keyReference); + } + if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1172,70 +1199,82 @@ void refreshIfNeeded(Node node, long now) { if (!refreshAfterWrite()) { return; } + K key; V oldValue; - long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = (now + ASYNC_EXPIRY); - if (((now - oldWriteTime) > refreshAfterWriteNanos()) + long writeTime = node.getWriteTime(); + Object keyReference = node.getKeyReference(); + if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) - && node.casWriteTime(oldWriteTime, refreshWriteTime)) { - try { - CompletableFuture refreshFuture; - long startTime = statsTicker().read(); - if (isAsync) { - @SuppressWarnings("unchecked") - CompletableFuture future = (CompletableFuture) oldValue; - if (Async.isReady(future)) { - @SuppressWarnings("NullAway") - CompletableFuture refresh = future.thenCompose(value -> - cacheLoader.asyncReload(key, value, executor)); - refreshFuture = refresh; - } else { - // no-op if load is pending - node.casWriteTime(refreshWriteTime, oldWriteTime); - return; - } - } else { + && !refreshes().containsKey(keyReference)) { + var reloading = new CompletableFuture[1]; + refreshes().computeIfAbsent(keyReference, k -> { + reloading[0] = reload(node, key, oldValue, writeTime); + return reloading[0]; + }); + if (reloading[0] != null) { + reloading[0].whenComplete((r, e) -> refreshes().remove(keyReference, reloading[0])); + } + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private @Nullable CompletableFuture reload(Node node, + K key, V oldValue, long writeTime) { + try { + CompletableFuture refreshFuture; + long startTime = statsTicker().read(); + if (isAsync) { + @SuppressWarnings("unchecked") + CompletableFuture future = (CompletableFuture) oldValue; + if (Async.isReady(future)) { @SuppressWarnings("NullAway") - CompletableFuture refresh = cacheLoader.asyncReload(key, oldValue, executor); + var refresh = cacheLoader.asyncReload(key, future.join(), executor); refreshFuture = refresh; + } else { + // no-op if load is pending + return future; + } + } else { + @SuppressWarnings("NullAway") + var refresh = cacheLoader.asyncReload(key, oldValue, executor); + refreshFuture = refresh; + } + refreshFuture.whenComplete((newValue, error) -> { + long loadTime = statsTicker().read() - startTime; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + statsCounter().recordLoadFailure(loadTime); + return; } - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - node.casWriteTime(refreshWriteTime, oldWriteTime); - statsCounter().recordLoadFailure(loadTime); - return; - } - @SuppressWarnings("unchecked") - V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; - - boolean[] discard = new boolean[1]; - compute(key, (k, currentValue) -> { - if (currentValue == null) { - return value; - } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) { - return value; - } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - - if (discard[0] && hasRemovalListener()) { - notifyRemoval(key, value, RemovalCause.REPLACED); - } - if (newValue == null) { - statsCounter().recordLoadFailure(loadTime); - } else { - statsCounter().recordLoadSuccess(loadTime); + @SuppressWarnings("unchecked") + V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; + + boolean[] discard = new boolean[1]; + compute(key, (k, currentValue) -> { + if (currentValue == null) { + return value; + } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) { + return value; } - }); - } catch (Throwable t) { - node.casWriteTime(refreshWriteTime, oldWriteTime); - logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); - } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && hasRemovalListener()) { + notifyRemoval(key, value, RemovalCause.REPLACED); + } + if (newValue == null) { + statsCounter().recordLoadFailure(loadTime); + } else { + statsCounter().recordLoadSuccess(loadTime); + } + }); + return refreshFuture; + } catch (Throwable t) { + logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); + return null; } } @@ -1782,8 +1821,12 @@ public void clear() { } // Discard all entries - for (Node node : data.values()) { - removeNode(node, now); + var pending = refreshes.get(); + for (var entry : data.entrySet()) { + removeNode(entry.getValue(), now); + if (pending != null) { + pending.remove(entry.getKey()); + } } // Discard all pending reads @@ -2099,8 +2142,9 @@ public Map getAllPresent(Iterable keys) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { + data.computeIfPresent(lookupKey, (k, n) -> { synchronized (n) { oldValue[0] = n.getValue(); if (oldValue[0] == null) { @@ -2118,6 +2162,11 @@ public Map getAllPresent(Iterable keys) { }); if (cause[0] != null) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2140,8 +2189,9 @@ public boolean remove(Object key, Object value) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> { + data.computeIfPresent(lookupKey, (kR, node) -> { synchronized (node) { oldKey[0] = node.getKey(); oldValue[0] = node.getValue(); @@ -2163,7 +2213,13 @@ public boolean remove(Object key, Object value) { if (removed[0] == null) { return false; - } else if (hasRemovalListener()) { + } + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } afterWrite(new RemovalTask(removed[0])); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java index c0db010547..81c37db5a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.checkerframework.checker.nullness.qual.NonNull; @@ -101,9 +102,13 @@ public interface LoadingCache extends Cache { * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading * is asynchronous by delegating to the default executor. + *

+ * Returns an existing future without doing anything if another thread is currently loading the + * value for {@code key}. * * @param key key with which a value may be associated + * @return the future that is loading the value * @throws NullPointerException if the specified key is null */ - void refresh(@NonNull K key); + CompletableFuture refresh(@NonNull K key); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index b11a7dc500..c2152db0e3 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -130,27 +130,47 @@ public Map getAll(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - public void refresh(K key) { + public CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - CompletableFuture oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, writeTime); - if ((oldValueFuture == null) - || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) { - asyncCache.get(key, asyncCache.loader::asyncLoad, /* recordStats */ false); - return; - } else if (!oldValueFuture.isDone()) { - // no-op if load is pending - return; - } + long[] startTime = new long[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] oldValueFuture = new CompletableFuture[1]; + Object keyReference = asyncCache.cache().referenceKey(key); + + var future = asyncCache.cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; + } + + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + if ((oldValueFuture[0] == null) + || (oldValueFuture[0].isDone() && oldValueFuture[0].isCompletedExceptionally())) { + var refreshFuture = asyncCache.get(key, + asyncCache.loader::asyncLoad, /* recordStats */ false); + reloading[0] = refreshFuture; + return refreshFuture; + } else if (!oldValueFuture[0].isDone()) { + // no-op if load is pending + reloading[0] = oldValueFuture[0]; + return oldValueFuture[0]; + } - oldValueFuture.thenAccept(oldValue -> { - long now = asyncCache.cache().statsTicker().read(); - CompletableFuture refreshFuture = (oldValue == null) + V oldValue = Async.getIfReady(oldValueFuture[0]); + startTime[0] = asyncCache.cache().statsTicker().read(); + var refreshFuture = (oldValue == null) ? asyncCache.loader.asyncLoad(key, asyncCache.cache().executor()) : asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = asyncCache.cache().statsTicker().read() - now; + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); logger.log(Level.WARNING, "Exception thrown during refresh", error); @@ -158,16 +178,20 @@ public void refresh(K key) { } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (k, currentValue) -> { + asyncCache.cache().compute(key, (ignored, currentValue) -> { if (currentValue == null) { - return (newValue == null) ? null : refreshFuture; - } else if (currentValue == oldValueFuture) { + if (newValue == null) { + return null; + } else if (asyncCache.cache().refreshes().get(key) == reloading[0]) { + return reloading[0]; + } + } else if (currentValue == oldValueFuture[0]) { long expectedWriteTime = writeTime[0]; if (asyncCache.cache().hasWriteTime()) { asyncCache.cache().getIfPresentQuietly(key, writeTime); } if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : refreshFuture; + return (newValue == null) ? null : reloading[0]; } } discard[0] = true; @@ -175,15 +199,21 @@ public void refresh(K key) { }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, refreshFuture, RemovalCause.REPLACED); + asyncCache.cache().notifyRemoval(key, reloading[0], RemovalCause.REPLACED); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } + + asyncCache.cache().refreshes().remove(keyReference, reloading[0]); }); - }); + } + + @SuppressWarnings("unchecked") + var castedFuture = (CompletableFuture) future; + return castedFuture; } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index 9ce22216c2..206055808a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,6 +53,9 @@ interface LocalCache extends ConcurrentMap { /** Returns the {@link Executor} used by this cache. */ @NonNull Executor executor(); + /** Returns the map of in-flight refresh operations. */ + ConcurrentMap> refreshes(); + /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); @@ -64,6 +68,9 @@ interface LocalCache extends ConcurrentMap { /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); + /** Returns the reference key. */ + Object referenceKey(K key); + /** * See {@link Cache#getIfPresent(Object)}. This method differs by accepting a parameter of whether * to record the hit and miss statistics based on the success of this operation. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 5e12bdf397..78313a4cae 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -89,49 +89,77 @@ default Map loadSequentially(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - default void refresh(K key) { + default CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - long startTime = cache().statsTicker().read(); - V oldValue = cache().getIfPresentQuietly(key, writeTime); - CompletableFuture refreshFuture = (oldValue == null) - ? cacheLoader().asyncLoad(key, cache().executor()) - : cacheLoader().asyncReload(key, oldValue, cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = cache().statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().statsCounter().recordLoadFailure(loadTime); - return; + long[] startTime = new long[1]; + @SuppressWarnings("unchecked") + V[] oldValue = (V[]) new Object[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + Object keyReference = cache().referenceKey(key); + + var future = cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; } - boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - return newValue; - } else if (currentValue == oldValue) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + startTime[0] = cache().statsTicker().read(); + oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + CompletableFuture refreshFuture = (oldValue[0] == null) + ? cacheLoader().asyncLoad(key, cache().executor()) + : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = cache().statsTicker().read() - startTime[0]; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + cache().statsCounter().recordLoadFailure(loadTime); + return; + } + + boolean[] discard = new boolean[1]; + cache().compute(key, (k, currentValue) -> { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (cache().refreshes().get(key) == reloading[0]) { + return newValue; + } + } else if (currentValue == oldValue) { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && cache().hasRemovalListener()) { + cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + } + if (newValue == null) { + cache().statsCounter().recordLoadFailure(loadTime); + } else { + cache().statsCounter().recordLoadSuccess(loadTime); } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); - } - if (newValue == null) { - cache().statsCounter().recordLoadFailure(loadTime); - } else { - cache().statsCounter().recordLoadSuccess(loadTime); - } - }); + cache().refreshes().remove(keyReference, reloading[0]); + }); + } + + @SuppressWarnings("unchecked") + CompletableFuture castedFuture = (CompletableFuture) future; + return castedFuture; } /** Returns a mapping function that adapts to {@link CacheLoader#load}. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 70e12f2fb7..48b6f4cac8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,15 +62,17 @@ final class UnboundedLocalCache implements LocalCache { final Executor executor; final Ticker ticker; - transient @Nullable Set keySet; - transient @Nullable Collection values; - transient @Nullable Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); this.statsCounter = builder.getStatsCounterSupplier().get(); this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); + this.refreshes = new AtomicReference<>(); this.writer = builder.getCacheWriter(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); @@ -80,6 +83,11 @@ public boolean hasWriteTime() { return false; } + @Override + public Object referenceKey(K key) { + return key; + } + /* --------------- Cache --------------- */ @Override @@ -166,6 +174,19 @@ public Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 8852b361f2..67b46c54c2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -602,7 +602,7 @@ public void getAll_writerFails(LoadingCache cache, CacheContex public void refresh(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.MINUTES); Integer key = context.firstKey(); - cache.refresh(key); + cache.refresh(key).join(); long count = (cache.estimatedSize() == 1) ? context.initialSize() : 1; assertThat(cache, hasRemovalNotifications(context, count, RemovalCause.EXPIRED)); @@ -621,7 +621,7 @@ public void refresh(LoadingCache cache, CacheContext context) compute = Compute.SYNC, writer = Writer.EXCEPTIONAL, removalListener = Listener.REJECTING) public void refresh_writerFails(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.HOURS); - cache.refresh(context.firstKey()); + cache.refresh(context.firstKey()).join(); context.disableRejectingCacheWriter(); context.ticker().advance(-1, TimeUnit.HOURS); assertThat(cache.asMap(), equalTo(context.original())); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 920ddf41d9..0a03d3aa9e 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -361,7 +361,7 @@ class Key { @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) @Test(dataProvider = "caches", expectedExceptions = NullPointerException.class) public void refresh_null(LoadingCache cache, CacheContext context) { - cache.refresh(null); + cache.refresh(null).join(); } @CheckNoWriter @@ -370,7 +370,8 @@ public void refresh_null(LoadingCache cache, CacheContext cont executor = CacheExecutor.DIRECT, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { - cache.refresh(context.firstKey()); + var future = cache.refresh(context.firstKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize() - 1)); assertThat(cache.getIfPresent(context.firstKey()), is(nullValue())); assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); @@ -383,8 +384,10 @@ public void refresh_remove(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_failure(LoadingCache cache, CacheContext context) { // Shouldn't leak exception to caller and should retain stale entry - cache.refresh(context.absentKey()); - cache.refresh(context.firstKey()); + var future1 = cache.refresh(context.absentKey()); + var future2 = cache.refresh(context.firstKey()); + assertThat(future1.isCompletedExceptionally(), is(true)); + assertThat(future2.isCompletedExceptionally(), is(true)); assertThat(cache.estimatedSize(), is(context.initialSize())); assertThat(context, both(hasLoadSuccessCount(0)).and(hasLoadFailureCount(2))); } @@ -393,7 +396,8 @@ public void refresh_failure(LoadingCache cache, CacheContext c @CacheSpec(loader = Loader.NULL) @Test(dataProvider = "caches") public void refresh_absent_null(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize())); } @@ -401,7 +405,8 @@ public void refresh_absent_null(LoadingCache cache, CacheConte @Test(dataProvider = "caches") @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) public void refresh_absent(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(not(nullValue()))); assertThat(cache.estimatedSize(), is(1 + context.initialSize())); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); @@ -416,7 +421,8 @@ public void refresh_absent(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_present_null(LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(nullValue())); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -435,7 +441,8 @@ public void refresh_present_null(LoadingCache cache, CacheCont public void refresh_present_sameValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(context.original().get(key))); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -455,7 +462,9 @@ public void refresh_present_sameValue( public void refresh_present_differentValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(key)); + // records a hit assertThat(cache.get(key), is(key)); } @@ -481,7 +490,7 @@ public void refresh_conflict(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.asMap().put(key, updated), is(original)); refresh.set(true); @@ -509,7 +518,7 @@ public void refresh_invalidate(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + cache.refresh(key).join(); cache.invalidate(key); refresh.set(true); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java index 121899daaf..9c7e2f97b2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java @@ -94,8 +94,8 @@ public void async_concurrent_bounded( Threads.runTest(cache, asyncOperations); } - @SuppressWarnings( - {"unchecked", "rawtypes", "ReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) + @SuppressWarnings({"unchecked", "rawtypes", "ReturnValueIgnored", + "FutureReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) List, Integer>> operations = ImmutableList.of( // LoadingCache (cache, key) -> { cache.get(key); }, diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java index 5dbddf0042..a37b752383 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java @@ -465,7 +465,7 @@ public void refresh(LoadingCache cache, CacheContext context) GcFinalization.awaitFullGc(); awaitFullCleanup(cache); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.estimatedSize(), is(1L)); assertThat(cache.getIfPresent(key), is(not(value))); @@ -487,7 +487,7 @@ public void refresh_writerFails(LoadingCache cache, CacheConte Integer key = context.firstKey(); context.clear(); GcFinalization.awaitFullGc(); - cache.refresh(key); + cache.refresh(key).join(); context.disableRejectingCacheWriter(); assertThat(cache.asMap().isEmpty(), is(false)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index bd55c04390..89ae2b128b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -90,6 +90,7 @@ public Stresser() { status(); } + @SuppressWarnings("FutureReturnValueIgnored") public void run() throws InterruptedException { ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java index 886e75f19d..9af394a058 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -489,8 +490,9 @@ public Map getAll(Iterable keys) { } @Override - public void refresh(K key) { + public CompletableFuture refresh(K key) { cache.refresh(key); + return CompletableFuture.completedFuture(null); } }