diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 59c557ce23..5ea426092e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -60,6 +60,7 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -220,10 +221,10 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef final Executor executor; final boolean isAsync; - // The collection views - @Nullable transient Set keySet; - @Nullable transient Collection values; - @Nullable transient Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine builder, executor = builder.getExecutor(); writer = builder.getCacheWriter(); evictionLock = new ReentrantLock(); + refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); drainBuffersTask = new PerformCleanupTask(this); nodeFactory = NodeFactory.newFactory(builder, isAsync); @@ -288,11 +290,29 @@ public final Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); } + @Override + public Object referenceKey(K key) { + return nodeFactory.newLookupKey(key); + } + /* --------------- Stats Support --------------- */ @Override @@ -899,8 +919,9 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; RemovalCause[] actualCause = new RemovalCause[1]; + Object keyReference = node.getKeyReference(); - data.computeIfPresent(node.getKeyReference(), (k, n) -> { + data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { return n; } @@ -965,6 +986,12 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(keyReference); + } + if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1172,70 +1199,82 @@ void refreshIfNeeded(Node node, long now) { if (!refreshAfterWrite()) { return; } + K key; V oldValue; - long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = (now + ASYNC_EXPIRY); - if (((now - oldWriteTime) > refreshAfterWriteNanos()) + long writeTime = node.getWriteTime(); + Object keyReference = node.getKeyReference(); + if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) - && node.casWriteTime(oldWriteTime, refreshWriteTime)) { - try { - CompletableFuture refreshFuture; - long startTime = statsTicker().read(); - if (isAsync) { - @SuppressWarnings("unchecked") - CompletableFuture future = (CompletableFuture) oldValue; - if (Async.isReady(future)) { - @SuppressWarnings("NullAway") - CompletableFuture refresh = future.thenCompose(value -> - cacheLoader.asyncReload(key, value, executor)); - refreshFuture = refresh; - } else { - // no-op if load is pending - node.casWriteTime(refreshWriteTime, oldWriteTime); - return; - } - } else { + && !refreshes().containsKey(keyReference)) { + var reloading = new CompletableFuture[1]; + refreshes().computeIfAbsent(keyReference, k -> { + reloading[0] = reload(node, key, oldValue, writeTime); + return reloading[0]; + }); + if (reloading[0] != null) { + reloading[0].whenComplete((r, e) -> refreshes().remove(keyReference, reloading[0])); + } + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private @Nullable CompletableFuture reload(Node node, + K key, V oldValue, long writeTime) { + try { + CompletableFuture refreshFuture; + long startTime = statsTicker().read(); + if (isAsync) { + @SuppressWarnings("unchecked") + CompletableFuture future = (CompletableFuture) oldValue; + if (Async.isReady(future)) { @SuppressWarnings("NullAway") - CompletableFuture refresh = cacheLoader.asyncReload(key, oldValue, executor); + var refresh = cacheLoader.asyncReload(key, future.join(), executor); refreshFuture = refresh; + } else { + // no-op if load is pending + return future; + } + } else { + @SuppressWarnings("NullAway") + var refresh = cacheLoader.asyncReload(key, oldValue, executor); + refreshFuture = refresh; + } + refreshFuture.whenComplete((newValue, error) -> { + long loadTime = statsTicker().read() - startTime; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + statsCounter().recordLoadFailure(loadTime); + return; } - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - node.casWriteTime(refreshWriteTime, oldWriteTime); - statsCounter().recordLoadFailure(loadTime); - return; - } - @SuppressWarnings("unchecked") - V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; - - boolean[] discard = new boolean[1]; - compute(key, (k, currentValue) -> { - if (currentValue == null) { - return value; - } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) { - return value; - } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - - if (discard[0] && hasRemovalListener()) { - notifyRemoval(key, value, RemovalCause.REPLACED); - } - if (newValue == null) { - statsCounter().recordLoadFailure(loadTime); - } else { - statsCounter().recordLoadSuccess(loadTime); + @SuppressWarnings("unchecked") + V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; + + boolean[] discard = new boolean[1]; + compute(key, (k, currentValue) -> { + if (currentValue == null) { + return value; + } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) { + return value; } - }); - } catch (Throwable t) { - node.casWriteTime(refreshWriteTime, oldWriteTime); - logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); - } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && hasRemovalListener()) { + notifyRemoval(key, value, RemovalCause.REPLACED); + } + if (newValue == null) { + statsCounter().recordLoadFailure(loadTime); + } else { + statsCounter().recordLoadSuccess(loadTime); + } + }); + return refreshFuture; + } catch (Throwable t) { + logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); + return null; } } @@ -1782,8 +1821,12 @@ public void clear() { } // Discard all entries - for (Node node : data.values()) { - removeNode(node, now); + var pending = refreshes.get(); + for (var entry : data.entrySet()) { + removeNode(entry.getValue(), now); + if (pending != null) { + pending.remove(entry.getKey()); + } } // Discard all pending reads @@ -2099,8 +2142,9 @@ public Map getAllPresent(Iterable keys) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { + data.computeIfPresent(lookupKey, (k, n) -> { synchronized (n) { oldValue[0] = n.getValue(); if (oldValue[0] == null) { @@ -2118,6 +2162,11 @@ public Map getAllPresent(Iterable keys) { }); if (cause[0] != null) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2140,8 +2189,9 @@ public boolean remove(Object key, Object value) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> { + data.computeIfPresent(lookupKey, (kR, node) -> { synchronized (node) { oldKey[0] = node.getKey(); oldValue[0] = node.getValue(); @@ -2163,7 +2213,13 @@ public boolean remove(Object key, Object value) { if (removed[0] == null) { return false; - } else if (hasRemovalListener()) { + } + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } afterWrite(new RemovalTask(removed[0])); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java index c0db010547..81c37db5a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.checkerframework.checker.nullness.qual.NonNull; @@ -101,9 +102,13 @@ public interface LoadingCache extends Cache { * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading * is asynchronous by delegating to the default executor. + *

+ * Returns an existing future without doing anything if another thread is currently loading the + * value for {@code key}. * * @param key key with which a value may be associated + * @return the future that is loading the value * @throws NullPointerException if the specified key is null */ - void refresh(@NonNull K key); + CompletableFuture refresh(@NonNull K key); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index b11a7dc500..c2152db0e3 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -130,27 +130,47 @@ public Map getAll(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - public void refresh(K key) { + public CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - CompletableFuture oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, writeTime); - if ((oldValueFuture == null) - || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) { - asyncCache.get(key, asyncCache.loader::asyncLoad, /* recordStats */ false); - return; - } else if (!oldValueFuture.isDone()) { - // no-op if load is pending - return; - } + long[] startTime = new long[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] oldValueFuture = new CompletableFuture[1]; + Object keyReference = asyncCache.cache().referenceKey(key); + + var future = asyncCache.cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; + } + + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + if ((oldValueFuture[0] == null) + || (oldValueFuture[0].isDone() && oldValueFuture[0].isCompletedExceptionally())) { + var refreshFuture = asyncCache.get(key, + asyncCache.loader::asyncLoad, /* recordStats */ false); + reloading[0] = refreshFuture; + return refreshFuture; + } else if (!oldValueFuture[0].isDone()) { + // no-op if load is pending + reloading[0] = oldValueFuture[0]; + return oldValueFuture[0]; + } - oldValueFuture.thenAccept(oldValue -> { - long now = asyncCache.cache().statsTicker().read(); - CompletableFuture refreshFuture = (oldValue == null) + V oldValue = Async.getIfReady(oldValueFuture[0]); + startTime[0] = asyncCache.cache().statsTicker().read(); + var refreshFuture = (oldValue == null) ? asyncCache.loader.asyncLoad(key, asyncCache.cache().executor()) : asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = asyncCache.cache().statsTicker().read() - now; + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); logger.log(Level.WARNING, "Exception thrown during refresh", error); @@ -158,16 +178,20 @@ public void refresh(K key) { } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (k, currentValue) -> { + asyncCache.cache().compute(key, (ignored, currentValue) -> { if (currentValue == null) { - return (newValue == null) ? null : refreshFuture; - } else if (currentValue == oldValueFuture) { + if (newValue == null) { + return null; + } else if (asyncCache.cache().refreshes().get(key) == reloading[0]) { + return reloading[0]; + } + } else if (currentValue == oldValueFuture[0]) { long expectedWriteTime = writeTime[0]; if (asyncCache.cache().hasWriteTime()) { asyncCache.cache().getIfPresentQuietly(key, writeTime); } if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : refreshFuture; + return (newValue == null) ? null : reloading[0]; } } discard[0] = true; @@ -175,15 +199,21 @@ public void refresh(K key) { }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, refreshFuture, RemovalCause.REPLACED); + asyncCache.cache().notifyRemoval(key, reloading[0], RemovalCause.REPLACED); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } + + asyncCache.cache().refreshes().remove(keyReference, reloading[0]); }); - }); + } + + @SuppressWarnings("unchecked") + var castedFuture = (CompletableFuture) future; + return castedFuture; } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index 9ce22216c2..206055808a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,6 +53,9 @@ interface LocalCache extends ConcurrentMap { /** Returns the {@link Executor} used by this cache. */ @NonNull Executor executor(); + /** Returns the map of in-flight refresh operations. */ + ConcurrentMap> refreshes(); + /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); @@ -64,6 +68,9 @@ interface LocalCache extends ConcurrentMap { /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); + /** Returns the reference key. */ + Object referenceKey(K key); + /** * See {@link Cache#getIfPresent(Object)}. This method differs by accepting a parameter of whether * to record the hit and miss statistics based on the success of this operation. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 5e12bdf397..78313a4cae 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -89,49 +89,77 @@ default Map loadSequentially(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - default void refresh(K key) { + default CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - long startTime = cache().statsTicker().read(); - V oldValue = cache().getIfPresentQuietly(key, writeTime); - CompletableFuture refreshFuture = (oldValue == null) - ? cacheLoader().asyncLoad(key, cache().executor()) - : cacheLoader().asyncReload(key, oldValue, cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = cache().statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().statsCounter().recordLoadFailure(loadTime); - return; + long[] startTime = new long[1]; + @SuppressWarnings("unchecked") + V[] oldValue = (V[]) new Object[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + Object keyReference = cache().referenceKey(key); + + var future = cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; } - boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - return newValue; - } else if (currentValue == oldValue) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + startTime[0] = cache().statsTicker().read(); + oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + CompletableFuture refreshFuture = (oldValue[0] == null) + ? cacheLoader().asyncLoad(key, cache().executor()) + : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = cache().statsTicker().read() - startTime[0]; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + cache().statsCounter().recordLoadFailure(loadTime); + return; + } + + boolean[] discard = new boolean[1]; + cache().compute(key, (k, currentValue) -> { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (cache().refreshes().get(key) == reloading[0]) { + return newValue; + } + } else if (currentValue == oldValue) { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && cache().hasRemovalListener()) { + cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + } + if (newValue == null) { + cache().statsCounter().recordLoadFailure(loadTime); + } else { + cache().statsCounter().recordLoadSuccess(loadTime); } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); - } - if (newValue == null) { - cache().statsCounter().recordLoadFailure(loadTime); - } else { - cache().statsCounter().recordLoadSuccess(loadTime); - } - }); + cache().refreshes().remove(keyReference, reloading[0]); + }); + } + + @SuppressWarnings("unchecked") + CompletableFuture castedFuture = (CompletableFuture) future; + return castedFuture; } /** Returns a mapping function that adapts to {@link CacheLoader#load}. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 70e12f2fb7..48b6f4cac8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,15 +62,17 @@ final class UnboundedLocalCache implements LocalCache { final Executor executor; final Ticker ticker; - transient @Nullable Set keySet; - transient @Nullable Collection values; - transient @Nullable Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); this.statsCounter = builder.getStatsCounterSupplier().get(); this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); + this.refreshes = new AtomicReference<>(); this.writer = builder.getCacheWriter(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); @@ -80,6 +83,11 @@ public boolean hasWriteTime() { return false; } + @Override + public Object referenceKey(K key) { + return key; + } + /* --------------- Cache --------------- */ @Override @@ -166,6 +174,19 @@ public Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 8852b361f2..67b46c54c2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -602,7 +602,7 @@ public void getAll_writerFails(LoadingCache cache, CacheContex public void refresh(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.MINUTES); Integer key = context.firstKey(); - cache.refresh(key); + cache.refresh(key).join(); long count = (cache.estimatedSize() == 1) ? context.initialSize() : 1; assertThat(cache, hasRemovalNotifications(context, count, RemovalCause.EXPIRED)); @@ -621,7 +621,7 @@ public void refresh(LoadingCache cache, CacheContext context) compute = Compute.SYNC, writer = Writer.EXCEPTIONAL, removalListener = Listener.REJECTING) public void refresh_writerFails(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.HOURS); - cache.refresh(context.firstKey()); + cache.refresh(context.firstKey()).join(); context.disableRejectingCacheWriter(); context.ticker().advance(-1, TimeUnit.HOURS); assertThat(cache.asMap(), equalTo(context.original())); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 920ddf41d9..0b01f81c58 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -361,7 +361,7 @@ class Key { @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) @Test(dataProvider = "caches", expectedExceptions = NullPointerException.class) public void refresh_null(LoadingCache cache, CacheContext context) { - cache.refresh(null); + cache.refresh(null).join(); } @CheckNoWriter @@ -370,7 +370,8 @@ public void refresh_null(LoadingCache cache, CacheContext cont executor = CacheExecutor.DIRECT, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { - cache.refresh(context.firstKey()); + var future = cache.refresh(context.firstKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize() - 1)); assertThat(cache.getIfPresent(context.firstKey()), is(nullValue())); assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); @@ -383,8 +384,10 @@ public void refresh_remove(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_failure(LoadingCache cache, CacheContext context) { // Shouldn't leak exception to caller and should retain stale entry - cache.refresh(context.absentKey()); - cache.refresh(context.firstKey()); + var future1 = cache.refresh(context.absentKey()); + var future2 = cache.refresh(context.firstKey()); + assertThat(future1.isCompletedExceptionally(), is(true)); + assertThat(future2.isCompletedExceptionally(), is(true)); assertThat(cache.estimatedSize(), is(context.initialSize())); assertThat(context, both(hasLoadSuccessCount(0)).and(hasLoadFailureCount(2))); } @@ -393,7 +396,8 @@ public void refresh_failure(LoadingCache cache, CacheContext c @CacheSpec(loader = Loader.NULL) @Test(dataProvider = "caches") public void refresh_absent_null(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize())); } @@ -401,7 +405,8 @@ public void refresh_absent_null(LoadingCache cache, CacheConte @Test(dataProvider = "caches") @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) public void refresh_absent(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(not(nullValue()))); assertThat(cache.estimatedSize(), is(1 + context.initialSize())); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); @@ -416,7 +421,8 @@ public void refresh_absent(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_present_null(LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(nullValue())); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -435,7 +441,8 @@ public void refresh_present_null(LoadingCache cache, CacheCont public void refresh_present_sameValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(context.original().get(key))); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -455,7 +462,9 @@ public void refresh_present_sameValue( public void refresh_present_differentValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(key)); + // records a hit assertThat(cache.get(key), is(key)); } @@ -481,10 +490,12 @@ public void refresh_conflict(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); assertThat(cache.asMap().put(key, updated), is(original)); refresh.set(true); + future.join(); + await().until(() -> context.consumedNotifications().size(), is(2)); List removed = context.consumedNotifications().stream() .map(RemovalNotification::getValue).collect(toList()); @@ -509,11 +520,13 @@ public void refresh_invalidate(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + future.join(); + + assertThat(cache.getIfPresent(key), is(nullValue())); await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java index 121899daaf..9c7e2f97b2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java @@ -94,8 +94,8 @@ public void async_concurrent_bounded( Threads.runTest(cache, asyncOperations); } - @SuppressWarnings( - {"unchecked", "rawtypes", "ReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) + @SuppressWarnings({"unchecked", "rawtypes", "ReturnValueIgnored", + "FutureReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) List, Integer>> operations = ImmutableList.of( // LoadingCache (cache, key) -> { cache.get(key); }, diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java index 5dbddf0042..a37b752383 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java @@ -465,7 +465,7 @@ public void refresh(LoadingCache cache, CacheContext context) GcFinalization.awaitFullGc(); awaitFullCleanup(cache); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.estimatedSize(), is(1L)); assertThat(cache.getIfPresent(key), is(not(value))); @@ -487,7 +487,7 @@ public void refresh_writerFails(LoadingCache cache, CacheConte Integer key = context.firstKey(); context.clear(); GcFinalization.awaitFullGc(); - cache.refresh(key); + cache.refresh(key).join(); context.disableRejectingCacheWriter(); assertThat(cache.asMap().isEmpty(), is(false)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index bd55c04390..89ae2b128b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -90,6 +90,7 @@ public Stresser() { status(); } + @SuppressWarnings("FutureReturnValueIgnored") public void run() throws InterruptedException { ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java index 886e75f19d..9af394a058 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -489,8 +490,9 @@ public Map getAll(Iterable keys) { } @Override - public void refresh(K key) { + public CompletableFuture refresh(K key) { cache.refresh(key); + return CompletableFuture.completedFuture(null); } }