Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate command handler resolver not being used for annotated aggregates #2207

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.axonframework.messaging.Message;

import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -213,7 +215,12 @@ private void initializeMessageHandlers(ParameterResolverFactory parameterResolve
.forEach((key, value) -> value.forEach(h -> registerHandler(key, (MessageHandlingMember<T>) h))));
superClassInspectors.forEach(sci -> sci.getAllHandlers()
.forEach((key, value) -> value.forEach(h -> {
registerHandler(key, h);
boolean isAbstract = h.unwrap(Executable.class)
.map(e -> Modifier.isAbstract(e.getModifiers()))
.orElse(false);
if (!isAbstract) {
registerHandler(key, h);
}
registerHandler(inspectedType, h);
})));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import org.junit.jupiter.api.*;
import org.mockito.internal.util.collections.*;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,6 +119,15 @@ void testComplexHandlerHierarchy() throws NoSuchMethodException {
);
}

@Test
void testDoesNotRegisterAbstractHandlersTwice() {
AnnotatedHandlerInspector<AB> aaInspector = AnnotatedHandlerInspector.inspectType(AB.class,
parameterResolverFactory);

assertEquals(1, aaInspector.getAllHandlers().size());
assertEquals(1, (int) aaInspector.getAllHandlers().values().stream().flatMap(Collection::stream).count());
}

private <T extends MessageHandlingMember<?>> List<AnnotatedMessageHandlingMember<?>> unwrapToList(
Stream<T> stream
) {
Expand Down Expand Up @@ -221,4 +231,18 @@ private static class D extends B {
public void dHandle(String d) {
}
}
}

public static abstract class AA {

@CommandHandler
public abstract String handleAbstractly(String command);
}

public static class AB extends AA {

@Override
public String handleAbstractly(String command) {
return "Some result";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand All @@ -50,10 +52,14 @@
import static org.axonframework.modelling.command.AggregateCreationPolicy.NEVER;

/**
* Command handler that handles commands based on {@link CommandHandler} annotations on an aggregate. Those annotations
* Command handler that registers a set of {@link CommandHandler} based on annotations of an aggregate. Those annotations
* may appear on methods, in which case a specific aggregate instance needs to be targeted by the command, or on the
* constructor. The latter will create a new Aggregate instance, which is then stored in the repository.
*
* Despite being an {@link CommandMessageHandler} it does not actually handle the commands. During registration to the
* {@link CommandBus} it registers the {@link CommandMessageHandler}s directly instead of itself so duplicate command
* handlers can be detected and handled correctly.
*
* @param <T> the type of aggregate this handler handles commands for
* @author Allard Buijze
* @since 1.2
Expand All @@ -64,6 +70,7 @@ public class AggregateAnnotationCommandHandler<T> implements CommandMessageHandl
private final CommandTargetResolver commandTargetResolver;
private final List<MessageHandler<CommandMessage<?>>> handlers;
private final Set<String> supportedCommandNames;
private final Map<String, Set<MessageHandler<CommandMessage<?>>>> supportedCommandsByName;

/**
* Instantiate a Builder to be able to create a {@link AggregateAnnotationCommandHandler}.
Expand Down Expand Up @@ -100,6 +107,7 @@ protected AggregateAnnotationCommandHandler(Builder<T> builder) {
this.repository = builder.repository;
this.commandTargetResolver = builder.commandTargetResolver;
this.supportedCommandNames = new HashSet<>();
this.supportedCommandsByName = new HashMap<>();
this.handlers = initializeHandlers(builder.buildAggregateModel());
}

Expand All @@ -111,10 +119,16 @@ protected AggregateAnnotationCommandHandler(Builder<T> builder) {
* @return A handle that can be used to unsubscribe
*/
public Registration subscribe(CommandBus commandBus) {
List<Registration> subscriptions = supportedCommandNames()
List<Registration> subscriptions = supportedCommandsByName
.entrySet()
.stream()
.map(supportedCommand -> commandBus.subscribe(supportedCommand, this))
.filter(Objects::nonNull).collect(Collectors.toList());
.flatMap(entry ->
entry.getValue().stream().map(messageHandler ->
commandBus.subscribe(entry.getKey(), messageHandler)
)
)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return () -> subscriptions.stream().map(Registration::cancel).reduce(Boolean::logicalOr).orElse(false);
}

Expand All @@ -137,30 +151,33 @@ private void initializeHandler(AggregateModel<T> aggregateModel,
Optional<AggregateCreationPolicy> policy = handler.unwrap(CreationPolicyMember.class)
.map(CreationPolicyMember::creationPolicy);

MessageHandler<CommandMessage<?>> messageHandler = null;
if (cmh.isFactoryHandler()) {
assertThat(
policy,
p -> p.map(AggregateCreationPolicy.ALWAYS::equals).orElse(true),
aggregateModel.type() + ": Static methods/constructors can only use creationPolicy ALWAYS"
);
handlersFound.add(new AggregateConstructorCommandHandler(handler));
messageHandler = new AggregateConstructorCommandHandler(handler);
} else {
switch (policy.orElse(NEVER)) {
case ALWAYS:
handlersFound.add(new AlwaysCreateAggregateCommandHandler(
messageHandler = new AlwaysCreateAggregateCommandHandler(
handler, aggregateModel.entityClass()::newInstance
));
);
break;
case CREATE_IF_MISSING:
handlersFound.add(new AggregateCreateOrUpdateCommandHandler(
messageHandler = new AggregateCreateOrUpdateCommandHandler(
handler, aggregateModel.entityClass()::newInstance
));
);
break;
case NEVER:
handlersFound.add(new AggregateCommandHandler(handler));
messageHandler = new AggregateCommandHandler(handler);
break;
}
}
handlersFound.add(messageHandler);
supportedCommandsByName.computeIfAbsent(cmh.commandName(), key -> new HashSet<>()).add(messageHandler);
supportedCommandNames.add(cmh.commandName());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.DuplicateCommandHandlerResolution;
import org.axonframework.commandhandling.DuplicateCommandHandlerSubscriptionException;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.callbacks.LoggingCallback;
import org.axonframework.common.AxonConfigurationException;
Expand Down Expand Up @@ -170,7 +172,9 @@ void testSupportedCommands() {
void testCommandHandlerSubscribesToCommands() {
verify(commandBus).subscribe(eq(CreateCommand.class.getName()),
any(MessageHandler.class));
verify(commandBus).subscribe(eq(UpdateCommandWithAnnotatedMethod.class.getName()),
// Is subscribed two times because of the duplicate handler. This is good and indicates usage of the
// DuplicateCommandHandlerResolver
verify(commandBus, times(2)).subscribe(eq(UpdateCommandWithAnnotatedMethod.class.getName()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be obsolete with the change in the AnnotatedHandlerInspector, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean by obsolete? the times(2) change is valid, see the comment. I have added an additional handler, with a different signature. The change in AnnotatedHandlerInspector is to prevent abstract methods from being registered twice (for the same implementation)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaah thanks for clarifying, I wasn't thinking straight enough when I wrote that I guess 😅

any(MessageHandler.class));
}

Expand Down Expand Up @@ -572,6 +576,29 @@ void testCommandHandledByEntityFromMapNullIdInCommand() {
verify(mockRepository).load(aggregateIdentifier, null);
}

@Test
void testUsesDuplicateCommandHandlerResolver() throws Exception {
commandBus = SimpleCommandBus.builder()
.duplicateCommandHandlerResolver(DuplicateCommandHandlerResolution.rejectDuplicates())
.build();
commandBus = spy(commandBus);
mockRepository = mock(Repository.class);

ParameterResolverFactory parameterResolverFactory = MultiParameterResolverFactory.ordered(
ClasspathParameterResolverFactory.forClass(AggregateAnnotationCommandHandler.class),
new CustomParameterResolverFactory());
aggregateModel = AnnotatedAggregateMetaModelFactory.inspectAggregate(StubCommandAnnotatedAggregate.class,
parameterResolverFactory);
testSubject = AggregateAnnotationCommandHandler.<StubCommandAnnotatedAggregate>builder()
.aggregateType(StubCommandAnnotatedAggregate.class)
.repository(mockRepository)
.build();

assertThrows(DuplicateCommandHandlerSubscriptionException.class, () -> {
testSubject.subscribe(commandBus);
});
}

@SuppressWarnings("unused")
private abstract static class AbstractStubCommandAnnotatedAggregate {

Expand Down Expand Up @@ -661,6 +688,11 @@ public String handleUpdate(UpdateCommandWithAnnotatedMethod updateCommand) {
return "Method works fine";
}

@CommandHandler
public String handleUpdateDuplicate(UpdateCommandWithAnnotatedMethod updateCommand) {
return "Method works fine";
}

@CommandHandler
public String handleUpdate(UpdateCommandWithAnnotatedMethodAndVersion updateCommand) {
return "Method with version works fine";
Expand Down