From 7670f3439483fac086d69b5a4abd3e881d26deaa Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 14:56:01 +0100 Subject: [PATCH 1/2] Ensure a default TEP config is used for Sagas Whenever a default TrackingEventProcessorConfiguration was configured consciously through the EventProcessingConfigurer, it was not taken into accounts for Sagas. This occurs as the EventProcessingModule does not validate whether a default TEP was configured to switch between customization and the default Saga TEP configuration. #bug/default-tep-config-sagas --- .../config/EventProcessingModule.java | 22 +++++--- .../config/EventProcessingModuleTest.java | 54 +++++++++++++------ 2 files changed, 55 insertions(+), 21 deletions(-) diff --git a/config/src/main/java/org/axonframework/config/EventProcessingModule.java b/config/src/main/java/org/axonframework/config/EventProcessingModule.java index 29165f28e6..a3fd5abe5c 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingModule.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingModule.java @@ -75,6 +75,7 @@ import static java.lang.String.format; import static java.util.Comparator.comparing; import static org.axonframework.common.BuilderUtils.assertNonNull; +import static org.axonframework.common.ObjectUtils.getOrDefault; import static org.axonframework.common.annotation.AnnotationUtils.findAnnotationAttributes; import static org.axonframework.config.EventProcessingConfigurer.PooledStreamingProcessorConfiguration.noOp; @@ -88,9 +89,11 @@ public class EventProcessingModule implements ModuleConfiguration, EventProcessingConfiguration, EventProcessingConfigurer { + private static final String CONFIGURED_DEFAULT_TEP_CONFIG = "___DEFAULT_TEP_CONFIG"; + private static final TrackingEventProcessorConfiguration DEFAULT_TEP_CONFIG = + TrackingEventProcessorConfiguration.forSingleThreadedProcessing(); private static final TrackingEventProcessorConfiguration DEFAULT_SAGA_TEP_CONFIG = - TrackingEventProcessorConfiguration.forSingleThreadedProcessing() - .andInitialTrackingToken(StreamableMessageSource::createHeadToken); + DEFAULT_TEP_CONFIG.andInitialTrackingToken(StreamableMessageSource::createHeadToken); private static final Function, String> DEFAULT_SAGA_PROCESSING_GROUP_FUNCTION = c -> c.getSimpleName() + "Processor"; @@ -182,7 +185,7 @@ public class EventProcessingModule "defaultSubscribableMessageSource", Configuration::eventBus ); - private final Component defaultTrackingEventProcessorConfiguration = + private final Component defaultTepConfig = new Component<>( () -> configuration, "trackingEventProcessorConfiguration", @@ -344,7 +347,8 @@ private boolean noSagaProcessorCustomization(Class type, String processingGro return DEFAULT_SAGA_PROCESSING_GROUP_FUNCTION.apply(type).equals(processingGroup) && processingGroup.equals(processorName) && !eventProcessorBuilders.containsKey(processorName) - && !tepConfigs.containsKey(processorName); + && !tepConfigs.containsKey(processorName) + && !tepConfigs.containsKey(CONFIGURED_DEFAULT_TEP_CONFIG); } private EventProcessor buildEventProcessor(List> builderFunctions, @@ -791,7 +795,10 @@ public EventProcessingConfigurer registerTrackingEventProcessorConfiguration( public EventProcessingConfigurer registerTrackingEventProcessorConfiguration( Function trackingEventProcessorConfigurationBuilder ) { - this.defaultTrackingEventProcessorConfiguration.update(trackingEventProcessorConfigurationBuilder); + this.tepConfigs.put(CONFIGURED_DEFAULT_TEP_CONFIG, + new Component<>(() -> configuration, + "trackingEventProcessorConfiguration", + trackingEventProcessorConfigurationBuilder)); return this; } @@ -879,7 +886,10 @@ private EventProcessor defaultEventProcessor(String name, } private TrackingEventProcessorConfiguration trackingEventProcessorConfig(String name) { - return tepConfigs.getOrDefault(name, defaultTrackingEventProcessorConfiguration).get(); + if (tepConfigs.containsKey(name)) { + return tepConfigs.get(name).get(); + } + return tepConfigs.getOrDefault(CONFIGURED_DEFAULT_TEP_CONFIG, defaultTepConfig).get(); } /** diff --git a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java index 25c0b95b8a..6e8f7fa31b 100644 --- a/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java +++ b/config/src/test/java/org/axonframework/config/EventProcessingModuleTest.java @@ -647,7 +647,7 @@ void customTrackingEventProcessingConfiguration( } @Test - void sagaTrackingProcessorsDefaultsToSagaTrackingEventProcessorConfigIfNoCustomizationIsPresent( + void sagaTrackingProcessorConstructionUsesDefaultSagaProcessorConfigIfNoCustomizationIsPresent( @Mock StreamableMessageSource> mockedSource, @Mock StreamableMessageSource> mockedSourceForVerification ) throws NoSuchFieldException { @@ -661,8 +661,7 @@ void sagaTrackingProcessorsDefaultsToSagaTrackingEventProcessorConfigIfNoCustomi config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class); assertTrue(resultTep.isPresent()); TrackingEventProcessor tep = resultTep.get(); - int tepSegmentsSize = - getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); + int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); assertEquals(1, tepSegmentsSize); Function>, TrackingToken> tepInitialTokenBuilder = @@ -674,7 +673,7 @@ void sagaTrackingProcessorsDefaultsToSagaTrackingEventProcessorConfigIfNoCustomi } @Test - void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomProcessingGroup( + void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomProcessingGroup( @Mock StreamableMessageSource> mockedSource, @Mock StreamableMessageSource> mockedSourceForVerification ) throws NoSuchFieldException { @@ -689,8 +688,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo ); assertTrue(resultTep.isPresent()); TrackingEventProcessor tep = resultTep.get(); - int tepSegmentsSize = - getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); + int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); assertEquals(1, tepSegmentsSize); Function>, TrackingToken> tepInitialTokenBuilder = @@ -702,7 +700,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo } @Test - void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomProcessor( + void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomProcessor( @Mock StreamableMessageSource> mockedSource, @Mock StreamableMessageSource> mockedSourceForVerification ) throws NoSuchFieldException { @@ -717,8 +715,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo ); assertTrue(resultTep.isPresent()); TrackingEventProcessor tep = resultTep.get(); - int tepSegmentsSize = - getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); + int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); assertEquals(1, tepSegmentsSize); Function>, TrackingToken> tepInitialTokenBuilder = @@ -730,7 +727,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo } @Test - void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomTrackingProcessorBuilder( + void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomTrackingProcessorBuilder( @Mock StreamableMessageSource> mockedSource, @Mock StreamableMessageSource> mockedSourceForVerification ) throws NoSuchFieldException { @@ -745,8 +742,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class); assertTrue(resultTep.isPresent()); TrackingEventProcessor tep = resultTep.get(); - int tepSegmentsSize = - getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); + int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); assertEquals(3, tepSegmentsSize); Function>, TrackingToken> tepInitialTokenBuilder = @@ -758,7 +754,7 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo } @Test - void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigForCustomConfigInstance( + void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomConfigInstance( @Mock StreamableMessageSource> mockedSource, @Mock StreamableMessageSource> mockedSourceForVerification ) throws NoSuchFieldException { @@ -775,8 +771,36 @@ void sagaTrackingProcessorsDoesNotPickDefaultsSagaTrackingEventProcessorConfigFo config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class); assertTrue(resultTep.isPresent()); TrackingEventProcessor tep = resultTep.get(); - int tepSegmentsSize = - getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); + int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); + assertEquals(4, tepSegmentsSize); + + Function>, TrackingToken> tepInitialTokenBuilder = + getFieldValue(TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), tep); + tepInitialTokenBuilder.apply(mockedSourceForVerification); + // In absence of the default Saga Config, the stream starts at the tail + verify(mockedSourceForVerification).createTailToken(); + verify(mockedSourceForVerification, times(0)).createHeadToken(); + } + + @Test + void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomDefaultConfig( + @Mock StreamableMessageSource> mockedSource, + @Mock StreamableMessageSource> mockedSourceForVerification + ) throws NoSuchFieldException { + TrackingEventProcessorConfiguration testTepConfig = + TrackingEventProcessorConfiguration.forParallelProcessing(4); + configurer.eventProcessing() + .usingTrackingEventProcessors() + .configureDefaultStreamableMessageSource(config -> mockedSource) + .registerSaga(Object.class) + .registerTrackingEventProcessorConfiguration(config -> testTepConfig); + Configuration config = configurer.start(); + + Optional resultTep = + config.eventProcessingConfiguration().eventProcessor("ObjectProcessor", TrackingEventProcessor.class); + assertTrue(resultTep.isPresent()); + TrackingEventProcessor tep = resultTep.get(); + int tepSegmentsSize = getFieldValue(TrackingEventProcessor.class.getDeclaredField("segmentsSize"), tep); assertEquals(4, tepSegmentsSize); Function>, TrackingToken> tepInitialTokenBuilder = From f660c8ce6e8ee6a0c9969b5909199cf8934df77e Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Wed, 28 Dec 2022 15:56:57 +0100 Subject: [PATCH 2/2] Remove redundant import Remove redundant import #2533 --- .../java/org/axonframework/config/EventProcessingModule.java | 1 - 1 file changed, 1 deletion(-) diff --git a/config/src/main/java/org/axonframework/config/EventProcessingModule.java b/config/src/main/java/org/axonframework/config/EventProcessingModule.java index a3fd5abe5c..95c17692e8 100644 --- a/config/src/main/java/org/axonframework/config/EventProcessingModule.java +++ b/config/src/main/java/org/axonframework/config/EventProcessingModule.java @@ -75,7 +75,6 @@ import static java.lang.String.format; import static java.util.Comparator.comparing; import static org.axonframework.common.BuilderUtils.assertNonNull; -import static org.axonframework.common.ObjectUtils.getOrDefault; import static org.axonframework.common.annotation.AnnotationUtils.findAnnotationAttributes; import static org.axonframework.config.EventProcessingConfigurer.PooledStreamingProcessorConfiguration.noOp;