diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java new file mode 100644 index 0000000000..f20a2f3624 --- /dev/null +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2010-2018. Axon Framework + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.axonframework.integrationtests.cache; + +import org.axonframework.modelling.saga.SagaEventHandler; +import org.axonframework.modelling.saga.SagaLifecycle; +import org.axonframework.modelling.saga.StartSaga; + +import java.util.ArrayList; +import java.util.List; + +/** + * Test saga used by the {@link CachingIntegrationTestSuite}. + * + * @author Steven van Beelen + */ +public class CachedSaga { + + private String name; + private List state; + + @StartSaga + @SagaEventHandler(associationProperty = "id") + public void on(SagaCreatedEvent event) { + this.name = event.name; + this.state = new ArrayList<>(); + } + + @SagaEventHandler(associationProperty = "id") + public void on(VeryImportantEvent event) { + state.add(event.stateEntry); + } + + @SagaEventHandler(associationProperty = "id") + public void on(SagaEndsEvent event) { + if (event.shouldEnd) { + SagaLifecycle.end(); + } + } + + public String getName() { + return name; + } + + public List getState() { + return state; + } + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + public static class SagaCreatedEvent { + + private final String id; + private final String name; + + public SagaCreatedEvent(String id, String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id; + } + } + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + public static class VeryImportantEvent { + + private final String id; + private final Object stateEntry; + + public VeryImportantEvent(String id, Object stateEntry) { + this.id = id; + this.stateEntry = stateEntry; + } + + public String getId() { + return id; + } + } + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + public static class SagaEndsEvent { + + private final String id; + private final boolean shouldEnd; + + public SagaEndsEvent(String id, boolean shouldEnd) { + this.id = id; + this.shouldEnd = shouldEnd; + } + + public String getId() { + return id; + } + } +} diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java new file mode 100644 index 0000000000..8c65b51b81 --- /dev/null +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -0,0 +1,328 @@ +package org.axonframework.integrationtests.cache; + +import org.axonframework.common.caching.Cache; +import org.axonframework.config.Configuration; +import org.axonframework.config.DefaultConfigurer; +import org.axonframework.config.SagaConfigurer; +import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericEventMessage; +import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; +import org.axonframework.eventsourcing.eventstore.EventStore; +import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; +import org.axonframework.modelling.saga.repository.CachingSagaStore; +import org.axonframework.modelling.saga.repository.SagaStore; +import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore; +import org.junit.jupiter.api.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + +/** + * Abstract integration test suite to validate the provided {@link org.axonframework.common.caching.Cache} + * implementations to work as intended under various stress scenarios. + * + * @author Steven van Beelen + */ +public abstract class CachingIntegrationTestSuite { + + // This ensures we do not wire Axon Server components + private static final boolean DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES = false; + private static final int NUMBER_OF_UPDATES = 4096; + private static final int NUMBER_OF_CONCURRENT_PUBLISHERS = 8; + private static final String[] SAGA_NAMES = new String[]{"foo", "bar", "baz", "and", "some", "more"}; + + protected Configuration config; + + private Cache sagaCache; + private Cache associationsCache; + + @BeforeEach + void setUp() { + EventStore eventStore = spy(EmbeddedEventStore.builder() + .storageEngine(new InMemoryEventStorageEngine()) + .build()); + sagaCache = buildCache("saga"); + associationsCache = buildCache("associations"); + + Consumer> sagaConfigurer = + config -> config.configureSagaStore(c -> CachingSagaStore.builder() + .delegateSagaStore(new InMemorySagaStore()) + .sagaCache(sagaCache) + .associationsCache(associationsCache) + .build()); + + config = DefaultConfigurer.defaultConfiguration(DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES) + .configureEventStore(c -> eventStore) + .eventProcessing( + procConfig -> procConfig.usingSubscribingEventProcessors() + .registerSaga(CachedSaga.class, sagaConfigurer) + ) + .start(); + } + + /** + * Construct a {@link Cache} implementation used during testing. + * + * @param name The name to give to the {@link Cache} under construction. + * @return The constructed {@link Cache} instance. + */ + public abstract Cache buildCache(String name); + + @Test + void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { + String sagaName = SAGA_NAMES[0]; + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + // Construct the saga... + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); + // Validate initial cache + assertTrue(associationsCache.containsKey(associationCacheKey)); + Set associations = associationsCache.get(associationCacheKey); + + String sagaIdentifier = associations.iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + //noinspection unchecked + CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); + + // Bulk update the saga... + publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES); + // Validate cache again + assertTrue(associationsCache.containsKey(associationCacheKey)); + associations = associationsCache.get(associationCacheKey); + + sagaIdentifier = associations.iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + //noinspection unchecked + cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); + + // Destruct the saga... + publish(new CachedSaga.SagaEndsEvent(associationValue, true)); + // Validate cache is empty + assertFalse(associationsCache.containsKey(associationCacheKey)); + assertFalse(sagaCache.containsKey(sagaIdentifier)); + } + + @Test + void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutException() + throws ExecutionException, InterruptedException, TimeoutException { + String sagaName = SAGA_NAMES[0]; + String associationValue = "some-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_CONCURRENT_PUBLISHERS); + + // Construct the saga... + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); + // Validate initial cache + assertTrue(associationsCache.containsKey(associationCacheKey)); + Set associations = associationsCache.get(associationCacheKey); + + String sagaIdentifier = associations.iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + //noinspection unchecked + CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); + + // Concurrent bulk update the saga... + IntStream.range(0, NUMBER_OF_CONCURRENT_PUBLISHERS) + .mapToObj(i -> CompletableFuture.runAsync( + () -> publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES), executor + )) + .reduce(CompletableFuture::allOf) + .orElse(CompletableFuture.completedFuture(null)) + .get(5, TimeUnit.SECONDS); + + // Validate cache again + assertTrue(associationsCache.containsKey(associationCacheKey)); + associations = associationsCache.get(associationCacheKey); + + sagaIdentifier = associations.iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + //noinspection unchecked + cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); + + // Destruct the saga... + publish(new CachedSaga.SagaEndsEvent(associationValue, true)); + // Validate cache is empty + assertFalse(associationsCache.containsKey(associationCacheKey)); + assertFalse(sagaCache.containsKey(sagaIdentifier)); + } + + @Test + void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException() + throws ExecutionException, InterruptedException, TimeoutException { + Map> associationReferences = new HashMap<>(); + ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length); + + // Construct the sagas... + for (String sagaName : SAGA_NAMES) { + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); + // Validate initial cache + assertTrue(associationsCache.containsKey(associationCacheKey)); + associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); + + String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + + SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); + CachedSaga cachedSaga = sagaEntry.saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); + } + + // Bulk update the sagas... + Arrays.stream(SAGA_NAMES) + .map(name -> CompletableFuture.runAsync( + () -> publishBulkUpdatesTo(name + "-id", NUMBER_OF_UPDATES), executor + )) + .reduce(CompletableFuture::allOf) + .orElse(CompletableFuture.completedFuture(null)) + .get(5, TimeUnit.SECONDS); + // Validate caches again + for (String sagaName : SAGA_NAMES) { + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + assertTrue(associationsCache.containsKey(associationCacheKey)); + associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); + + String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); + SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); + // The saga cache may have been cleared already when doing bulk updates, which is a fair scenario. + // Hence, only validate the entry if it's still present in the Cache. + if (sagaEntry != null) { + CachedSaga cachedSaga = sagaEntry.saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size(), sagaName); + } + } + + // Destruct the sagas... + for (String sagaName : SAGA_NAMES) { + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + publish(new CachedSaga.SagaEndsEvent(associationValue, true)); + // Validate cache is empty + assertFalse(associationsCache.containsKey(associationCacheKey)); + } + } + + @Test + void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWithoutException() + throws ExecutionException, InterruptedException, TimeoutException { + Map> associationReferences = new HashMap<>(); + ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS); + + // Construct the sagas... + for (String sagaName : SAGA_NAMES) { + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); + // Validate initial cache + assertTrue(associationsCache.containsKey(associationCacheKey)); + associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); + + String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + + SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); + CachedSaga cachedSaga = sagaEntry.saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); + } + + // Bulk update the sagas... + IntStream.range(0, SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS) + .mapToObj(i -> CompletableFuture.runAsync( + () -> publishBulkUpdatesTo(SAGA_NAMES[i % SAGA_NAMES.length] + "-id", NUMBER_OF_UPDATES), + executor + )) + .reduce(CompletableFuture::allOf) + .orElse(CompletableFuture.completedFuture(null)) + .get(5, TimeUnit.SECONDS); + // Validate caches again + for (String sagaName : SAGA_NAMES) { + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + assertTrue(associationsCache.containsKey(associationCacheKey)); + associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); + + String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); + SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); + // The saga cache may have been cleared already when doing bulk updates, which is a fair scenario. + // Hence, only validate the entry if it's still present in the Cache. + if (sagaEntry != null) { + CachedSaga cachedSaga = sagaEntry.saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, + cachedSaga.getState().size(), + sagaName); + } + } + + // Destruct the sagas... + for (String sagaName : SAGA_NAMES) { + String associationValue = sagaName + "-id"; + String associationCacheKey = sagaAssociationCacheKey(associationValue); + + publish(new CachedSaga.SagaEndsEvent(associationValue, true)); + // Validate cache is empty + assertFalse(associationsCache.containsKey(associationCacheKey)); + } + } + + private void publishBulkUpdatesTo(String sagaId, + @SuppressWarnings("SameParameterValue") int amount) { + Object[] updateEvents = new Object[amount]; + for (int i = 0; i < amount; i++) { + updateEvents[i] = new CachedSaga.VeryImportantEvent(sagaId, i); + } + publish(updateEvents); + } + + private void publish(Object... events) { + List> eventMessages = new ArrayList<>(); + for (Object event : events) { + eventMessages.add(GenericEventMessage.asEventMessage(event)); + } + config.eventStore().publish(eventMessages); + } + + /** + * This method is based on the private {@code CachingSagaStore#cacheKey(AssociationValue, Class)} method. + * + * @param sagaId The association key within the event. + * @return The caching key used for the association values by a {@link CachingSagaStore}. + */ + private static String sagaAssociationCacheKey(String sagaId) { + return CachedSaga.class.getName() + "/id=" + sagaId; + } +} diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/EhCacheIntegrationTest.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/EhCacheIntegrationTest.java new file mode 100644 index 0000000000..4166a6703d --- /dev/null +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/EhCacheIntegrationTest.java @@ -0,0 +1,58 @@ +package org.axonframework.integrationtests.cache; + +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.config.CacheConfiguration; +import net.sf.ehcache.config.SizeOfPolicyConfiguration; +import net.sf.ehcache.store.MemoryStoreEvictionPolicy; +import org.axonframework.common.caching.Cache; +import org.axonframework.common.caching.EhCacheAdapter; +import org.junit.jupiter.api.*; + +/** + * {@link Ehcache} specific implementation of the {@link CachingIntegrationTestSuite}. + * + * @author Steven van Beelen + */ +class EhCacheIntegrationTest extends CachingIntegrationTestSuite { + + private CacheManager cacheManager; + + @Override + @BeforeEach + void setUp() { + cacheManager = CacheManager.create(getEhCacheConfiguration()); + super.setUp(); + } + + @AfterEach + void tearDown() { + cacheManager.shutdown(); + } + + @Override + public Cache buildCache(String name) { + Ehcache cache = createCache(name); + cacheManager.addCache(cache); + return new EhCacheAdapter(cache); + } + + private Ehcache createCache(String name) { + CacheConfiguration cacheConfig = new CacheConfiguration(name, 10_000) + .name(name) + .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU) + .eternal(false) + .timeToLiveSeconds(600) + .timeToIdleSeconds(600); + return new net.sf.ehcache.Cache(cacheConfig); + } + + private net.sf.ehcache.config.Configuration getEhCacheConfiguration() { + net.sf.ehcache.config.Configuration configuration = new net.sf.ehcache.config.Configuration(); + SizeOfPolicyConfiguration sizeOfPolicyConfiguration = new SizeOfPolicyConfiguration(); + sizeOfPolicyConfiguration.maxDepth(13000); + sizeOfPolicyConfiguration.maxDepthExceededBehavior(SizeOfPolicyConfiguration.MaxDepthExceededBehavior.ABORT); + configuration.addSizeOfPolicy(sizeOfPolicyConfiguration); + return configuration; + } +} diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/WeakReferenceCacheIntegrationTest.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/WeakReferenceCacheIntegrationTest.java new file mode 100644 index 0000000000..b551e2330b --- /dev/null +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/WeakReferenceCacheIntegrationTest.java @@ -0,0 +1,17 @@ +package org.axonframework.integrationtests.cache; + +import org.axonframework.common.caching.Cache; +import org.axonframework.common.caching.WeakReferenceCache; + +/** + * {@link WeakReferenceCache} specific implementation of the {@link CachingIntegrationTestSuite}. + * + * @author Steven van Beelen + */ +class WeakReferenceCacheIntegrationTest extends CachingIntegrationTestSuite { + + @Override + public Cache buildCache(String name) { + return new WeakReferenceCache(); + } +} diff --git a/messaging/src/main/java/org/axonframework/common/caching/Cache.java b/messaging/src/main/java/org/axonframework/common/caching/Cache.java index b440a5620f..9547acce91 100644 --- a/messaging/src/main/java/org/axonframework/common/caching/Cache.java +++ b/messaging/src/main/java/org/axonframework/common/caching/Cache.java @@ -18,6 +18,7 @@ import org.axonframework.common.Registration; +import java.util.function.Supplier; import java.util.function.UnaryOperator; /** @@ -59,6 +60,30 @@ public interface Cache { */ boolean putIfAbsent(Object key, Object value); + /** + * Returns the value under the given {@code key} in the cache. If there is no value present, will invoke the given + * {@code valueSupplier}, put the value in the cache and return the produced value. + * + * @param key The key under which the item was cached. If not present, this key is used to cache the + * outcome of the {@code valueSupplier}. + * @param valueSupplier A supplier that lazily supplies the value if there's no {@code key} present. + * @return The value that is in the cache after the operation. This can be the original value or the one supplied by + * the {@code valueSupplier}. + */ + default T computeIfAbsent(Object key, Supplier valueSupplier) { + Object currentValue = get(key); + if (currentValue != null) { + //noinspection unchecked + return (T) currentValue; + } + T newValue = valueSupplier.get(); + if (newValue == null) { + throw new IllegalArgumentException("Value Supplier of Cache produced a null value for key [" + key + "]!"); + } + put(key, newValue); + return newValue; + } + /** * Removes the entry stored under given {@code key}. If no such entry exists, nothing happens. * diff --git a/messaging/src/main/java/org/axonframework/common/caching/NoCache.java b/messaging/src/main/java/org/axonframework/common/caching/NoCache.java index d530809db4..ba4430fe62 100644 --- a/messaging/src/main/java/org/axonframework/common/caching/NoCache.java +++ b/messaging/src/main/java/org/axonframework/common/caching/NoCache.java @@ -18,6 +18,7 @@ import org.axonframework.common.Registration; +import java.util.function.Supplier; import java.util.function.UnaryOperator; /** @@ -51,6 +52,11 @@ public boolean putIfAbsent(Object key, Object value) { return true; } + @Override + public T computeIfAbsent(Object key, Supplier valueSupplier) { + return valueSupplier.get(); + } + @Override public boolean remove(Object key) { return false; diff --git a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java index 72d0608f9d..f38bb0cef3 100644 --- a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java +++ b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java @@ -17,6 +17,7 @@ package org.axonframework.common.caching; import org.axonframework.common.Assert; +import org.axonframework.common.ObjectUtils; import org.axonframework.common.Registration; import java.lang.ref.Reference; @@ -27,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.function.Supplier; import java.util.function.UnaryOperator; /** @@ -106,6 +108,26 @@ public boolean putIfAbsent(Object key, Object value) { return false; } + @Override + public T computeIfAbsent(Object key, Supplier valueSupplier) { + purgeItems(); + Entry currentEntry = cache.get(key); + Object existingValue = ObjectUtils.getOrDefault(currentEntry, Entry::get, null); + if (existingValue != null) { + //noinspection unchecked + return (T) existingValue; + } + T newValue = valueSupplier.get(); + if (newValue == null) { + throw new IllegalStateException("Value Supplier of Cache produced a null value for key [" + key + "]!"); + } + cache.put(key, new Entry(key, newValue)); + for (EntryListener adapter : adapters) { + adapter.onEntryCreated(key, newValue); + } + return newValue; + } + @Override public boolean remove(Object key) { if (cache.remove(key) != null) { diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index bb45104544..7efd8622df 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -76,8 +76,7 @@ public static Builder builder() { @Override public Set findSagas(Class sagaType, AssociationValue associationValue) { final String key = cacheKey(associationValue, sagaType); - associationsCache.putIfAbsent(key, delegate.findSagas(sagaType, associationValue)); - return associationsCache.get(key); + return associationsCache.computeIfAbsent(key, () -> delegate.findSagas(sagaType, associationValue)); } @Override diff --git a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java index f613dda51f..88bc159c4a 100644 --- a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java +++ b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java @@ -21,9 +21,11 @@ import org.axonframework.modelling.saga.AssociationValuesImpl; import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore; import org.junit.jupiter.api.*; +import org.mockito.*; import java.util.Collections; import java.util.Set; +import java.util.function.Supplier; import static java.util.Collections.singleton; import static org.junit.jupiter.api.Assertions.*; @@ -98,9 +100,13 @@ void associationsAddedToCacheOnLoad() { Set actual = testSubject.findSagas(StubSaga.class, associationValue); assertEquals(singleton("id"), actual); - verify(associationsCache, atLeast(1)).get("org.axonframework.modelling.saga.repository.StubSaga/key=value"); - verify(associationsCache).putIfAbsent("org.axonframework.modelling.saga.repository.StubSaga/key=value", - Collections.singleton("id")); + //noinspection unchecked + ArgumentCaptor> captor = ArgumentCaptor.forClass(Supplier.class); + verify(associationsCache, atLeast(1)).computeIfAbsent( + eq("org.axonframework.modelling.saga.repository.StubSaga/key=value"), + captor.capture() + ); + assertEquals(Collections.singleton("id"), captor.getValue().get()); } @Test diff --git a/spring-boot-autoconfigure/src/test/java/org/axonframework/springboot/autoconfig/AggregateStereotypeAutoConfigurationTest.java b/spring-boot-autoconfigure/src/test/java/org/axonframework/springboot/autoconfig/AggregateStereotypeAutoConfigurationTest.java index e411a3def4..86ff825339 100644 --- a/spring-boot-autoconfigure/src/test/java/org/axonframework/springboot/autoconfig/AggregateStereotypeAutoConfigurationTest.java +++ b/spring-boot-autoconfigure/src/test/java/org/axonframework/springboot/autoconfig/AggregateStereotypeAutoConfigurationTest.java @@ -43,11 +43,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.persistence.Entity; -import javax.persistence.Id; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import java.util.function.UnaryOperator; +import javax.persistence.Entity; +import javax.persistence.Id; import static org.axonframework.modelling.command.AggregateLifecycle.apply; import static org.junit.jupiter.api.Assertions.*; @@ -309,6 +310,11 @@ public boolean putIfAbsent(Object key, Object value) { return false; } + @Override + public T computeIfAbsent(Object key, Supplier valueSupplier) { + return valueSupplier.get(); + } + @Override public boolean remove(Object key) { return false;