Skip to content

Commit

Permalink
Allow configuration of the LockFactory
Browse files Browse the repository at this point in the history
Add a configureLockFactory() method and use the subsequently configured
LockFactory for the construction of the repository.

#1490
  • Loading branch information
smcvb committed Oct 29, 2021
1 parent 47030a2 commit 3ca8b90
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.axonframework.common.caching.Cache;
import org.axonframework.common.caching.WeakReferenceCache;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.lock.LockFactory;
import org.axonframework.common.lock.NullLockFactory;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.disruptor.commandhandling.DisruptorCommandBus;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventsourcing.AggregateFactory;
Expand Down Expand Up @@ -59,10 +62,10 @@
import static org.axonframework.common.Assert.state;

/**
* Axon Configuration API extension that allows the definition of an Aggregate. This component will automatically setup
* Axon Configuration API extension that allows the definition of an Aggregate. This component will automatically set up
* all components required for the Aggregate to operate.
*
* @param <A> The type of Aggregate configured
* @param <A> the type of Aggregate configured
* @author Allard Buijze
* @since 3.0
*/
Expand All @@ -74,6 +77,7 @@ public class AggregateConfigurer<A> implements AggregateConfiguration<A> {
private final Component<Repository<A>> repository;
private final Component<Cache> cache;
private final Component<AggregateFactory<A>> aggregateFactory;
private final Component<LockFactory> lockFactory;
private final Component<SnapshotTriggerDefinition> snapshotTriggerDefinition;
private final Component<SnapshotFilter> snapshotFilter;
private final Component<CommandTargetResolver> commandTargetResolver;
Expand Down Expand Up @@ -112,7 +116,8 @@ public static <A> AggregateConfigurer<A> defaultConfiguration(Class<A> aggregate
* @return An AggregateConfigurer instance for further configuration of the Aggregate
*/
public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggregateType) {
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType);
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType)
.configureLockFactory(config -> NullLockFactory.INSTANCE);
return configurer.configureRepository(
c -> {
EntityManagerProvider entityManagerProvider = c.getComponent(
Expand All @@ -128,6 +133,7 @@ public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggrega
});
return GenericJpaRepository.builder(aggregateType)
.aggregateModel(configurer.metaModel.get())
.lockFactory(configurer.lockFactory.get())
.entityManagerProvider(entityManagerProvider)
.eventBus(c.eventBus())
.repositoryProvider(c::repository)
Expand All @@ -147,10 +153,12 @@ public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggrega
*/
public static <A> AggregateConfigurer<A> jpaMappedConfiguration(Class<A> aggregateType,
EntityManagerProvider entityManagerProvider) {
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType);
AggregateConfigurer<A> configurer = new AggregateConfigurer<>(aggregateType)
.configureLockFactory(config -> NullLockFactory.INSTANCE);
return configurer.configureRepository(
c -> GenericJpaRepository.builder(aggregateType)
.aggregateModel(configurer.metaModel.get())
.lockFactory(configurer.lockFactory.get())
.entityManagerProvider(entityManagerProvider)
.eventBus(c.eventBus())
.repositoryProvider(c::repository)
Expand Down Expand Up @@ -180,6 +188,7 @@ protected AggregateConfigurer(Class<A> aggregate) {
() -> AnnotationCommandTargetResolver.builder().build()
)
);
lockFactory = new Component<>(() -> parent, name("lockFactory"), c -> PessimisticLockFactory.usingDefaults());
snapshotTriggerDefinition = new Component<>(() -> parent, name("snapshotTriggerDefinition"),
c -> NoSnapshotTriggerDefinition.INSTANCE);
snapshotFilter = new Component<>(() -> parent, name("snapshotFilter"), c -> {
Expand Down Expand Up @@ -227,11 +236,12 @@ protected AggregateConfigurer(Class<A> aggregate) {
EventSourcingRepository.Builder<A> builder =
EventSourcingRepository.builder(aggregate)
.aggregateModel(metaModel.get())
.aggregateFactory(aggregateFactory.get())
.lockFactory(lockFactory.get())
.eventStore(c.eventStore())
.snapshotTriggerDefinition(snapshotTriggerDefinition.get())
.cache(cache.get())
.repositoryProvider(c::repository);
.aggregateFactory(aggregateFactory.get())
.repositoryProvider(c::repository)
.cache(cache.get());
if (eventStreamFilter.get() != null) {
builder = builder.eventStreamFilter(eventStreamFilter.get());
} else if (filterEventsByType.get()) {
Expand Down Expand Up @@ -281,7 +291,7 @@ public AggregateConfigurer<A> configureRepository(Function<Configuration, Reposi
}

/**
* Defines the factory to use to to create new Aggregates instances of the type under configuration.
* Defines the factory to use to create new Aggregates instances of the type under configuration.
*
* @param aggregateFactoryBuilder The builder function for the AggregateFactory
* @return this configurer instance for chaining
Expand All @@ -292,6 +302,19 @@ public AggregateConfigurer<A> configureAggregateFactory(
return this;
}

/**
* Defines the {@link LockFactory} to use in the {@link Repository} for the aggregate under configuration. Defaults
* to the {@link PessimisticLockFactory} for the {@link EventSourcingRepository} and {@link NullLockFactory} for a
* {@link GenericJpaRepository}.
*
* @param lockFactory a {@link Function} building the {@link LockFactory} to use based on the {@link Configuration}
* @return this configurer instance for chaining
*/
public AggregateConfigurer<A> configureLockFactory(Function<Configuration, LockFactory> lockFactory) {
this.lockFactory.update(lockFactory);
return this;
}

/**
* Defines the AggregateAnnotationCommandHandler instance to use.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import org.axonframework.commandhandling.distributed.DistributedCommandBus;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.common.jpa.SimpleEntityManagerProvider;
import org.axonframework.common.lock.PessimisticLockFactory;
import org.axonframework.disruptor.commandhandling.DisruptorCommandBus;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.DomainEventMessage;
Expand All @@ -45,6 +48,7 @@
import org.junit.jupiter.api.*;

import java.util.Optional;
import javax.persistence.EntityManager;

import static org.axonframework.config.utils.TestSerializer.xStreamSerializer;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
Expand Down Expand Up @@ -76,8 +80,9 @@ public void setUp() {
testParameterResolverFactory = mock(ParameterResolverFactory.class);
when(mockConfiguration.parameterResolverFactory()).thenReturn(testParameterResolverFactory);
when(mockConfiguration.getComponent(eq(AggregateMetaModelFactory.class), any()))
.thenReturn(new AnnotatedAggregateMetaModelFactory(testParameterResolverFactory,
new AnnotatedMessageHandlingMemberDefinition()));
.thenReturn(new AnnotatedAggregateMetaModelFactory(
testParameterResolverFactory, new AnnotatedMessageHandlingMemberDefinition()
));

testSubject = new AggregateConfigurer<>(TestAggregate.class);
}
Expand Down Expand Up @@ -210,6 +215,73 @@ void testAggregateConfigurationThrowsAxonConfigExceptionWhenCreatingRevisionSnap
assertThrows(AxonConfigurationException.class, undefinedDeclaredAggregateTypeTestSubject::snapshotFilter);
}

@Test
void testConfigureLockFactoryForEventSourcedAggregate() {
PessimisticLockFactory lockFactory = spy(PessimisticLockFactory.usingDefaults());
AggregateConfigurer<A> aggregateConfigurer = AggregateConfigurer.defaultConfiguration(A.class)
.configureLockFactory(config -> lockFactory);

Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(aggregateConfigurer)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();

CommandGateway commandGateway = config.commandGateway();
String testAggregateIdentifier = "123";
commandGateway.sendAndWait(new CreateACommand(testAggregateIdentifier));
verify(lockFactory).obtainLock(testAggregateIdentifier);

config.shutdown();
}

@Test
void testConfigureLockFactoryForStateStoredAggregateWithConfiguredEntityManagerProviderComponent() {
PessimisticLockFactory lockFactory = spy(PessimisticLockFactory.usingDefaults());
AggregateConfigurer<A> aggregateConfigurer = AggregateConfigurer.jpaMappedConfiguration(A.class)
.configureLockFactory(config -> lockFactory);

Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(aggregateConfigurer)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.registerComponent(
EntityManagerProvider.class,
c -> new SimpleEntityManagerProvider(mock(EntityManager.class))
)
.buildConfiguration();
config.start();

CommandGateway commandGateway = config.commandGateway();
String testAggregateIdentifier = "123";
commandGateway.sendAndWait(new CreateACommand(testAggregateIdentifier));
verify(lockFactory).obtainLock(testAggregateIdentifier);

config.shutdown();
}

@Test
void testConfigureLockFactoryForStateStoredAggregate() {
PessimisticLockFactory lockFactory = spy(PessimisticLockFactory.usingDefaults());
AggregateConfigurer<A> aggregateConfigurer =
AggregateConfigurer.jpaMappedConfiguration(
A.class, new SimpleEntityManagerProvider(mock(EntityManager.class))
)
.configureLockFactory(config -> lockFactory);

Configuration config = DefaultConfigurer.defaultConfiguration()
.configureAggregate(aggregateConfigurer)
.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
.buildConfiguration();
config.start();

CommandGateway commandGateway = config.commandGateway();
String testAggregateIdentifier = "123";
commandGateway.sendAndWait(new CreateACommand(testAggregateIdentifier));
verify(lockFactory).obtainLock(testAggregateIdentifier);

config.shutdown();
}

private static class TestAggregate {

TestAggregate() {
Expand Down

0 comments on commit 3ca8b90

Please sign in to comment.