Skip to content

Commit

Permalink
Fix DuplicateCommandHandlerResolvers not working for annotated aggreg…
Browse files Browse the repository at this point in the history
…ates
  • Loading branch information
CodeDrivenMitch committed Apr 26, 2022
1 parent 4c65a00 commit 4b8f007
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import org.axonframework.messaging.Message;

import java.lang.reflect.Constructor;
import java.lang.reflect.Executable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -213,7 +215,12 @@ private void initializeMessageHandlers(ParameterResolverFactory parameterResolve
.forEach((key, value) -> value.forEach(h -> registerHandler(key, (MessageHandlingMember<T>) h))));
superClassInspectors.forEach(sci -> sci.getAllHandlers()
.forEach((key, value) -> value.forEach(h -> {
registerHandler(key, h);
boolean isAbstract = h.unwrap(Executable.class)
.map(e -> Modifier.isAbstract(e.getModifiers()))
.orElse(false);
if (!isAbstract) {
registerHandler(key, h);
}
registerHandler(inspectedType, h);
})));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import org.junit.jupiter.api.*;
import org.mockito.internal.util.collections.*;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,6 +119,15 @@ void testComplexHandlerHierarchy() throws NoSuchMethodException {
);
}

@Test
void testDoesNotRegisterAbstractHandlersTwice() {
AnnotatedHandlerInspector<AB> aaInspector = AnnotatedHandlerInspector.inspectType(AB.class,
parameterResolverFactory);

assertEquals(1, aaInspector.getAllHandlers().size());
assertEquals(1, (int) aaInspector.getAllHandlers().values().stream().flatMap(Collection::stream).count());
}

private <T extends MessageHandlingMember<?>> List<AnnotatedMessageHandlingMember<?>> unwrapToList(
Stream<T> stream
) {
Expand Down Expand Up @@ -221,4 +231,18 @@ private static class D extends B {
public void dHandle(String d) {
}
}
}

public static abstract class AA {

@CommandHandler
public abstract String handleAbstractly(String command);
}

public static class AB extends AA {

@Override
public String handleAbstractly(String command) {
return "Some result";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand All @@ -50,10 +52,14 @@
import static org.axonframework.modelling.command.AggregateCreationPolicy.NEVER;

/**
* Command handler that handles commands based on {@link CommandHandler} annotations on an aggregate. Those annotations
* Command handler that registers a set of {@link CommandHandler} based on annotations of an aggregate. Those annotations
* may appear on methods, in which case a specific aggregate instance needs to be targeted by the command, or on the
* constructor. The latter will create a new Aggregate instance, which is then stored in the repository.
*
* Despite being an {@link CommandMessageHandler} it does not actually handle the commands. During registration to the
* {@link CommandBus} it registers the {@link CommandMessageHandler}s directly instead of itself so duplicate command
* handlers can be detected and handled correctly.
*
* @param <T> the type of aggregate this handler handles commands for
* @author Allard Buijze
* @since 1.2
Expand All @@ -64,6 +70,7 @@ public class AggregateAnnotationCommandHandler<T> implements CommandMessageHandl
private final CommandTargetResolver commandTargetResolver;
private final List<MessageHandler<CommandMessage<?>>> handlers;
private final Set<String> supportedCommandNames;
private final Map<String, Set<MessageHandler<CommandMessage<?>>>> supportedCommandsByName;

/**
* Instantiate a Builder to be able to create a {@link AggregateAnnotationCommandHandler}.
Expand Down Expand Up @@ -100,6 +107,7 @@ protected AggregateAnnotationCommandHandler(Builder<T> builder) {
this.repository = builder.repository;
this.commandTargetResolver = builder.commandTargetResolver;
this.supportedCommandNames = new HashSet<>();
this.supportedCommandsByName = new HashMap<>();
this.handlers = initializeHandlers(builder.buildAggregateModel());
}

Expand All @@ -111,10 +119,16 @@ protected AggregateAnnotationCommandHandler(Builder<T> builder) {
* @return A handle that can be used to unsubscribe
*/
public Registration subscribe(CommandBus commandBus) {
List<Registration> subscriptions = supportedCommandNames()
List<Registration> subscriptions = supportedCommandsByName
.entrySet()
.stream()
.map(supportedCommand -> commandBus.subscribe(supportedCommand, this))
.filter(Objects::nonNull).collect(Collectors.toList());
.flatMap(entry ->
entry.getValue().stream().map(messageHandler ->
commandBus.subscribe(entry.getKey(), messageHandler)
)
)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return () -> subscriptions.stream().map(Registration::cancel).reduce(Boolean::logicalOr).orElse(false);
}

Expand All @@ -137,30 +151,33 @@ private void initializeHandler(AggregateModel<T> aggregateModel,
Optional<AggregateCreationPolicy> policy = handler.unwrap(CreationPolicyMember.class)
.map(CreationPolicyMember::creationPolicy);

MessageHandler<CommandMessage<?>> messageHandler = null;
if (cmh.isFactoryHandler()) {
assertThat(
policy,
p -> p.map(AggregateCreationPolicy.ALWAYS::equals).orElse(true),
aggregateModel.type() + ": Static methods/constructors can only use creationPolicy ALWAYS"
);
handlersFound.add(new AggregateConstructorCommandHandler(handler));
messageHandler = new AggregateConstructorCommandHandler(handler);
} else {
switch (policy.orElse(NEVER)) {
case ALWAYS:
handlersFound.add(new AlwaysCreateAggregateCommandHandler(
messageHandler = new AlwaysCreateAggregateCommandHandler(
handler, aggregateModel.entityClass()::newInstance
));
);
break;
case CREATE_IF_MISSING:
handlersFound.add(new AggregateCreateOrUpdateCommandHandler(
messageHandler = new AggregateCreateOrUpdateCommandHandler(
handler, aggregateModel.entityClass()::newInstance
));
);
break;
case NEVER:
handlersFound.add(new AggregateCommandHandler(handler));
messageHandler = new AggregateCommandHandler(handler);
break;
}
}
handlersFound.add(messageHandler);
supportedCommandsByName.computeIfAbsent(cmh.commandName(), key -> new HashSet<>()).add(messageHandler);
supportedCommandNames.add(cmh.commandName());
});
}
Expand Down

0 comments on commit 4b8f007

Please sign in to comment.