Skip to content

Commit

Permalink
Merge pull request #1992 from AxonFramework/feature/1490
Browse files Browse the repository at this point in the history
[#1490] Simplify LockFactory configuration for Aggregates
  • Loading branch information
smcvb committed Nov 1, 2021
2 parents 88f007c + 99e77e9 commit 8f08efe
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.axonframework.common.caching.Cache;
import org.axonframework.common.caching.WeakReferenceCache;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.lock.LockFactory;
import org.axonframework.common.lock.NullLockFactory;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.disruptor.commandhandling.DisruptorCommandBus;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventsourcing.AggregateFactory;
Expand Down Expand Up @@ -59,10 +62,10 @@
import static org.axonframework.common.Assert.state;

/**
* Axon Configuration API extension that allows the definition of an Aggregate. This component will automatically setup
* Axon Configuration API extension that allows the definition of an Aggregate. This component will automatically set up
* all components required for the Aggregate to operate.
*
* @param <A> The type of Aggregate configured
* @param <A> the type of Aggregate configured
* @author Allard Buijze
* @since 3.0
*/
Expand All @@ -74,6 +77,7 @@ public class AggregateConfigurer<A> implements AggregateConfiguration<A> {
private final Component<Repository<A>> repository;
private final Component<Cache> cache;
private final Component<AggregateFactory<A>> aggregateFactory;
private final Component<LockFactory> lockFactory;
private final Component<SnapshotTriggerDefinition> snapshotTriggerDefinition;
private final Component<SnapshotFilter> snapshotFilter;
private final Component<CommandTargetResolver> commandTargetResolver;
Expand Down Expand Up @@ -112,7 +116,8 @@ public static <A> AggregateConfigurer<A> defaultConfiguration(Class<A> aggregate
* @return An AggregateConfigurer instance for further configuration of the Aggregate
*/
public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggregateType) {
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType);
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType)
.configureLockFactory(config -> NullLockFactory.INSTANCE);
return configurer.configureRepository(
c -> {
EntityManagerProvider entityManagerProvider = c.getComponent(
Expand All @@ -128,6 +133,7 @@ public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggrega
});
return GenericJpaRepository.builder(aggregateType)
.aggregateModel(configurer.metaModel.get())
.lockFactory(configurer.lockFactory.get())
.entityManagerProvider(entityManagerProvider)
.eventBus(c.eventBus())
.repositoryProvider(c::repository)
Expand All @@ -147,10 +153,12 @@ public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggrega
*/
public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggregateType,
EntityManagerProvider entityManagerProvider) {
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType);
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType)
.configureLockFactory(config -> NullLockFactory.INSTANCE);
return configurer.configureRepository(
c -> GenericJpaRepository.builder(aggregateType)
.aggregateModel(configurer.metaModel.get())
.lockFactory(configurer.lockFactory.get())
.entityManagerProvider(entityManagerProvider)
.eventBus(c.eventBus())
.repositoryProvider(c::repository)
Expand Down Expand Up @@ -180,6 +188,7 @@ protected AggregateConfigurer(Class<A> aggregate) {
() -> AnnotationCommandTargetResolver.builder().build()
)
);
lockFactory = new Component<>(() -> parent, name("lockFactory"), c -> PessimisticLockFactory.usingDefaults());
snapshotTriggerDefinition = new Component<>(() -> parent, name("snapshotTriggerDefinition"),
c -> NoSnapshotTriggerDefinition.INSTANCE);
snapshotFilter = new Component<>(() -> parent, name("snapshotFilter"), c -> {
Expand Down Expand Up @@ -227,11 +236,12 @@ protected AggregateConfigurer(Class<A> aggregate) {
EventSourcingRepository.Builder<A> builder =
EventSourcingRepository.builder(aggregate)
.aggregateModel(metaModel.get())
.aggregateFactory(aggregateFactory.get())
.lockFactory(lockFactory.get())
.eventStore(c.eventStore())
.snapshotTriggerDefinition(snapshotTriggerDefinition.get())
.cache(cache.get())
.repositoryProvider(c::repository);
.aggregateFactory(aggregateFactory.get())
.repositoryProvider(c::repository)
.cache(cache.get());
if (eventStreamFilter.get() != null) {
builder = builder.eventStreamFilter(eventStreamFilter.get());
} else if (filterEventsByType.get()) {
Expand Down Expand Up @@ -281,7 +291,7 @@ public AggregateConfigurer<A> configureRepository(Function<Configuration, Reposi
}

/**
* Defines the factory to use to to create new Aggregates instances of the type under configuration.
* Defines the factory to use to create new Aggregates instances of the type under configuration.
*
* @param aggregateFactoryBuilder The builder function for the AggregateFactory
* @return this configurer instance for chaining
Expand All @@ -292,6 +302,19 @@ public AggregateConfigurer<A> configureAggregateFactory(
return this;
}

/**
* Defines the {@link LockFactory} to use in the {@link Repository} for the aggregate under configuration. Defaults
* to the {@link PessimisticLockFactory} for the {@link EventSourcingRepository} and {@link NullLockFactory} for a
* {@link GenericJpaRepository}.
*
* @param lockFactory a {@link Function} building the {@link LockFactory} to use based on the {@link Configuration}
* @return this configurer instance for chaining
*/
public AggregateConfigurer<A> configureLockFactory(Function<Configuration, LockFactory> lockFactory) {
this.lockFactory.update(lockFactory);
return this;
}

/**
* Defines the AggregateAnnotationCommandHandler instance to use.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.jpa.SimpleEntityManagerProvider;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.disruptor.commandhandling.DisruptorCommandBus;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.DomainEventMessage;
Expand All @@ -45,6 +48,7 @@
import org.junit.jupiter.api.*;

import java.util.Optional;
import javax.persistence.EntityManager;

import static org.axonframework.config.utils.TestSerializer.xStreamSerializer;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
Expand Down Expand Up @@ -76,8 +80,9 @@ public void setUp() {
testParameterResolverFactory = mock(ParameterResolverFactory.class);
when(mockConfiguration.parameterResolverFactory()).thenReturn(testParameterResolverFactory);
when(mockConfiguration.getComponent(eq(AggregateMetaModelFactory.class), any()))
.thenReturn(new AnnotatedAggregateMetaModelFactory(testParameterResolverFactory,
new AnnotatedMessageHandlingMemberDefinition()));
.thenReturn(new AnnotatedAggregateMetaModelFactory(
testParameterResolverFactory, new AnnotatedMessageHandlingMemberDefinition()
));

testSubject = new AggregateConfigurer<>(TestAggregate.class);
}
Expand Down Expand Up @@ -210,6 +215,73 @@ void testAggregateConfigurationThrowsAxonConfigExceptionWhenCreatingRevisionSnap
assertThrows(AxonConfigurationException.class, undefinedDeclaredAggregateTypeTestSubject::snapshotFilter);
}

@Test
void testConfigureLockFactoryForEventSourcedAggregate() {
PessimisticLockFactory lockFactory = spy(PessimisticLockFactory.usingDefaults());
AggregateConfigurer<A> aggregateConfigurer = AggregateConfigurer.defaultConfiguration(A.class)
.configureLockFactory(config -> lockFactory);

Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(aggregateConfigurer)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();

CommandGateway commandGateway = config.commandGateway();
String testAggregateIdentifier = "123";
commandGateway.sendAndWait(new CreateACommand(testAggregateIdentifier));
verify(lockFactory).obtainLock(testAggregateIdentifier);

config.shutdown();
}

@Test
void testConfigureLockFactoryForStateStoredAggregateWithConfiguredEntityManagerProviderComponent() {
PessimisticLockFactory lockFactory = spy(PessimisticLockFactory.usingDefaults());
AggregateConfigurer<A> aggregateConfigurer = AggregateConfigurer.jpaMappedConfiguration(A.class)
.configureLockFactory(config -> lockFactory);

Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(aggregateConfigurer)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.registerComponent(
EntityManagerProvider.class,
c -> new SimpleEntityManagerProvider(mock(EntityManager.class))
)
.buildConfiguration();
config.start();

CommandGateway commandGateway = config.commandGateway();
String testAggregateIdentifier = "123";
commandGateway.sendAndWait(new CreateACommand(testAggregateIdentifier));
verify(lockFactory).obtainLock(testAggregateIdentifier);

config.shutdown();
}

@Test
void testConfigureLockFactoryForStateStoredAggregate() {
PessimisticLockFactory lockFactory = spy(PessimisticLockFactory.usingDefaults());
AggregateConfigurer<A> aggregateConfigurer =
AggregateConfigurer.jpaMappedConfiguration(
A.class, new SimpleEntityManagerProvider(mock(EntityManager.class))
)
.configureLockFactory(config -> lockFactory);

Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(aggregateConfigurer)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();

CommandGateway commandGateway = config.commandGateway();
String testAggregateIdentifier = "123";
commandGateway.sendAndWait(new CreateACommand(testAggregateIdentifier));
verify(lockFactory).obtainLock(testAggregateIdentifier);

config.shutdown();
}

private static class TestAggregate {

TestAggregate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@ private <A> void registerAggregateBeanDefinitions(Configurer configurer, BeanDef
aggregateConfigurer.configureCache(c -> beanFactory.getBean(cacheBeanName, Cache.class));
}

String lockFactoryBeanName = aggregateAnnotation.lockFactory();
if (nonEmptyBeanName(lockFactoryBeanName)) {
aggregateConfigurer.configureLockFactory(
c -> beanFactory.getBean(lockFactoryBeanName, LockFactory.class)
);
}

if (AnnotationUtils.isAnnotationPresent(aggregateType, "javax.persistence.Entity")) {
aggregateConfigurer.configureRepository(
c -> GenericJpaRepository.builder(aggregateType)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,4 +90,16 @@
* created, unless explicitly configured on the referenced repository.
*/
String cache() default "";

/**
* Sets the name of the bean providing the {@link org.axonframework.common.lock.LockFactory}. If none is provided,
* the {@link org.axonframework.modelling.command.Repository} implementation's default is used, unless explicitly
* configured on the referenced repository.
* <p>
* Note that the use of {@link #repository()}, or adding a {@link org.axonframework.modelling.command.Repository}
* bean to the Spring context with the default naming scheme overrides this setting, as a Repository explicitly defines
* the snapshot trigger definition. The default name corresponds to {@code "[aggregate-name]Repository"}, thus a
* {@code Trade} Aggregate would by default create/look for a bean named {@code "tradeRepository"}.
*/
String lockFactory() default "";
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,8 @@
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.callbacks.FutureCallback;
import org.axonframework.common.caching.Cache;
import org.axonframework.common.lock.LockFactory;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.config.EventProcessingConfiguration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.EventProcessingModule;
Expand Down Expand Up @@ -180,6 +182,10 @@ public class SpringAxonAutoConfigurerTest {
@Autowired
private TagsConfiguration tagsConfiguration;

@Autowired
@Qualifier("myLockFactory")
private LockFactory myLockFactory;

@Test
void contextWiresMainComponents() {
assertNotNull(axonConfig);
Expand Down Expand Up @@ -348,6 +354,19 @@ public void testAggregateCaching() {
);
}

@Test
void testAggregateLockFactory() {
String expectedAggregateId = "someIdentifier";

FutureCallback<Object, Object> commandCallback = new FutureCallback<>();
commandBus.dispatch(asCommandMessage(
new Context.CreateMyCachedAggregateCommand(expectedAggregateId)), commandCallback
);
commandCallback.getResult();

verify(myLockFactory).obtainLock(expectedAggregateId);
}

@AnnotationDriven
@Import({SpringAxonAutoConfigurer.ImportSelector.class, AnnotationDrivenRegistrar.class})
@Scope
Expand Down Expand Up @@ -423,6 +442,12 @@ public Cache myCache() {
return mock(Cache.class);
}

@Bean
@Qualifier("myLockFactory")
public LockFactory myLockFactory() {
return spy(PessimisticLockFactory.usingDefaults());
}

@Aggregate(type = "MyCustomAggregateType", filterEventsByType = true)
public static class MyAggregate {

Expand Down Expand Up @@ -521,7 +546,7 @@ public MyCachedAggregateCreatedEvent(String id) {
}
}

@Aggregate(cache = "myCache")
@Aggregate(cache = "myCache", lockFactory = "myLockFactory")
public static class MyCachedAggregate {

@AggregateIdentifier
Expand Down

0 comments on commit 8f08efe

Please sign in to comment.