From 4bdbb661750c7d889cb11a95979173f2a1daaf88 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 23 Dec 2022 15:31:36 +0100 Subject: [PATCH 01/18] Let the CachingIntegrationTestSuite use a TEP Instead of using the SubscribingEventProcessor, the test suite should use a StreamingEventProcessor, like the TEP. Doing so, it will more closely mirror reality for our users. #enhancement/caching-saga-it --- .../integrationtests/cache/CachedSaga.java | 9 +- .../cache/CachingIntegrationTestSuite.java | 225 +++++++++++++----- 2 files changed, 173 insertions(+), 61 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java index f20a2f3624..cc3d8e485b 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010-2018. Axon Framework + * Copyright (c) 2010-2022. Axon Framework * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,9 @@ public class CachedSaga { public void on(SagaCreatedEvent event) { this.name = event.name; this.state = new ArrayList<>(); + for (int i = 0; i < event.numberOfAssociations; i++) { + SagaLifecycle.associateWith(event.id + i, i); + } } @SagaEventHandler(associationProperty = "id") @@ -65,10 +68,12 @@ public static class SagaCreatedEvent { private final String id; private final String name; + private final int numberOfAssociations; - public SagaCreatedEvent(String id, String name) { + public SagaCreatedEvent(String id, String name, int numberOfAssociations) { this.id = id; this.name = name; + this.numberOfAssociations = numberOfAssociations; } public String getId() { diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 9928112161..884fdb9a1f 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -22,14 +22,16 @@ import org.axonframework.config.SagaConfigurer; import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.GenericEventMessage; -import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore; -import org.axonframework.eventsourcing.eventstore.EventStore; +import org.axonframework.eventhandling.StreamingEventProcessor; +import org.axonframework.eventhandling.TrackingEventProcessor; +import org.axonframework.eventhandling.TrackingEventProcessorConfiguration; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; import org.axonframework.modelling.saga.repository.CachingSagaStore; import org.axonframework.modelling.saga.repository.SagaStore; import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore; import org.junit.jupiter.api.*; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -46,7 +48,7 @@ import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.testcontainers.shaded.org.awaitility.Awaitility.await; /** * Abstract integration test suite to validate the provided {@link org.axonframework.common.caching.Cache} @@ -61,17 +63,23 @@ public abstract class CachingIntegrationTestSuite { private static final int NUMBER_OF_UPDATES = 4096; private static final int NUMBER_OF_CONCURRENT_PUBLISHERS = 8; private static final String[] SAGA_NAMES = new String[]{"foo", "bar", "baz", "and", "some", "more"}; + private static final int NUMBER_OF_ASSOCIATIONS = 42; + + private static final Duration SHORT_DELAY = Duration.ofMillis(5); + private static final Duration DEFAULT_DELAY = Duration.ofMillis(25); + private static final Duration ONE_SECOND = Duration.ofSeconds(1); + private static final Duration TWO_SECONDS = Duration.ofSeconds(2); + private static final Duration FOUR_SECONDS = Duration.ofSeconds(4); + private static final Duration EIGHT_SECONDS = Duration.ofSeconds(8); protected Configuration config; + private StreamingEventProcessor sagaProcessor; private Cache sagaCache; private Cache associationsCache; @BeforeEach void setUp() { - EventStore eventStore = spy(EmbeddedEventStore.builder() - .storageEngine(new InMemoryEventStorageEngine()) - .build()); sagaCache = buildCache("saga"); associationsCache = buildCache("associations"); @@ -82,13 +90,20 @@ void setUp() { .associationsCache(associationsCache) .build()); + TrackingEventProcessorConfiguration tepConfig = + TrackingEventProcessorConfiguration.forParallelProcessing(4) + .andEventAvailabilityTimeout(10, TimeUnit.MILLISECONDS); config = DefaultConfigurer.defaultConfiguration(DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES) - .configureEventStore(c -> eventStore) + .configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine()) .eventProcessing( - procConfig -> procConfig.usingSubscribingEventProcessors() + procConfig -> procConfig.usingTrackingEventProcessors() + .registerTrackingEventProcessorConfiguration(c -> tepConfig) .registerSaga(CachedSaga.class, sagaConfigurer) ) .start(); + sagaProcessor = config.eventProcessingConfiguration() + .eventProcessor("CachedSagaProcessor", TrackingEventProcessor.class) + .orElseThrow(() -> new IllegalStateException("CachedSagaProcessor is not present")); } /** @@ -101,63 +116,94 @@ void setUp() { @Test void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { + int createEvents = 1; + int deleteEvents = 1; String sagaName = SAGA_NAMES[0]; String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); // Construct the saga... - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS)); + await().pollDelay(SHORT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + // Validate initial cache - assertTrue(associationsCache.containsKey(associationCacheKey)); + String sagaIdentifier = null; Set associations = associationsCache.get(associationCacheKey); - - String sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertTrue(cachedSaga.getState().isEmpty()); + if (associations != null) { + // The associations cache may have been cleared, which is a fair scenario. + // Hence, only validate if we've found any associations. + sagaIdentifier = associations.iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + //noinspection unchecked + CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); + } // Bulk update the saga... publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES); + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES)); + // Validate cache again assertTrue(associationsCache.containsKey(associationCacheKey)); associations = associationsCache.get(associationCacheKey); - - sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); + if (associations.iterator().hasNext()) { + // The associations cache may have been cleared, which is a fair scenario. + // Hence, only validate if we've found any associations. + sagaIdentifier = associations.iterator().next(); + if (sagaCache.containsKey(sagaIdentifier)) { + //noinspection unchecked + CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); + } + } // Destruct the saga... publish(new CachedSaga.SagaEndsEvent(associationValue, true)); + await().pollDelay(DEFAULT_DELAY) + .atMost(ONE_SECOND) + .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES + deleteEvents)); + // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); - assertFalse(sagaCache.containsKey(sagaIdentifier)); + await().pollDelay(SHORT_DELAY) + .atMost(ONE_SECOND) + .until(() -> !associationsCache.containsKey(associationCacheKey)); } @Test void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException { + int createEvents = 1; + int deleteEvents = 1; String sagaName = SAGA_NAMES[0]; String associationValue = "some-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_CONCURRENT_PUBLISHERS); // Construct the saga... - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); + publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS)); + await().pollDelay(SHORT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + // Validate initial cache - assertTrue(associationsCache.containsKey(associationCacheKey)); + String sagaIdentifier = null; Set associations = associationsCache.get(associationCacheKey); - - String sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertTrue(cachedSaga.getState().isEmpty()); + if (associations != null) { + // The associations cache may have been cleared, which is a fair scenario. + // Hence, only validate if we've found any associations. + sagaIdentifier = associations.iterator().next(); + assertTrue(sagaCache.containsKey(sagaIdentifier)); + //noinspection unchecked + CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); + } // Concurrent bulk update the saga... IntStream.range(0, NUMBER_OF_CONCURRENT_PUBLISHERS) @@ -167,38 +213,60 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); + await().pollDelay(DEFAULT_DELAY) + .atMost(EIGHT_SECONDS) + .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS))); // Validate cache again assertTrue(associationsCache.containsKey(associationCacheKey)); associations = associationsCache.get(associationCacheKey); - - sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); + if (associations.iterator().hasNext()) { + // The associations cache may have been cleared, which is a fair scenario. + // Hence, only validate if we've found any associations. + sagaIdentifier = associations.iterator().next(); + if (sagaCache.containsKey(sagaIdentifier)) { + //noinspection unchecked + CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); + } + } // Destruct the saga... publish(new CachedSaga.SagaEndsEvent(associationValue, true)); + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS) + deleteEvents + )); + // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); - assertFalse(sagaCache.containsKey(sagaIdentifier)); + await().pollDelay(SHORT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> !associationsCache.containsKey(associationCacheKey)); } @Test void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException { + int createEvents = SAGA_NAMES.length; + int deleteEvents = SAGA_NAMES.length; Map> associationReferences = new HashMap<>(); ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length); // Construct the sagas... + for (String sagaName : SAGA_NAMES) { + publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS)); + } + await().pollDelay(SHORT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + + // Validate initial cache for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); - // Validate initial cache assertTrue(associationsCache.containsKey(associationCacheKey)); associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); @@ -219,6 +287,10 @@ void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); + await().pollDelay(DEFAULT_DELAY) + .atMost(EIGHT_SECONDS) + .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length))); + // Validate caches again for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; @@ -240,28 +312,41 @@ void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException // Destruct the sagas... for (String sagaName : SAGA_NAMES) { - String associationValue = sagaName + "-id"; - String associationCacheKey = sagaAssociationCacheKey(associationValue); + publish(new CachedSaga.SagaEndsEvent(sagaName + "-id", true)); + } + await().pollDelay(DEFAULT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length) + deleteEvents + )); - publish(new CachedSaga.SagaEndsEvent(associationValue, true)); - // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); + // Validate cache is empty + for (String sagaName : SAGA_NAMES) { + assertFalse(associationsCache.containsKey(sagaAssociationCacheKey(sagaName + "-id"))); } } @Test void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWithoutException() throws ExecutionException, InterruptedException, TimeoutException { + int createEvents = SAGA_NAMES.length; + int deleteEvents = SAGA_NAMES.length; Map> associationReferences = new HashMap<>(); ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS); // Construct the sagas... + for (String sagaName : SAGA_NAMES) { + publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS)); + } + await().pollDelay(SHORT_DELAY) + .atMost(TWO_SECONDS) + .until(() -> handledEventsUpTo(createEvents)); + + // Validate initial cache for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName)); - // Validate initial cache assertTrue(associationsCache.containsKey(associationCacheKey)); associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); @@ -283,6 +368,12 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); + await().pollDelay(DEFAULT_DELAY) + .atMost(Duration.ofSeconds(20)) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * (SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS)) + )); + // Validate caches again for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; @@ -306,12 +397,19 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit // Destruct the sagas... for (String sagaName : SAGA_NAMES) { - String associationValue = sagaName + "-id"; - String associationCacheKey = sagaAssociationCacheKey(associationValue); + publish(new CachedSaga.SagaEndsEvent(sagaName + "-id", true)); + } + await().pollDelay(DEFAULT_DELAY) + .atMost(FOUR_SECONDS) + .until(() -> handledEventsUpTo( + createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length) + deleteEvents + )); - publish(new CachedSaga.SagaEndsEvent(associationValue, true)); - // Validate cache is empty - assertFalse(associationsCache.containsKey(associationCacheKey)); + // Validate cache is empty + for (String sagaName : SAGA_NAMES) { + await().pollDelay(DEFAULT_DELAY) + .atMost(FOUR_SECONDS) + .until(() -> !associationsCache.containsKey(sagaAssociationCacheKey(sagaName + "-id"))); } } @@ -329,7 +427,16 @@ private void publish(Object... events) { for (Object event : events) { eventMessages.add(GenericEventMessage.asEventMessage(event)); } - config.eventStore().publish(eventMessages); + config.eventBus().publish(eventMessages); + } + + private Boolean handledEventsUpTo(int handledEvents) { + return sagaProcessor.processingStatus() + .values() + .stream() + .map(status -> status.getCurrentPosition().orElse(-1L) >= handledEvents - 1) + .reduce(Boolean::logicalAnd) + .orElse(false); } /** From cd14ac5e96895cc6c76cb5fcb3bff6c5bd3ba1ed Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Fri, 23 Dec 2022 15:32:51 +0100 Subject: [PATCH 02/18] Wrap result of cache retrieval in a new HashSet Wrap result of cache retrieval in a new HashSet. Otherwise, there's a window of opportunity that the cached result Set is adjusted while the SagaRepository uses the contents to add to the found sagas result #enhancement/caching-saga-it --- .../modelling/saga/repository/CachingSagaStore.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index 7efd8622df..bb32859172 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -23,6 +23,7 @@ import org.axonframework.modelling.saga.SagaRepository; import java.io.Serializable; +import java.util.HashSet; import java.util.Set; import static org.axonframework.common.BuilderUtils.assertNonNull; @@ -76,7 +77,9 @@ public static Builder builder() { @Override public Set findSagas(Class sagaType, AssociationValue associationValue) { final String key = cacheKey(associationValue, sagaType); - return associationsCache.computeIfAbsent(key, () -> delegate.findSagas(sagaType, associationValue)); + return new HashSet<>(associationsCache.computeIfAbsent( + key, () -> delegate.findSagas(sagaType, associationValue)) + ); } @Override From f67e757073ce050517c954f01419a2293fda93c0 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Tue, 27 Dec 2022 15:53:22 +0100 Subject: [PATCH 03/18] Add more association adjustments Add more association adjustments #enhancement/caching-saga-it --- .../integrationtests/cache/CachedSaga.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java index cc3d8e485b..09ac3b2245 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachedSaga.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Random; /** * Test saga used by the {@link CachingIntegrationTestSuite}. @@ -32,13 +33,19 @@ public class CachedSaga { private String name; private List state; + private int numberOfAssociations; + private Random random; + private Integer associationToRemove; @StartSaga @SagaEventHandler(associationProperty = "id") public void on(SagaCreatedEvent event) { this.name = event.name; this.state = new ArrayList<>(); - for (int i = 0; i < event.numberOfAssociations; i++) { + this.numberOfAssociations = event.numberOfAssociations; + this.random = new Random(); + + for (int i = 0; i < numberOfAssociations; i++) { SagaLifecycle.associateWith(event.id + i, i); } } @@ -46,6 +53,13 @@ public void on(SagaCreatedEvent event) { @SagaEventHandler(associationProperty = "id") public void on(VeryImportantEvent event) { state.add(event.stateEntry); + if (associationToRemove == null) { + associationToRemove = random.nextInt(numberOfAssociations); + SagaLifecycle.removeAssociationWith(event.id + associationToRemove, associationToRemove); + } else { + SagaLifecycle.associateWith(event.id + associationToRemove, associationToRemove); + associationToRemove = null; + } } @SagaEventHandler(associationProperty = "id") From 871b784b3fe4a9aeacf5adaaa3d968a38cc96d02 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Tue, 27 Dec 2022 15:54:12 +0100 Subject: [PATCH 04/18] Make WeakReferenceCache#computeIfPresent use EntryListeners Make WeakReferenceCache#computeIfPresent use EntryListeners, as that's intended for usage of the WeakReferenceCache. #enhancement/caching-saga-it --- .../common/caching/WeakReferenceCache.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java index f38bb0cef3..a1089e2364 100644 --- a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java +++ b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java @@ -172,8 +172,21 @@ private void purgeItems() { @Override public void computeIfPresent(Object key, UnaryOperator update) { - //noinspection unchecked - cache.computeIfPresent(key, (k, v) -> new Entry(k, update.apply((V) v.get()))); + purgeItems(); + cache.computeIfPresent(key, (k, v) -> { + //noinspection unchecked + V value = update.apply((V) v.get()); + if (value != null) { + for (EntryListener adapter : adapters) { + adapter.onEntryUpdated(key, value); + } + } else { + for (EntryListener adapter : adapters) { + adapter.onEntryRemoved(key); + } + } + return new Entry(k, value); + }); } private class Entry extends WeakReference { From 51d6d42f99c74d69ef74032c3af1f113b2160314 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Tue, 27 Dec 2022 16:11:48 +0100 Subject: [PATCH 05/18] Use a Cache.EntryListener to validate instead of the Cache directly Use a Cache.EntryListener to validate instead of the Cache directly, as that provides additional certainty that the Cache hasn't been adjusted during the given/when phases of the tests. Doing so, we make the test cases more resilient. #enhancement/caching-saga-it --- .../cache/CachingIntegrationTestSuite.java | 310 +++++++++++------- 1 file changed, 185 insertions(+), 125 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 884fdb9a1f..4a2f956cae 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -34,11 +34,13 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -53,6 +55,10 @@ /** * Abstract integration test suite to validate the provided {@link org.axonframework.common.caching.Cache} * implementations to work as intended under various stress scenarios. + *

+ * Uses a custom {@link org.axonframework.common.caching.Cache.EntryListener} to validate whether the {@link Cache} + * under test is hit accordingly. Doing so provides additional certainty that although the {@code Cache} may have + * adjusted under the hood, the expected invocation have been made. * * @author Steven van Beelen */ @@ -65,7 +71,6 @@ public abstract class CachingIntegrationTestSuite { private static final String[] SAGA_NAMES = new String[]{"foo", "bar", "baz", "and", "some", "more"}; private static final int NUMBER_OF_ASSOCIATIONS = 42; - private static final Duration SHORT_DELAY = Duration.ofMillis(5); private static final Duration DEFAULT_DELAY = Duration.ofMillis(25); private static final Duration ONE_SECOND = Duration.ofSeconds(1); private static final Duration TWO_SECONDS = Duration.ofSeconds(2); @@ -75,13 +80,20 @@ public abstract class CachingIntegrationTestSuite { protected Configuration config; private StreamingEventProcessor sagaProcessor; - private Cache sagaCache; - private Cache associationsCache; + private EntryListenerValidator> sagaCacheListener; + private EntryListenerValidator> associationsCacheListener; @BeforeEach void setUp() { - sagaCache = buildCache("saga"); - associationsCache = buildCache("associations"); + Cache sagaCache = buildCache("saga"); + sagaCacheListener = new EntryListenerValidator<>(ListenerType.SAGA); + //noinspection resource + sagaCache.registerCacheEntryListener(sagaCacheListener); + + Cache associationsCache = buildCache("associations"); + associationsCacheListener = new EntryListenerValidator<>(ListenerType.ASSOCIATIONS); + //noinspection resource + associationsCache.registerCacheEntryListener(associationsCacheListener); Consumer> sagaConfigurer = config -> config.configureSagaStore(c -> CachingSagaStore.builder() @@ -124,44 +136,35 @@ void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { // Construct the saga... publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS)); - await().pollDelay(SHORT_DELAY) - .atMost(TWO_SECONDS) + await().pollDelay(DEFAULT_DELAY) + .atMost(ONE_SECOND) .until(() -> handledEventsUpTo(createEvents)); - // Validate initial cache - String sagaIdentifier = null; - Set associations = associationsCache.get(associationCacheKey); - if (associations != null) { - // The associations cache may have been cleared, which is a fair scenario. - // Hence, only validate if we've found any associations. - sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertTrue(cachedSaga.getState().isEmpty()); - } + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); // Bulk update the saga... publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES); await().pollDelay(DEFAULT_DELAY) - .atMost(TWO_SECONDS) + .atMost(EIGHT_SECONDS) .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES)); - // Validate cache again - assertTrue(associationsCache.containsKey(associationCacheKey)); - associations = associationsCache.get(associationCacheKey); - if (associations.iterator().hasNext()) { - // The associations cache may have been cleared, which is a fair scenario. - // Hence, only validate if we've found any associations. - sagaIdentifier = associations.iterator().next(); - if (sagaCache.containsKey(sagaIdentifier)) { - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); - } - } + // Validate caches again + optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); // Destruct the saga... publish(new CachedSaga.SagaEndsEvent(associationValue, true)); @@ -169,10 +172,9 @@ void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { .atMost(ONE_SECOND) .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES + deleteEvents)); - // Validate cache is empty - await().pollDelay(SHORT_DELAY) - .atMost(ONE_SECOND) - .until(() -> !associationsCache.containsKey(associationCacheKey)); + // Validate caches are empty + assertTrue(associationsCacheListener.isRemoved(associationCacheKey)); + assertTrue(sagaCacheListener.isRemoved(sagaIdentifier)); } @Test @@ -187,23 +189,22 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce // Construct the saga... publish(new CachedSaga.SagaCreatedEvent(associationValue, sagaName, NUMBER_OF_ASSOCIATIONS)); - await().pollDelay(SHORT_DELAY) + await().pollDelay(DEFAULT_DELAY) .atMost(TWO_SECONDS) .until(() -> handledEventsUpTo(createEvents)); - // Validate initial cache - String sagaIdentifier = null; - Set associations = associationsCache.get(associationCacheKey); - if (associations != null) { - // The associations cache may have been cleared, which is a fair scenario. - // Hence, only validate if we've found any associations. - sagaIdentifier = associations.iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertTrue(cachedSaga.getState().isEmpty()); - } + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertTrue(cachedSaga.getState().isEmpty()); // Concurrent bulk update the saga... IntStream.range(0, NUMBER_OF_CONCURRENT_PUBLISHERS) @@ -214,23 +215,15 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); await().pollDelay(DEFAULT_DELAY) - .atMost(EIGHT_SECONDS) + .atMost(Duration.ofSeconds(20)) .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS))); - // Validate cache again - assertTrue(associationsCache.containsKey(associationCacheKey)); - associations = associationsCache.get(associationCacheKey); - if (associations.iterator().hasNext()) { - // The associations cache may have been cleared, which is a fair scenario. - // Hence, only validate if we've found any associations. - sagaIdentifier = associations.iterator().next(); - if (sagaCache.containsKey(sagaIdentifier)) { - //noinspection unchecked - CachedSaga cachedSaga = ((SagaStore.Entry) sagaCache.get(sagaIdentifier)).saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); - } - } + // Validate caches again + optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); // Destruct the saga... publish(new CachedSaga.SagaEndsEvent(associationValue, true)); @@ -240,10 +233,9 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS) + deleteEvents )); - // Validate cache is empty - await().pollDelay(SHORT_DELAY) - .atMost(TWO_SECONDS) - .until(() -> !associationsCache.containsKey(associationCacheKey)); + // Validate caches are empty + assertTrue(associationsCacheListener.isRemoved(associationCacheKey)); + assertTrue(sagaCacheListener.isRemoved(sagaIdentifier)); } @Test @@ -251,30 +243,30 @@ void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException throws ExecutionException, InterruptedException, TimeoutException { int createEvents = SAGA_NAMES.length; int deleteEvents = SAGA_NAMES.length; - Map> associationReferences = new HashMap<>(); ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length); // Construct the sagas... for (String sagaName : SAGA_NAMES) { publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS)); } - await().pollDelay(SHORT_DELAY) + await().pollDelay(DEFAULT_DELAY) .atMost(TWO_SECONDS) .until(() -> handledEventsUpTo(createEvents)); - // Validate initial cache + // Validate initial caches for (String sagaName : SAGA_NAMES) { String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - CachedSaga cachedSaga = sagaEntry.saga(); + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertTrue(cachedSaga.getState().isEmpty()); } @@ -296,18 +288,17 @@ void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - // The saga cache may have been cleared already when doing bulk updates, which is a fair scenario. - // Hence, only validate the entry if it's still present in the Cache. - if (sagaEntry != null) { - CachedSaga cachedSaga = sagaEntry.saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size(), sagaName); - } + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES, cachedSaga.getState().size()); } // Destruct the sagas... @@ -320,9 +311,9 @@ void publishingBigEventTransactionTowardsSeveralCachedSagasWorksWithoutException createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length) + deleteEvents )); - // Validate cache is empty + // Validate association cache is empty for (String sagaName : SAGA_NAMES) { - assertFalse(associationsCache.containsKey(sagaAssociationCacheKey(sagaName + "-id"))); + assertTrue(associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id"))); } } @@ -331,14 +322,13 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit throws ExecutionException, InterruptedException, TimeoutException { int createEvents = SAGA_NAMES.length; int deleteEvents = SAGA_NAMES.length; - Map> associationReferences = new HashMap<>(); ExecutorService executor = Executors.newFixedThreadPool(SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS); // Construct the sagas... for (String sagaName : SAGA_NAMES) { publish(new CachedSaga.SagaCreatedEvent(sagaName + "-id", sagaName, NUMBER_OF_ASSOCIATIONS)); } - await().pollDelay(SHORT_DELAY) + await().pollDelay(DEFAULT_DELAY) .atMost(TWO_SECONDS) .until(() -> handledEventsUpTo(createEvents)); @@ -347,14 +337,16 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - assertTrue(sagaCache.containsKey(sagaIdentifier)); - - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - CachedSaga cachedSaga = sagaEntry.saga(); + // Validate initial association cache + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + // Validate initial saga cache + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); assertEquals(sagaName, cachedSaga.getName()); assertTrue(cachedSaga.getState().isEmpty()); } @@ -379,20 +371,17 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit String associationValue = sagaName + "-id"; String associationCacheKey = sagaAssociationCacheKey(associationValue); - assertTrue(associationsCache.containsKey(associationCacheKey)); - associationReferences.put(associationCacheKey, associationsCache.get(associationCacheKey)); - - String sagaIdentifier = (associationReferences.get(associationCacheKey)).iterator().next(); - SagaStore.Entry sagaEntry = sagaCache.get(sagaIdentifier); - // The saga cache may have been cleared already when doing bulk updates, which is a fair scenario. - // Hence, only validate the entry if it's still present in the Cache. - if (sagaEntry != null) { - CachedSaga cachedSaga = sagaEntry.saga(); - assertEquals(sagaName, cachedSaga.getName()); - assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, - cachedSaga.getState().size(), - sagaName); - } + Optional> optionalAssociations = associationsCacheListener.get(associationCacheKey); + assertTrue(optionalAssociations.isPresent()); + Optional optionalAssociationValue = optionalAssociations.get().stream().findFirst(); + assertTrue(optionalAssociationValue.isPresent()); + String sagaIdentifier = optionalAssociationValue.get(); + + Optional> optionalCachedSaga = sagaCacheListener.get(sagaIdentifier); + assertTrue(optionalCachedSaga.isPresent()); + CachedSaga cachedSaga = optionalCachedSaga.get().saga(); + assertEquals(sagaName, cachedSaga.getName()); + assertEquals(NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS, cachedSaga.getState().size()); } // Destruct the sagas... @@ -405,11 +394,11 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit createEvents + (NUMBER_OF_UPDATES * SAGA_NAMES.length) + deleteEvents )); - // Validate cache is empty + // Validate association cache is empty for (String sagaName : SAGA_NAMES) { await().pollDelay(DEFAULT_DELAY) .atMost(FOUR_SECONDS) - .until(() -> !associationsCache.containsKey(sagaAssociationCacheKey(sagaName + "-id"))); + .until(() -> associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id"))); } } @@ -448,4 +437,75 @@ private Boolean handledEventsUpTo(int handledEvents) { private static String sagaAssociationCacheKey(String sagaId) { return CachedSaga.class.getName() + "/id=" + sagaId; } + + /** + * {@link org.axonframework.common.caching.Cache.EntryListener} implementation used for validating the correct + * workings of caching within Axon Framework. Used instead of validating the {@link Cache} directly, as it may have + * changed while the given and/or when phase of a test is processing. + * + * @param The type of value listened to through the {@link Cache}. + */ + private static class EntryListenerValidator implements Cache.EntryListener { + + @SuppressWarnings({"FieldCanBeLocal", "unused"}) + private final ListenerType type; + private final Set created = new ConcurrentSkipListSet<>(); + private final Set removed = new ConcurrentSkipListSet<>(); + private final Set expired = new ConcurrentSkipListSet<>(); + private final Map updates = new ConcurrentHashMap<>(); + + private EntryListenerValidator(ListenerType type) { + this.type = type; + } + + @Override + public void onEntryExpired(Object key) { + expired.add(key.toString()); + } + + @Override + public void onEntryRemoved(Object key) { + removed.add(key.toString()); + } + + @Override + public void onEntryUpdated(Object key, Object value) { + //noinspection unchecked + updates.put(key.toString(), (V) value); + } + + @Override + public void onEntryCreated(Object key, Object value) { + String keyAsString = key.toString(); + //noinspection unchecked + updates.put(keyAsString, (V) value); + boolean added = created.add(keyAsString); + if (!added) { + removed.remove(keyAsString); + expired.remove(keyAsString); + } + } + + @Override + public void onEntryRead(Object key, Object value) { + // Not used for validation, as the EhCacheAdapter does not support this. + } + + @Override + public Object clone() { + throw new UnsupportedOperationException(); + } + + public Optional get(String key) { + return Optional.ofNullable(updates.get(key)); + } + + public boolean isRemoved(String key) { + return removed.contains(key) || expired.contains(key); + } + } + + private enum ListenerType { + SAGA, ASSOCIATIONS + } } From 990f8c5848b266bae786bedf84e76dc5a03805af Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Tue, 27 Dec 2022 16:19:25 +0100 Subject: [PATCH 06/18] Fix wait times Fix wait times to be less extensive #enhancement/caching-saga-it --- .../integrationtests/cache/CachingIntegrationTestSuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 4a2f956cae..6e1f70de9d 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -156,7 +156,7 @@ void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { // Bulk update the saga... publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES); await().pollDelay(DEFAULT_DELAY) - .atMost(EIGHT_SECONDS) + .atMost(TWO_SECONDS) .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES)); // Validate caches again @@ -215,7 +215,7 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); await().pollDelay(DEFAULT_DELAY) - .atMost(Duration.ofSeconds(20)) + .atMost(EIGHT_SECONDS) .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS))); // Validate caches again From 06887c0053d69cb74588c5346f9c1b57ef2b0120 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Tue, 27 Dec 2022 17:30:57 +0100 Subject: [PATCH 07/18] Add test that triggers the ConcurrentModificationException reliably using concurrency on the CachingSagaStore --- .../cache/CachingIntegrationTestSuite.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 6e1f70de9d..12c5bf2474 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -16,6 +16,7 @@ package org.axonframework.integrationtests.cache; +import org.axonframework.common.IdentifierFactory; import org.axonframework.common.caching.Cache; import org.axonframework.config.Configuration; import org.axonframework.config.DefaultConfigurer; @@ -26,6 +27,7 @@ import org.axonframework.eventhandling.TrackingEventProcessor; import org.axonframework.eventhandling.TrackingEventProcessorConfiguration; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; +import org.axonframework.modelling.saga.AssociationValue; import org.axonframework.modelling.saga.repository.CachingSagaStore; import org.axonframework.modelling.saga.repository.SagaStore; import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore; @@ -34,6 +36,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -126,6 +129,45 @@ void setUp() { */ public abstract Cache buildCache(String name); + @Test + void canHandleConcurrentReadsAndWrites() throws ExecutionException, InterruptedException, TimeoutException { + String sagaName = SAGA_NAMES[0]; + AssociationValue associationValue = new AssociationValue(sagaName + "-id", "value"); + + ExecutorService executor = Executors.newFixedThreadPool(32); + + SagaStore store = CachingSagaStore.builder() + .delegateSagaStore(new InMemorySagaStore()) + .sagaCache(buildCache("saga2")) + .associationsCache(buildCache("associations2")) + .build(); + + IntStream.range(0, 256) + .mapToObj(i -> CompletableFuture.runAsync( + () -> { + try { + String id = IdentifierFactory.getInstance() + .generateIdentifier(); + store.insertSaga(CachedSaga.class, + id, + id, + Collections.singleton( + associationValue)); + store.findSagas(CachedSaga.class, associationValue); + store.deleteSaga(CachedSaga.class, + id, + Collections.singleton( + associationValue)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, executor + )).reduce(CompletableFuture::allOf) + .orElse(CompletableFuture.completedFuture(null)) + .get(30, TimeUnit.SECONDS); + + } + @Test void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { int createEvents = 1; From a1c7b6c69815292280f7912375dcb8d56442ba69 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Tue, 27 Dec 2022 17:31:18 +0100 Subject: [PATCH 08/18] Fix concurrency issue on the CachingSagaStore by introducing synchronization on the associations cache --- .../saga/repository/CachingSagaStore.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index bb32859172..5b4f72093c 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -76,10 +76,13 @@ public static Builder builder() { @Override public Set findSagas(Class sagaType, AssociationValue associationValue) { + final String key = cacheKey(associationValue, sagaType); - return new HashSet<>(associationsCache.computeIfAbsent( - key, () -> delegate.findSagas(sagaType, associationValue)) - ); + synchronized (associationsCache) { + return new HashSet<>(associationsCache.computeIfAbsent( + key, () -> delegate.findSagas(sagaType, associationValue)) + ); + } } @Override @@ -114,12 +117,14 @@ private void removeAssociationValueFromCache(Class sagaType, String sagaIdentifier, AssociationValue associationValue) { String key = cacheKey(associationValue, sagaType); - associationsCache.computeIfPresent(key, associations -> { - //noinspection unchecked - ((Set) associations).remove(sagaIdentifier); - //noinspection unchecked - return ((Set) associations).isEmpty() ? null : associations; - }); + synchronized (associationsCache) { + associationsCache.computeIfPresent(key, associations -> { + //noinspection unchecked + ((Set) associations).remove(sagaIdentifier); + //noinspection unchecked + return ((Set) associations).isEmpty() ? null : associations; + }); + } } /** @@ -133,13 +138,15 @@ private void removeAssociationValueFromCache(Class sagaType, protected void addCachedAssociations(Iterable associationValues, String sagaIdentifier, Class sagaType) { - for (AssociationValue associationValue : associationValues) { - String key = cacheKey(associationValue, sagaType); - associationsCache.computeIfPresent(key, identifiers -> { - //noinspection unchecked - ((Set) identifiers).add(sagaIdentifier); - return identifiers; - }); + synchronized (associationsCache) { + for (AssociationValue associationValue : associationValues) { + String key = cacheKey(associationValue, sagaType); + associationsCache.computeIfPresent(key, identifiers -> { + //noinspection unchecked + ((Set) identifiers).add(sagaIdentifier); + return identifiers; + }); + } } } @@ -148,7 +155,8 @@ public void updateSaga(Class sagaType, String sagaIdentifier, T saga, AssociationValues associationValues) { - sagaCache.put(sagaIdentifier, new CacheEntry<>(saga, associationValues.asSet())); + sagaCache.put(sagaIdentifier, new CacheEntry<>(saga, associationValues.asSet())); + delegate.updateSaga(sagaType, sagaIdentifier, saga, associationValues); associationValues.removedAssociations() .forEach(av -> removeAssociationValueFromCache(sagaType, sagaIdentifier, av)); From d1141491ffa54e86428512e8be3a6e6d253c4e10 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Wed, 28 Dec 2022 09:55:48 +0100 Subject: [PATCH 09/18] Remove synchronization in the CachingSagaStore and wrap the result of the delegate in a synchronised set to prevent concurrency issues leading to a ConcurrentModificationException in some cases. --- .../saga/repository/CachingSagaStore.java | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index 5b4f72093c..cd8543f73d 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -23,7 +23,7 @@ import org.axonframework.modelling.saga.SagaRepository; import java.io.Serializable; -import java.util.HashSet; +import java.util.Collections; import java.util.Set; import static org.axonframework.common.BuilderUtils.assertNonNull; @@ -78,11 +78,13 @@ public static Builder builder() { public Set findSagas(Class sagaType, AssociationValue associationValue) { final String key = cacheKey(associationValue, sagaType); - synchronized (associationsCache) { - return new HashSet<>(associationsCache.computeIfAbsent( - key, () -> delegate.findSagas(sagaType, associationValue)) - ); - } + return associationsCache.computeIfAbsent( + key, + () -> { + // Wrap the original collection in a synchronized implementation, since it might be changed while + // the SagaManager is reading it using the insertSaga/deleteSaga methods. + return Collections.synchronizedSet(delegate.findSagas(sagaType, associationValue)); + }); } @Override @@ -117,14 +119,12 @@ private void removeAssociationValueFromCache(Class sagaType, String sagaIdentifier, AssociationValue associationValue) { String key = cacheKey(associationValue, sagaType); - synchronized (associationsCache) { - associationsCache.computeIfPresent(key, associations -> { - //noinspection unchecked - ((Set) associations).remove(sagaIdentifier); - //noinspection unchecked - return ((Set) associations).isEmpty() ? null : associations; - }); - } + associationsCache.computeIfPresent(key, associations -> { + //noinspection unchecked + ((Set) associations).remove(sagaIdentifier); + //noinspection unchecked + return ((Set) associations).isEmpty() ? null : associations; + }); } /** @@ -138,15 +138,13 @@ private void removeAssociationValueFromCache(Class sagaType, protected void addCachedAssociations(Iterable associationValues, String sagaIdentifier, Class sagaType) { - synchronized (associationsCache) { - for (AssociationValue associationValue : associationValues) { - String key = cacheKey(associationValue, sagaType); - associationsCache.computeIfPresent(key, identifiers -> { - //noinspection unchecked - ((Set) identifiers).add(sagaIdentifier); - return identifiers; - }); - } + for (AssociationValue associationValue : associationValues) { + String key = cacheKey(associationValue, sagaType); + associationsCache.computeIfPresent(key, identifiers -> { + //noinspection unchecked + ((Set) identifiers).add(sagaIdentifier); + return identifiers; + }); } } @@ -155,7 +153,7 @@ public void updateSaga(Class sagaType, String sagaIdentifier, T saga, AssociationValues associationValues) { - sagaCache.put(sagaIdentifier, new CacheEntry<>(saga, associationValues.asSet())); + sagaCache.put(sagaIdentifier, new CacheEntry<>(saga, associationValues.asSet())); delegate.updateSaga(sagaType, sagaIdentifier, saga, associationValues); associationValues.removedAssociations() From 6d9f030817e9c4eb9b2856f67b03f5f8df48a2ea Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Wed, 28 Dec 2022 10:05:40 +0100 Subject: [PATCH 10/18] Fix NPE when using WeakReferenceCache.computeIfPresent due to missing check whether value is still not garbage collected or cleaned --- .../axonframework/common/caching/WeakReferenceCache.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java index a1089e2364..d47c2be383 100644 --- a/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java +++ b/messaging/src/main/java/org/axonframework/common/caching/WeakReferenceCache.java @@ -174,18 +174,23 @@ private void purgeItems() { public void computeIfPresent(Object key, UnaryOperator update) { purgeItems(); cache.computeIfPresent(key, (k, v) -> { + Object currentValue = v.get(); + if (currentValue == null) { + return null; + } //noinspection unchecked - V value = update.apply((V) v.get()); + V value = update.apply((V) currentValue); if (value != null) { for (EntryListener adapter : adapters) { adapter.onEntryUpdated(key, value); } + return new Entry(k, value); } else { for (EntryListener adapter : adapters) { adapter.onEntryRemoved(key); } + return null; } - return new Entry(k, value); }); } From c94e192c451942529a0900057d5fe661ac643c83 Mon Sep 17 00:00:00 2001 From: Mitchell Herrijgers Date: Wed, 28 Dec 2022 10:19:17 +0100 Subject: [PATCH 11/18] Move concurrency test to CachingSagaStoreTest --- .../cache/CachingIntegrationTestSuite.java | 42 ------------------- .../saga/repository/CachingSagaStoreTest.java | 40 ++++++++++++++++++ 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 12c5bf2474..6e1f70de9d 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -16,7 +16,6 @@ package org.axonframework.integrationtests.cache; -import org.axonframework.common.IdentifierFactory; import org.axonframework.common.caching.Cache; import org.axonframework.config.Configuration; import org.axonframework.config.DefaultConfigurer; @@ -27,7 +26,6 @@ import org.axonframework.eventhandling.TrackingEventProcessor; import org.axonframework.eventhandling.TrackingEventProcessorConfiguration; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; -import org.axonframework.modelling.saga.AssociationValue; import org.axonframework.modelling.saga.repository.CachingSagaStore; import org.axonframework.modelling.saga.repository.SagaStore; import org.axonframework.modelling.saga.repository.inmemory.InMemorySagaStore; @@ -36,7 +34,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -129,45 +126,6 @@ void setUp() { */ public abstract Cache buildCache(String name); - @Test - void canHandleConcurrentReadsAndWrites() throws ExecutionException, InterruptedException, TimeoutException { - String sagaName = SAGA_NAMES[0]; - AssociationValue associationValue = new AssociationValue(sagaName + "-id", "value"); - - ExecutorService executor = Executors.newFixedThreadPool(32); - - SagaStore store = CachingSagaStore.builder() - .delegateSagaStore(new InMemorySagaStore()) - .sagaCache(buildCache("saga2")) - .associationsCache(buildCache("associations2")) - .build(); - - IntStream.range(0, 256) - .mapToObj(i -> CompletableFuture.runAsync( - () -> { - try { - String id = IdentifierFactory.getInstance() - .generateIdentifier(); - store.insertSaga(CachedSaga.class, - id, - id, - Collections.singleton( - associationValue)); - store.findSagas(CachedSaga.class, associationValue); - store.deleteSaga(CachedSaga.class, - id, - Collections.singleton( - associationValue)); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, executor - )).reduce(CompletableFuture::allOf) - .orElse(CompletableFuture.completedFuture(null)) - .get(30, TimeUnit.SECONDS); - - } - @Test void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { int createEvents = 1; diff --git a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java index 88bc159c4a..c141ea3cb5 100644 --- a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java +++ b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java @@ -16,6 +16,7 @@ package org.axonframework.modelling.saga.repository; +import org.axonframework.common.IdentifierFactory; import org.axonframework.common.caching.Cache; import org.axonframework.modelling.saga.AssociationValue; import org.axonframework.modelling.saga.AssociationValuesImpl; @@ -25,7 +26,12 @@ import java.util.Collections; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import java.util.stream.IntStream; import static java.util.Collections.singleton; import static org.junit.jupiter.api.Assertions.*; @@ -177,4 +183,38 @@ void sagaAndAssociationsRemovedFromCacheOnDelete() { assertFalse(sagaCache.containsKey(testSagaId)); assertFalse(associationsCache.containsKey(expectedAssociationKey)); } + + @Test + void canHandleConcurrentReadsAndWrites() { + AssociationValue associationValue = new AssociationValue("StubSaga-id", "value"); + + try { + ExecutorService executor = Executors.newFixedThreadPool(16); + IntStream.range(0, 64) + .mapToObj(i -> CompletableFuture.runAsync( + () -> { + try { + String id = IdentifierFactory.getInstance() + .generateIdentifier(); + testSubject.insertSaga(StubSaga.class, + id, + mock(StubSaga.class), + Collections.singleton( + associationValue)); + testSubject.findSagas(StubSaga.class, associationValue); + testSubject.deleteSaga(StubSaga.class, + id, + Collections.singleton( + associationValue)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, executor + )).reduce(CompletableFuture::allOf) + .orElse(CompletableFuture.completedFuture(null)) + .get(30, TimeUnit.SECONDS); + } catch (Exception e) { + fail(e); + } + } } From ae9dc3203b4d5cfd8da9dc40b75ef6c411c5688b Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 11:01:49 +0100 Subject: [PATCH 12/18] Remove redundant empty lines Remove redundant empty lines #2531 --- .../modelling/saga/repository/CachingSagaStore.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index cd8543f73d..4add91f6d6 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -76,7 +76,6 @@ public static Builder builder() { @Override public Set findSagas(Class sagaType, AssociationValue associationValue) { - final String key = cacheKey(associationValue, sagaType); return associationsCache.computeIfAbsent( key, @@ -154,7 +153,6 @@ public void updateSaga(Class sagaType, T saga, AssociationValues associationValues) { sagaCache.put(sagaIdentifier, new CacheEntry<>(saga, associationValues.asSet())); - delegate.updateSaga(sagaType, sagaIdentifier, saga, associationValues); associationValues.removedAssociations() .forEach(av -> removeAssociationValueFromCache(sagaType, sagaIdentifier, av)); From 93c5c8129388f3fc7c8ceedd383135842eb0e758 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 13:43:33 +0100 Subject: [PATCH 13/18] Adjust indentation and naming of test case for clarity Adjust indentation and naming of test case for clarity #2531 --- .../saga/repository/CachingSagaStoreTest.java | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java index c141ea3cb5..1f02ef8f5a 100644 --- a/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java +++ b/modelling/src/test/java/org/axonframework/modelling/saga/repository/CachingSagaStoreTest.java @@ -186,35 +186,37 @@ void sagaAndAssociationsRemovedFromCacheOnDelete() { @Test void canHandleConcurrentReadsAndWrites() { + int concurrentOperations = 64; + AssociationValue associationValue = new AssociationValue("StubSaga-id", "value"); + Set associationValues = singleton(associationValue); + ExecutorService executor = Executors.newFixedThreadPool(16); try { - ExecutorService executor = Executors.newFixedThreadPool(16); - IntStream.range(0, 64) + IntStream.range(0, concurrentOperations) .mapToObj(i -> CompletableFuture.runAsync( () -> { try { - String id = IdentifierFactory.getInstance() - .generateIdentifier(); - testSubject.insertSaga(StubSaga.class, - id, - mock(StubSaga.class), - Collections.singleton( - associationValue)); + String sagaId = IdentifierFactory.getInstance().generateIdentifier(); + + testSubject.insertSaga( + StubSaga.class, sagaId, mock(StubSaga.class), associationValues + ); testSubject.findSagas(StubSaga.class, associationValue); - testSubject.deleteSaga(StubSaga.class, - id, - Collections.singleton( - associationValue)); + testSubject.deleteSaga( + StubSaga.class, sagaId, associationValues + ); } catch (Exception e) { throw new RuntimeException(e); } - }, executor - )).reduce(CompletableFuture::allOf) + }, + executor + )) + .reduce(CompletableFuture::allOf) .orElse(CompletableFuture.completedFuture(null)) .get(30, TimeUnit.SECONDS); } catch (Exception e) { - fail(e); + fail("An unexpected exception occurred during concurrent invocations on the CachingSagaStore.", e); } } } From 6ee2bf15a7dd547bdf5dd08ae5d218a2298f0701 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 13:44:33 +0100 Subject: [PATCH 14/18] Ensure the TEP configuration is set accordingly Due to the Saga default TEP configuration that isn't impacted by configuring a default TEP config for al TEP's, the provided configuration did not have any impact at all. #2531 --- .../cache/CachingIntegrationTestSuite.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 6e1f70de9d..1755c4f229 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -23,7 +23,6 @@ import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.eventhandling.StreamingEventProcessor; -import org.axonframework.eventhandling.TrackingEventProcessor; import org.axonframework.eventhandling.TrackingEventProcessorConfiguration; import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine; import org.axonframework.modelling.saga.repository.CachingSagaStore; @@ -108,13 +107,16 @@ void setUp() { config = DefaultConfigurer.defaultConfiguration(DO_NOT_AUTO_LOCATE_CONFIGURER_MODULES) .configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine()) .eventProcessing( - procConfig -> procConfig.usingTrackingEventProcessors() - .registerTrackingEventProcessorConfiguration(c -> tepConfig) - .registerSaga(CachedSaga.class, sagaConfigurer) + procConfig -> procConfig + .usingTrackingEventProcessors() + .registerTrackingEventProcessorConfiguration( + "CachedSagaProcessor", c -> tepConfig + ) + .registerSaga(CachedSaga.class, sagaConfigurer) ) .start(); sagaProcessor = config.eventProcessingConfiguration() - .eventProcessor("CachedSagaProcessor", TrackingEventProcessor.class) + .eventProcessor("CachedSagaProcessor", StreamingEventProcessor.class) .orElseThrow(() -> new IllegalStateException("CachedSagaProcessor is not present")); } From 3aceaf43da21e844627bb513bb86279443fc3370 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 13:49:02 +0100 Subject: [PATCH 15/18] Adjust solution in EntryListenerValidator As there's a window of opportunity that a Saga is searched for right after it's removed, the removed check may currently fail from time to time. As this deviation isn't important for the test cases, we can get rid of clearing out the removed set when an entry is added. Added, clean-up the validator a bit further for clarity. #2531 --- .../cache/CachingIntegrationTestSuite.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 1755c4f229..1ba463119a 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -451,23 +451,23 @@ private static class EntryListenerValidator implements Cache.EntryListener { @SuppressWarnings({"FieldCanBeLocal", "unused"}) private final ListenerType type; + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") private final Set created = new ConcurrentSkipListSet<>(); + private final Map updates = new ConcurrentHashMap<>(); private final Set removed = new ConcurrentSkipListSet<>(); + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") private final Set expired = new ConcurrentSkipListSet<>(); - private final Map updates = new ConcurrentHashMap<>(); private EntryListenerValidator(ListenerType type) { this.type = type; } @Override - public void onEntryExpired(Object key) { - expired.add(key.toString()); - } - - @Override - public void onEntryRemoved(Object key) { - removed.add(key.toString()); + public void onEntryCreated(Object key, Object value) { + String keyAsString = key.toString(); + created.add(keyAsString); + //noinspection unchecked + updates.put(keyAsString, (V) value); } @Override @@ -477,15 +477,13 @@ public void onEntryUpdated(Object key, Object value) { } @Override - public void onEntryCreated(Object key, Object value) { - String keyAsString = key.toString(); - //noinspection unchecked - updates.put(keyAsString, (V) value); - boolean added = created.add(keyAsString); - if (!added) { - removed.remove(keyAsString); - expired.remove(keyAsString); - } + public void onEntryRemoved(Object key) { + removed.add(key.toString()); + } + + @Override + public void onEntryExpired(Object key) { + expired.add(key.toString()); } @Override From 8d272a72ca744282b310071e571d9d32f623303c Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 13:49:53 +0100 Subject: [PATCH 16/18] Increase timeouts for resiliency Increase timeouts for resiliency, since busier systems may fail due to the current timeouts. #2531 --- .../cache/CachingIntegrationTestSuite.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index 1ba463119a..c6f97e807d 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -75,6 +75,8 @@ public abstract class CachingIntegrationTestSuite { private static final Duration TWO_SECONDS = Duration.ofSeconds(2); private static final Duration FOUR_SECONDS = Duration.ofSeconds(4); private static final Duration EIGHT_SECONDS = Duration.ofSeconds(8); + private static final Duration SIXTEEN_SECONDS = Duration.ofSeconds(16); + private static final Duration THIRTY_TWO_SECONDS = Duration.ofSeconds(32); protected Configuration config; private StreamingEventProcessor sagaProcessor; @@ -158,7 +160,7 @@ void publishingBigEventTransactionTowardsCachedSagaWorksWithoutException() { // Bulk update the saga... publishBulkUpdatesTo(associationValue, NUMBER_OF_UPDATES); await().pollDelay(DEFAULT_DELAY) - .atMost(TWO_SECONDS) + .atMost(FOUR_SECONDS) .until(() -> handledEventsUpTo(createEvents + NUMBER_OF_UPDATES)); // Validate caches again @@ -217,7 +219,7 @@ void publishingBigEventTransactionsConcurrentlyTowardsCachedSagaWorksWithoutExce .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); await().pollDelay(DEFAULT_DELAY) - .atMost(EIGHT_SECONDS) + .atMost(SIXTEEN_SECONDS) .until(() -> handledEventsUpTo(createEvents + (NUMBER_OF_UPDATES * NUMBER_OF_CONCURRENT_PUBLISHERS))); // Validate caches again @@ -363,7 +365,7 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit .orElse(CompletableFuture.completedFuture(null)) .get(15, TimeUnit.SECONDS); await().pollDelay(DEFAULT_DELAY) - .atMost(Duration.ofSeconds(20)) + .atMost(THIRTY_TWO_SECONDS) .until(() -> handledEventsUpTo( createEvents + (NUMBER_OF_UPDATES * (SAGA_NAMES.length * NUMBER_OF_CONCURRENT_PUBLISHERS)) )); From 702eb1aaa65b2a9f885fde982c69ab5cb2b67fd1 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 13:51:45 +0100 Subject: [PATCH 17/18] Replace await-call for assertTrue Replace await-call for assertTrue. The await-call was included to incorrectly resolve the occurrence where a saga was found right after deletion, causing it to turn back up in the created set, thus clearing out the removed set. With the adjustments in the EntryListenerValidator, this predicament no longer occurs though. #2531 --- .../integrationtests/cache/CachingIntegrationTestSuite.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java index c6f97e807d..5b45af245f 100644 --- a/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java +++ b/integrationtests/src/test/java/org/axonframework/integrationtests/cache/CachingIntegrationTestSuite.java @@ -400,9 +400,7 @@ void publishingBigEventTransactionsConcurrentlyTowardsSeveralCachedSagasWorksWit // Validate association cache is empty for (String sagaName : SAGA_NAMES) { - await().pollDelay(DEFAULT_DELAY) - .atMost(FOUR_SECONDS) - .until(() -> associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id"))); + assertTrue(associationsCacheListener.isRemoved(sagaAssociationCacheKey(sagaName + "-id"))); } } From 9cf9d45f8821efb48647ad1934fc9112d461129f Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 13:53:53 +0100 Subject: [PATCH 18/18] Adjust indentation Adjust indentation #2531 --- .../modelling/saga/repository/CachingSagaStore.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java index 4add91f6d6..38c406eaf1 100644 --- a/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java +++ b/modelling/src/main/java/org/axonframework/modelling/saga/repository/CachingSagaStore.java @@ -83,7 +83,8 @@ public Set findSagas(Class sagaType, AssociationValue assoc // Wrap the original collection in a synchronized implementation, since it might be changed while // the SagaManager is reading it using the insertSaga/deleteSaga methods. return Collections.synchronizedSet(delegate.findSagas(sagaType, associationValue)); - }); + } + ); } @Override