Skip to content

Commit

Permalink
Add processing group to SimpleEventHandlerInvoker
Browse files Browse the repository at this point in the history
Add the processing group to the SimpleEventHandlerInvoker. This will
allow for clearer tracing of failed event message handling.

#2021
  • Loading branch information
smcvb committed Nov 26, 2021
1 parent 261a2ff commit 71ef1e5
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ private void registerSimpleEventHandlerInvokers(
processingGroup
))
.sequencingPolicy(sequencingPolicy(processingGroup))
.processingGroup(processingGroup)
.build()
);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;
import static org.axonframework.common.BuilderUtils.*;
import static org.axonframework.common.ObjectUtils.getOrDefault;

/**
* Implementation of an {@link EventHandlerInvoker} that forwards events to a list of registered
* {@link EventMessageHandler}.
* Implementation of an {@link EventHandlerInvoker} that forwards events to a list of registered {@link
* EventMessageHandler}.
*
* @author Rene de Waele
* @since 3.0
Expand All @@ -47,12 +46,13 @@ public class SimpleEventHandlerInvoker implements EventHandlerInvoker {
private final List<EventMessageHandler> wrappedEventHandlers;
private final ListenerInvocationErrorHandler listenerInvocationErrorHandler;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
private final String processingGroup;

/**
* Instantiate a {@link SimpleEventHandlerInvoker} based on the fields contained in the {@link Builder}.
* <p>
* Will assert that at least one {@link EventMessageHandler} is provided, and will throw an
* {@link AxonConfigurationException} if this is not the case.
* Will assert that at least one {@link EventMessageHandler} is provided, and will throw an {@link
* AxonConfigurationException} if this is not the case.
*
* @param builder the {@link Builder} used to instantiate a {@link SimpleEventHandlerInvoker} instance
*/
Expand All @@ -67,6 +67,7 @@ protected SimpleEventHandlerInvoker(Builder builder) {
.collect(Collectors.toCollection(ArrayList::new));
this.sequencingPolicy = builder.sequencingPolicy;
this.listenerInvocationErrorHandler = builder.listenerInvocationErrorHandler;
this.processingGroup = builder.processingGroup;
}

/**
Expand All @@ -85,8 +86,8 @@ private static List<?> detectList(Object[] eventHandlers) {
/**
* Instantiate a Builder to be able to create a {@link SimpleEventHandlerInvoker}.
* <p>
* The {@link ListenerInvocationErrorHandler} is defaulted to a {@link LoggingErrorHandler} and the
* {@link SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler is a
* The {@link ListenerInvocationErrorHandler} is defaulted to a {@link LoggingErrorHandler} and the {@link
* SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler is a
* <b>hard requirement</b> and thus should be accounted for.
*
* @return a Builder to be able to create a {@link SimpleEventHandlerInvoker}
Expand Down Expand Up @@ -176,8 +177,8 @@ public SequencingPolicy<? super EventMessage<?>> getSequencingPolicy() {
/**
* Builder class to instantiate a {@link SimpleEventHandlerInvoker}.
* <p>
* The {@link ListenerInvocationErrorHandler} is defaulted to a {@link LoggingErrorHandler} and the
* {@link SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler is a
* The {@link ListenerInvocationErrorHandler} is defaulted to a {@link LoggingErrorHandler} and the {@link
* SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler is a
* <b>hard requirement</b> and thus should be accounted for.
*/
public static class Builder {
Expand All @@ -187,6 +188,7 @@ public static class Builder {
private HandlerDefinition handlerDefinition;
private ListenerInvocationErrorHandler listenerInvocationErrorHandler = new LoggingErrorHandler();
private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private String processingGroup;

/**
* Sets the {@code eventHandlers} this {@link EventHandlerInvoker} will forward all its events to. If an event
Expand Down Expand Up @@ -217,8 +219,8 @@ public Builder eventHandlers(List<?> eventHandlers) {
/**
* Sets the {@link ParameterResolverFactory} used to resolve parameter values for annotated handlers in the
* {@link AnnotationEventHandlerAdapter} this {@link EventHandlerInvoker} instantiates. This invoker will only
* instantiate a new {@link EventMessageHandler} if a given Event Handler (through
* {@link #eventHandlers(Object...)} or {@link #eventHandlers(List)}) is not assignable to EventMessageHandler.
* instantiate a new {@link EventMessageHandler} if a given Event Handler (through {@link
* #eventHandlers(Object...)} or {@link #eventHandlers(List)}) is not assignable to EventMessageHandler.
*
* @param parameterResolverFactory the {@link ParameterResolverFactory} used to resolve parameter values for
* instantiated {@link AnnotationEventHandlerAdapter}s
Expand All @@ -231,11 +233,10 @@ public Builder parameterResolverFactory(ParameterResolverFactory parameterResolv
}

/**
* Sets the {@link HandlerDefinition} used to create concrete handlers in the annotated handlers in the
* {@link AnnotationEventHandlerAdapter} this {@link EventHandlerInvoker} instantiates. This invoker will only
* instantiate a new {@link EventMessageHandler} if a given Event Handler
* (through {@link #eventHandlers(Object...)} or {@link #eventHandlers(List)}) is not assignable to
* EventMessageHandler.
* Sets the {@link HandlerDefinition} used to create concrete handlers in the annotated handlers in the {@link
* AnnotationEventHandlerAdapter} this {@link EventHandlerInvoker} instantiates. This invoker will only
* instantiate a new {@link EventMessageHandler} if a given Event Handler (through {@link
* #eventHandlers(Object...)} or {@link #eventHandlers(List)}) is not assignable to EventMessageHandler.
*
* @param handlerDefinition the {@link HandlerDefinition} used to create concrete handlers in the instantiated
* {@link AnnotationEventHandlerAdapter}s
Expand Down Expand Up @@ -277,6 +278,18 @@ public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequen
return this;
}

/**
* Sets the name of this invoker.
*
* @param processingGroup the name of this {@link EventHandlerInvoker}
* @return the current Builder instance, for fluent interfacing
*/
public Builder processingGroup(String processingGroup) {
assertNonEmpty(processingGroup, "The processing group may not be null or empty");
this.processingGroup = processingGroup;
return this;
}

/**
* Initializes a {@link SimpleEventHandlerInvoker} as specified through this Builder.
*
Expand All @@ -287,10 +300,10 @@ public SimpleEventHandlerInvoker build() {
}

/**
* Wrap a given {@code eventHandler} in an {@link AnnotationEventHandlerAdapter} to allow this
* {@link EventHandlerInvoker} to correctly pass {@link EventMessage}s to it. If a
* {@link ParameterResolverFactory} or both a ParameterResolverFactory and {@link HandlerDefinition} are
* present, one/both will be given to the AnnotationEventHandlerAdapter
* Wrap a given {@code eventHandler} in an {@link AnnotationEventHandlerAdapter} to allow this {@link
* EventHandlerInvoker} to correctly pass {@link EventMessage}s to it. If a {@link ParameterResolverFactory} or
* both a ParameterResolverFactory and {@link HandlerDefinition} are present, one/both will be given to the
* AnnotationEventHandlerAdapter
*
* @param eventHandler an {@link Object} which will be wrapped in an {@link AnnotationEventHandlerAdapter}
* @return an {@link AnnotationEventHandlerAdapter} which the given {@code eventHandler} will be wrapped in
Expand All @@ -312,7 +325,10 @@ public AnnotationEventHandlerAdapter wrapEventMessageHandler(Object eventHandler
* specifications
*/
protected void validate() throws AxonConfigurationException {
assertThat(eventHandlers, list -> list != null && !list.isEmpty(), "At least one EventMessageHandler should be provided");
assertThat(eventHandlers,
list -> list != null && !list.isEmpty(),
"At least one EventMessageHandler should be provided");
assertNonNull(processingGroup, "The processing group is a hard requirement and should be provided");
}
}
}

0 comments on commit 71ef1e5

Please sign in to comment.