diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java index f20a2f3624..09ac3b2245 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2018. Axon Framework + * Copyright (c) 2010-2022. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Random; /** * Test saga used by the {@link CachingIntegrationTestSuite}. @@ -32,17 +33,33 @@ public class CachedSaga { private String name; private List state; + private int numberOfAssociations; + private Random random; + private Integer associationToRemove; @StartSaga @SagaEventHandler(associationProperty = "id") public void on(SagaCreatedEvent event) { this.name = event.name; this.state = new ArrayList<>(); + this.numberOfAssociations = event.numberOfAssociations; + this.random = new Random(); + + for (int i = 0; i < numberOfAssociations; i++) { + SagaLifecycle.associateWith(event.id + i, i); + } } @SagaEventHandler(associationProperty = "id") public void on(VeryImportantEvent event) { state.add(event.stateEntry); + if (associationToRemove == null) { + associationToRemove = random.nextInt(numberOfAssociations); + SagaLifecycle.removeAssociationWith(event.id + associationToRemove, associationToRemove); + } else { + SagaLifecycle.associateWith(event.id + associationToRemove, associationToRemove); + associationToRemove = null; + } } @SagaEventHandler(associationProperty = "id") @@ -65,10 +82,12 @@ public static class SagaCreatedEvent { private final String id; private final String name; + private final int numberOfAssociations; - public SagaCreatedEvent(String id, String name) { + public SagaCreatedEvent(String id, String name, int numberOfAssociations) { this.id = id; this.name = name; + this.numberOfAssociations = numberOfAssociations; } public String getId() { diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 9928112161..5b45af245f 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -22,21 +22,24 @@ import org.axonframework.config.SagaConfigurer; import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.GenericEventMessage; -import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; -import org.axonframework.eventsourcing.eventstore.EventStore; +import org.axonframework.eventhandling.StreamingEventProcessor; +import org.axonframework.eventhandling.TrackingEventProcessorConfiguration; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; import org.axonframework.modelling.saga.repository.CachingSagaStore; import org.axonframework.modelling.saga.repository.SagaStore; import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore; import org.junit.jupiter.api.*; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -46,11 +49,15 @@ import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; /** * Abstract integration test suite to validate the provided {@link org.axonframework.common.caching.Cache} * implementations to work as intended under various stress scenarios. + *

+ * Uses a custom {@link org.axonframework.common.caching.Cache.EntryListener} to validate whether the {@link Cache} + * under test is hit accordingly. Doing so provides additional certainty that although the {@code Cache} may have + * adjusted under the hood, the expected invocation have been made. * * @author Steven van Beelen */ @@ -61,19 +68,33 @@ public abstract class CachingIntegrationTestSuite { private static final int NUMBER_OF_UPDATES = 4096; private static final int NUMBER_OF_CONCURRENT_PUBLISHERS = 8; private static final String[] SAGA_NAMES = new String[]{"foo", "bar", "baz", "and", "some", "more"}; + private static final int NUMBER_OF_ASSOCIATIONS = 42; + + private static final Duration DEFAULT_DELAY = Duration.ofMillis(25); + private static final Duration ONE_SECOND = Duration.ofSeconds(1); + private static final Duration TWO_SECONDS = Duration.ofSeconds(2); + private static final Duration FOUR_SECONDS = Duration.ofSeconds(4); + private static final Duration EIGHT_SECONDS = Duration.ofSeconds(8); + private static final Duration SIXTEEN_SECONDS = Duration.ofSeconds(16); + private static final Duration THIRTY_TWO_SECONDS = Duration.ofSeconds(32); protected Configuration config; + private StreamingEventProcessor sagaProcessor; - private Cache sagaCache; - private Cache associationsCache; + private EntryListenerValidator> sagaCacheListener; + private EntryListenerValidator> associationsCacheListener; @BeforeEach void setUp() { - EventStore eventStore = spy(EmbeddedEventStore.builder() - .storageEngine(new InMemoryEventStorageEngine()) - .build()); - sagaCache = buildCache("saga"); - associationsCache = buildCache("associations"); + Cache sagaCache = buildCache("saga"); + sagaCacheListener = new EntryListenerValidator<>(ListenerType.SAGA); + //noinspection resource + sagaCache.registerCacheEntryListener(sagaCacheListener); + + Cache associationsCache = buildCache("associations"); + associationsCacheListener = new EntryListenerValidator<>(ListenerType.ASSOCIATIONS); + //noinspection resource + associationsCache.registerCacheEntryListener(associationsCacheListener); Consumer> sagaConfigurer = config -> config.configureSagaStore(c -> CachingSagaStore.builder() @@ -82,13 +103,23 @@ void setUp() { .associationsCache(associationsCache) .build()); + TrackingEventProcessorConfiguration tepConfig = + TrackingEventProcessorConfiguration.forParallelProcessing(4) + .andEventAvailabilityTimeout(10, TimeUnit.MILLISECONDS); config = DefaultConfigurer.defaultConfiguration(DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES) - .configureEventStore(c -> eventStore) + .configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine()) .eventProcessing( - procConfig -> procConfig.usingSubscribingEventProcessors() - .registerSaga(CachedSaga.class, sagaConfigurer) + procConfig -> procConfig + .usingTrackingEventProcessors() + .registerTrackingEventProcessorConfiguration( + "CachedSagaProcessor", c -> tepConfig + ) + .registerSaga(CachedSaga.class, sagaConfigurer) ) .start(); + sagaProcessor = config.eventProcessingConfiguration() + .eventProcessor("CachedSagaProcessor", StreamingEventProcessor.class) + .orElseThrow(() -> new IllegalStateException("CachedSagaProcessor is not present")); } /** @@ -101,61 +132,81 @@ void setUp() { @Test void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { + int createEvents = 1; + int deleteEvents = 1; String sagaName = SAGA_NAMES[0]; String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); // Construct the saga... - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); - // Validate initial cache - assertTrue(associationsCache.containsKey(associationCacheKey)); - Set associations = associationsCache.get(associationCacheKey); - - String sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS)); + await().pollDelay(DEFAULT_DELAY) + .atMost(ONE_SECOND) + .until(() -> handledEventsUpTo(createEvents)); + + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertTrue(cachedSaga.getState().isEmpty()); // Bulk update the saga... publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES); - // Validate cache again - assertTrue(associationsCache.containsKey(associationCacheKey)); - associations = associationsCache.get(associationCacheKey); - - sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + await().pollDelay(DEFAULT_DELAY) + .atMost(FOUR_SECONDS) + .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES)); + + // Validate caches again + optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); // Destruct the saga... publish(new CachedSaga.SagaEndsEvent(associationValue, true)); - // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); - assertFalse(sagaCache.containsKey(sagaIdentifier)); + await().pollDelay(DEFAULT_DELAY) + .atMost(ONE_SECOND) + .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES + deleteEvents)); + + // Validate caches are empty + assertTrue(associationsCacheListener.isRemoved(associationCacheKey)); + assertTrue(sagaCacheListener.isRemoved(sagaIdentifier)); } @Test void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException { + int createEvents = 1; + int deleteEvents = 1; String sagaName = SAGA_NAMES[0]; String associationValue = "some-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_CONCURRENT_PUBLISHERS); // Construct the saga... - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); - // Validate initial cache - assertTrue(associationsCache.containsKey(associationCacheKey)); - Set associations = associationsCache.get(associationCacheKey); - - String sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS)); + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertTrue(cachedSaga.getState().isEmpty()); @@ -167,46 +218,59 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); + await().pollDelay(DEFAULT_DELAY) + .atMost(SIXTEEN_SECONDS) + .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS))); - // Validate cache again - assertTrue(associationsCache.containsKey(associationCacheKey)); - associations = associationsCache.get(associationCacheKey); - - sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + // Validate caches again + optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); // Destruct the saga... publish(new CachedSaga.SagaEndsEvent(associationValue, true)); - // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); - assertFalse(sagaCache.containsKey(sagaIdentifier)); + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS) + deleteEvents + )); + + // Validate caches are empty + assertTrue(associationsCacheListener.isRemoved(associationCacheKey)); + assertTrue(sagaCacheListener.isRemoved(sagaIdentifier)); } @Test void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException { - Map> associationReferences = new HashMap<>(); + int createEvents = SAGA_NAMES.length; + int deleteEvents = SAGA_NAMES.length; ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length); // Construct the sagas... + for (String sagaName : SAGA_NAMES) { + publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS)); + } + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + + // Validate initial caches for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); - // Validate initial cache - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - CachedSaga cachedSaga = sagaEntry.saga(); + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertTrue(cachedSaga.getState().isEmpty()); } @@ -219,57 +283,74 @@ void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); + await().pollDelay(DEFAULT_DELAY) + .atMost(EIGHT_SECONDS) + .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length))); + // Validate caches again for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - // The saga cache may have been cleared already when doing bulk updates, which is a fair scenario. - // Hence, only validate the entry if it's still present in the Cache. - if (sagaEntry != null) { - CachedSaga cachedSaga = sagaEntry.saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size(), sagaName); - } + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); } // Destruct the sagas... for (String sagaName : SAGA_NAMES) { - String associationValue = sagaName + "-id"; - String associationCacheKey = sagaAssociationCacheKey(associationValue); + publish(new CachedSaga.SagaEndsEvent(sagaName + "-id", true)); + } + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length) + deleteEvents + )); - publish(new CachedSaga.SagaEndsEvent(associationValue, true)); - // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); + // Validate association cache is empty + for (String sagaName : SAGA_NAMES) { + assertTrue(associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id"))); } } @Test void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException { - Map> associationReferences = new HashMap<>(); + int createEvents = SAGA_NAMES.length; + int deleteEvents = SAGA_NAMES.length; ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS); // Construct the sagas... + for (String sagaName : SAGA_NAMES) { + publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS)); + } + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + + // Validate initial cache for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); - // Validate initial cache - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - CachedSaga cachedSaga = sagaEntry.saga(); + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertTrue(cachedSaga.getState().isEmpty()); } @@ -283,35 +364,43 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); + await().pollDelay(DEFAULT_DELAY) + .atMost(THIRTY_TWO_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * (SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS)) + )); + // Validate caches again for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - // The saga cache may have been cleared already when doing bulk updates, which is a fair scenario. - // Hence, only validate the entry if it's still present in the Cache. - if (sagaEntry != null) { - CachedSaga cachedSaga = sagaEntry.saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, - cachedSaga.getState().size(), - sagaName); - } + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); } // Destruct the sagas... for (String sagaName : SAGA_NAMES) { - String associationValue = sagaName + "-id"; - String associationCacheKey = sagaAssociationCacheKey(associationValue); + publish(new CachedSaga.SagaEndsEvent(sagaName + "-id", true)); + } + await().pollDelay(DEFAULT_DELAY) + .atMost(FOUR_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length) + deleteEvents + )); - publish(new CachedSaga.SagaEndsEvent(associationValue, true)); - // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); + // Validate association cache is empty + for (String sagaName : SAGA_NAMES) { + assertTrue(associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id"))); } } @@ -329,7 +418,16 @@ private void publish(Object... events) { for (Object event : events) { eventMessages.add(GenericEventMessage.asEventMessage(event)); } - config.eventStore().publish(eventMessages); + config.eventBus().publish(eventMessages); + } + + private Boolean handledEventsUpTo(int handledEvents) { + return sagaProcessor.processingStatus() + .values() + .stream() + .map(status -> status.getCurrentPosition().orElse(-1L) >= handledEvents - 1) + .reduce(Boolean::logicalAnd) + .orElse(false); } /** @@ -341,4 +439,73 @@ private void publish(Object... events) { private static String sagaAssociationCacheKey(String sagaId) { return CachedSaga.class.getName() + "/id=" + sagaId; } + + /** + * {@link org.axonframework.common.caching.Cache.EntryListener} implementation used for validating the correct + * workings of caching within Axon Framework. Used instead of validating the {@link Cache} directly, as it may have + * changed while the given and/or when phase of a test is processing. + * + * @param The type of value listened to through the {@link Cache}. + */ + private static class EntryListenerValidator implements Cache.EntryListener { + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + private final ListenerType type; + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private final Set created = new ConcurrentSkipListSet<>(); + private final Map updates = new ConcurrentHashMap<>(); + private final Set removed = new ConcurrentSkipListSet<>(); + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private final Set expired = new ConcurrentSkipListSet<>(); + + private EntryListenerValidator(ListenerType type) { + this.type = type; + } + + @Override + public void onEntryCreated(Object key, Object value) { + String keyAsString = key.toString(); + created.add(keyAsString); + //noinspection unchecked + updates.put(keyAsString, (V) value); + } + + @Override + public void onEntryUpdated(Object key, Object value) { + //noinspection unchecked + updates.put(key.toString(), (V) value); + } + + @Override + public void onEntryRemoved(Object key) { + removed.add(key.toString()); + } + + @Override + public void onEntryExpired(Object key) { + expired.add(key.toString()); + } + + @Override + public void onEntryRead(Object key, Object value) { + // Not used for validation, as the EhCacheAdapter does not support this. + } + + @Override + public Object clone() { + throw new UnsupportedOperationException(); + } + + public Optional get(String key) { + return Optional.ofNullable(updates.get(key)); + } + + public boolean isRemoved(String key) { + return removed.contains(key) || expired.contains(key); + } + } + + private enum ListenerType { + SAGA, ASSOCIATIONS + } } diff --git a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java index f38bb0cef3..d47c2be383 100644 --- a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java +++ b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java @@ -172,8 +172,26 @@ private void purgeItems() { @Override public void computeIfPresent(Object key, UnaryOperator update) { - //noinspection unchecked - cache.computeIfPresent(key, (k, v) -> new Entry(k, update.apply((V) v.get()))); + purgeItems(); + cache.computeIfPresent(key, (k, v) -> { + Object currentValue = v.get(); + if (currentValue == null) { + return null; + } + //noinspection unchecked + V value = update.apply((V) currentValue); + if (value != null) { + for (EntryListener adapter : adapters) { + adapter.onEntryUpdated(key, value); + } + return new Entry(k, value); + } else { + for (EntryListener adapter : adapters) { + adapter.onEntryRemoved(key); + } + return null; + } + }); } private class Entry extends WeakReference { diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index 7efd8622df..38c406eaf1 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -23,6 +23,7 @@ import org.axonframework.modelling.saga.SagaRepository; import java.io.Serializable; +import java.util.Collections; import java.util.Set; import static org.axonframework.common.BuilderUtils.assertNonNull; @@ -76,7 +77,14 @@ public static Builder builder() { @Override public Set findSagas(Class sagaType, AssociationValue associationValue) { final String key = cacheKey(associationValue, sagaType); - return associationsCache.computeIfAbsent(key, () -> delegate.findSagas(sagaType, associationValue)); + return associationsCache.computeIfAbsent( + key, + () -> { + // Wrap the original collection in a synchronized implementation, since it might be changed while + // the SagaManager is reading it using the insertSaga/deleteSaga methods. + return Collections.synchronizedSet(delegate.findSagas(sagaType, associationValue)); + } + ); } @Override diff --git a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java index 88bc159c4a..1f02ef8f5a 100644 --- a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java +++ b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java @@ -16,6 +16,7 @@ package org.axonframework.modelling.saga.repository; +import org.axonframework.common.IdentifierFactory; import org.axonframework.common.caching.Cache; import org.axonframework.modelling.saga.AssociationValue; import org.axonframework.modelling.saga.AssociationValuesImpl; @@ -25,7 +26,12 @@ import java.util.Collections; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.IntStream; import static java.util.Collections.singleton; import static org.junit.jupiter.api.Assertions.*; @@ -177,4 +183,40 @@ void sagaAndAssociationsRemovedFromCacheOnDelete() { assertFalse(sagaCache.containsKey(testSagaId)); assertFalse(associationsCache.containsKey(expectedAssociationKey)); } + + @Test + void canHandleConcurrentReadsAndWrites() { + int concurrentOperations = 64; + + AssociationValue associationValue = new AssociationValue("StubSaga-id", "value"); + Set associationValues = singleton(associationValue); + ExecutorService executor = Executors.newFixedThreadPool(16); + + try { + IntStream.range(0, concurrentOperations) + .mapToObj(i -> CompletableFuture.runAsync( + () -> { + try { + String sagaId = IdentifierFactory.getInstance().generateIdentifier(); + + testSubject.insertSaga( + StubSaga.class, sagaId, mock(StubSaga.class), associationValues + ); + testSubject.findSagas(StubSaga.class, associationValue); + testSubject.deleteSaga( + StubSaga.class, sagaId, associationValues + ); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + executor + )) + .reduce(CompletableFuture::allOf) + .orElse(CompletableFuture.completedFuture(null)) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + fail("An unexpected exception occurred during concurrent invocations on the CachingSagaStore.", e); + } + } }