Skip to content

Commit

Permalink
Let DeadLetteringEventHandlerInvoker extend SimpleEventHandlerInvoker
Browse files Browse the repository at this point in the history
The DeadLetteringEventHandlerInvoker should extend the
SimpleEventHandlerInvoker for a couple of reasons. One, we don't have a
clean way to disregard event handling, and thus event enqueueing, for
events that aren't intended for this segment. Thus, multithreaded
solutions might all invoke the DeadLetterQueue#enqueueIfPresent whilst
they will not handle the event to begin with. Secondly, it removes the
necessity to make the sequenceIdentifier public logic of the
EventHandlerInvoker. Thirdly, it allows far greater flexibility to
adjust the overall handle approach of the SimpleEventHandlerInvoker,
ensuring the correct flow of matching segments, enqueueing if present,
handling the event, enqueuing on failures and (potentially) dealing with
 the ErrorHandler too.

#2021
  • Loading branch information
smcvb committed Jan 14, 2022
1 parent 03ee261 commit 15d06fb
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 186 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,15 +90,4 @@ default <R> void performReset(R resetContext) {
);
}
}

/**
* Retrieve the sequence identifier for the given {@code event}, used to handle events in the desired order.
* Defaults to {@link EventMessage#getIdentifier()}.
*
* @param event the {@link EventMessage} to deduce the sequence identifier for
* @return the sequence identifier for the given {@code event}
*/
default Object sequenceIdentifier(EventMessage<?> event) {
return event.getIdentifier();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,14 +56,15 @@ public class SimpleEventHandlerInvoker implements EventHandlerInvoker {
*
* @param builder the {@link Builder} used to instantiate a {@link SimpleEventHandlerInvoker} instance
*/
protected SimpleEventHandlerInvoker(Builder builder) {
protected SimpleEventHandlerInvoker(Builder<?> builder) {
builder.validate();
this.eventHandlers = builder.eventHandlers;
this.wrappedEventHandlers =
eventHandlers.stream()
.map(handler -> handler instanceof EventMessageHandler
? (EventMessageHandler) handler
: builder.wrapEventMessageHandler(handler))
: builder.wrapEventMessageHandler(handler)
)
.collect(Collectors.toCollection(ArrayList::new));
this.sequencingPolicy = builder.sequencingPolicy;
this.listenerInvocationErrorHandler = builder.listenerInvocationErrorHandler;
Expand Down Expand Up @@ -91,8 +92,8 @@ private static List<?> detectList(Object[] eventHandlers) {
*
* @return a Builder to be able to create a {@link SimpleEventHandlerInvoker}
*/
public static Builder builder() {
return new Builder();
public static Builder<?> builder() {
return new Builder<>();
}

/**
Expand All @@ -106,13 +107,26 @@ public List<?> eventHandlers() {

@Override
public void handle(EventMessage<?> message, Segment segment) throws Exception {
if (sequencingPolicyMatchesSegment(message, segment)) {
for (EventMessageHandler handler : wrappedEventHandlers) {
try {
handler.handle(message);
} catch (Exception e) {
listenerInvocationErrorHandler.onError(e, message, handler);
}
if (!sequencingPolicyMatchesSegment(message, segment)) {
return;
}
invokeHandlers(message);
}

protected boolean sequencingPolicyMatchesSegment(EventMessage<?> message, Segment segment) {
return segment.matches(Objects.hashCode(sequenceIdentifier(message)));
}

protected Object sequenceIdentifier(EventMessage<?> event) {
return getOrDefault(sequencingPolicy.getSequenceIdentifierFor(event), event::getIdentifier);
}

protected void invokeHandlers(EventMessage<?> message) throws Exception {
for (EventMessageHandler handler : wrappedEventHandlers) {
try {
handler.handle(message);
} catch (Exception e) {
listenerInvocationErrorHandler.onError(e, message, handler);
}
}
}
Expand All @@ -122,10 +136,6 @@ public boolean canHandle(EventMessage<?> eventMessage, Segment segment) {
return hasHandler(eventMessage) && sequencingPolicyMatchesSegment(eventMessage, segment);
}

private boolean sequencingPolicyMatchesSegment(EventMessage<?> message, Segment segment) {
return segment.matches(Objects.hashCode(sequenceIdentifier(message)));
}

@Override
public boolean canHandleType(Class<?> payloadType) {
return wrappedEventHandlers.stream().anyMatch(eh -> eh.canHandleType(payloadType));
Expand Down Expand Up @@ -162,11 +172,6 @@ public <R> void performReset(R resetContext) {
}
}

@Override
public Object sequenceIdentifier(EventMessage<?> event) {
return getOrDefault(sequencingPolicy.getSequenceIdentifierFor(event), event::getIdentifier);
}

/**
* Return the {@link ListenerInvocationErrorHandler} as configured for this {@link EventHandlerInvoker}.
*
Expand All @@ -192,7 +197,7 @@ public SequencingPolicy<? super EventMessage<?>> getSequencingPolicy() {
* SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler and a
* processing group name are a <b>hard requirements</b> and thus should be accounted for.
*/
public static class Builder {
public static class Builder<B extends Builder<?>> {

private List<?> eventHandlers;
private ParameterResolverFactory parameterResolverFactory;
Expand All @@ -208,7 +213,7 @@ public static class Builder {
* @param eventHandlers an array of {@link Object}s which can handle events
* @return the current Builder instance, for fluent interfacing
*/
public Builder eventHandlers(Object... eventHandlers) {
public B eventHandlers(Object... eventHandlers) {
return eventHandlers(detectList(eventHandlers));
}

Expand All @@ -220,10 +225,11 @@ public Builder eventHandlers(Object... eventHandlers) {
* @param eventHandlers a {@link List} of {@link Object}s which can handle events
* @return the current Builder instance, for fluent interfacing
*/
public Builder eventHandlers(List<?> eventHandlers) {
public B eventHandlers(List<?> eventHandlers) {
assertThat(eventHandlers, list -> !list.isEmpty(), "At least one EventMessageHandler should be provided");
this.eventHandlers = eventHandlers;
return this;
//noinspection unchecked
return (B) this;
}

/**
Expand All @@ -236,10 +242,11 @@ public Builder eventHandlers(List<?> eventHandlers) {
* instantiated {@link AnnotationEventHandlerAdapter}s
* @return the current Builder instance, for fluent interfacing
*/
public Builder parameterResolverFactory(ParameterResolverFactory parameterResolverFactory) {
public B parameterResolverFactory(ParameterResolverFactory parameterResolverFactory) {
assertNonNull(parameterResolverFactory, "ParameterResolverFactory may not be null");
this.parameterResolverFactory = parameterResolverFactory;
return this;
//noinspection unchecked
return (B) this;
}

/**
Expand All @@ -252,10 +259,11 @@ public Builder parameterResolverFactory(ParameterResolverFactory parameterResolv
* {@link AnnotationEventHandlerAdapter}s
* @return the current Builder instance, for fluent interfacing
*/
public Builder handlerDefinition(HandlerDefinition handlerDefinition) {
public B handlerDefinition(HandlerDefinition handlerDefinition) {
assertNonNull(handlerDefinition, "HandlerDefinition may not be null");
this.handlerDefinition = handlerDefinition;
return this;
//noinspection unchecked
return (B) this;
}

/**
Expand All @@ -266,10 +274,11 @@ public Builder handlerDefinition(HandlerDefinition handlerDefinition) {
* Exception}s being thrown by the {@link EventListener}s
* @return the current Builder instance, for fluent interfacing
*/
public Builder listenerInvocationErrorHandler(ListenerInvocationErrorHandler listenerInvocationErrorHandler) {
public B listenerInvocationErrorHandler(ListenerInvocationErrorHandler listenerInvocationErrorHandler) {
assertNonNull(listenerInvocationErrorHandler, "ListenerInvocationErrorHandler may not be null");
this.listenerInvocationErrorHandler = listenerInvocationErrorHandler;
return this;
//noinspection unchecked
return (B) this;
}

/**
Expand All @@ -282,10 +291,11 @@ public Builder listenerInvocationErrorHandler(ListenerInvocationErrorHandler lis
* handled by the given {@link Segment}
* @return the current Builder instance, for fluent interfacing
*/
public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
public B sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
assertNonNull(sequencingPolicy, "{} may not be null");
this.sequencingPolicy = sequencingPolicy;
return this;
//noinspection unchecked
return (B) this;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.messaging.deadletter.DeadLetterQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,14 +31,18 @@
import static org.axonframework.common.BuilderUtils.assertNonNull;

/**
* Implementation of the {@link SimpleEventHandlerInvoker} utilizing a {@link DeadLetterQueue} to enqueue {@link
* EventMessage} for which event handling failed. This dead-lettering {@link EventHandlerInvoker} takes into account
* that events part of the same sequence (as according to the {@link org.axonframework.eventhandling.async.SequencingPolicy})
* should be enqueued in order too.
*
* @author Steven van Beelen
* @since 4.6.0
*/
public class DeadLetteringEventHandlerInvoker implements EventHandlerInvoker {
public class DeadLetteringEventHandlerInvoker extends SimpleEventHandlerInvoker {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final EventHandlerInvoker delegate;
private final DeadLetterQueue<EventMessage<?>> queue;
private final String processingGroup;
private final boolean allowReset;
Expand All @@ -49,8 +54,7 @@ public class DeadLetteringEventHandlerInvoker implements EventHandlerInvoker {
* @param builder The {@link Builder} used to instantiate a {@link DeadLetteringEventHandlerInvoker} instance.
*/
protected DeadLetteringEventHandlerInvoker(Builder builder) {
builder.validate();
this.delegate = builder.delegate;
super(builder);
this.queue = builder.queue;
this.processingGroup = builder.processingGroup;
this.allowReset = builder.allowReset;
Expand All @@ -59,95 +63,66 @@ protected DeadLetteringEventHandlerInvoker(Builder builder) {
/**
* Instantiate a builder to construct a {@link DeadLetteringEventHandlerInvoker}.
*
* @return A builder that can consturct a {@link DeadLetteringEventHandlerInvoker}.
* @return A builder that can construct a {@link DeadLetteringEventHandlerInvoker}.
*/
public static Builder builder() {
return new Builder();
}

@Override
public boolean canHandle(EventMessage<?> eventMessage, Segment segment) {
return delegate.canHandle(eventMessage, segment);
}

@Override
public boolean canHandleType(Class<?> payloadType) {
return delegate.canHandleType(payloadType);
}

@Override
public void handle(EventMessage<?> message, Segment segment) throws Exception {
String sequenceIdentifier = Integer.toString(Objects.hashCode(delegate.sequenceIdentifier(message)));
EventHandlingQueueIdentifier identifier = new EventHandlingQueueIdentifier(sequenceIdentifier, processingGroup);
if (!super.sequencingPolicyMatchesSegment(message, segment)) {
logger.trace("Ignoring event [{}] as it is not meant for segment [{}].", message, segment);
return;
}

Object sequenceId = super.sequenceIdentifier(message);
EventHandlingQueueIdentifier identifier = new EventHandlingQueueIdentifier(sequenceId, processingGroup);
if (queue.enqueueIfPresent(identifier, message).isPresent()) {
logger.info(
"Event [{}] is added to the dead-letter queue since its processing id [{}-{}] was already present.",
message, sequenceIdentifier, processingGroup
);
logger.info("Event [{}] is added to the dead-letter queue since its queue id [{}] was already present.",
message, identifier.combinedIdentifier());
} else {
logger.debug("Event [{}] with processing id [{}-{}] is not present in the dead-letter queue present."
+ "Handle operation is delegated to the wrapped EventHandlerInvoker.",
message, sequenceIdentifier, processingGroup);
try {
logger.trace("Event [{}] with queue id [{}] is not present in the dead-letter queue present."
+ "Handle operation is delegated to the wrapped EventHandlerInvoker.",
message, identifier.combinedIdentifier());
super.invokeHandlers(message);
} catch (Exception e) {
// TODO: 03-12-21 how to deal with the delegates ListenerInvocationErrorHandler in this case?
// It is mandatory to rethrow the exception, as otherwise the message isn't enqueued.
delegate.handle(message, segment);
} catch (Exception e) {
// TODO: 14-01-22 We could (1) move the errorHandler invocation to a protected method to override,
// ensuring enqueue is invoked at all times with a try-catch block
// or (2) enforce a PropagatingErrorHandler at all times or (3) do nothing.
queue.enqueue(identifier, message, e);
}
}
}

@Override
public boolean supportsReset() {
return delegate.supportsReset();
}

@Override
public void performReset() {
if (allowReset) {
queue.clear(processingGroup);
}
delegate.performReset();
super.performReset(null);
}

@Override
public <R> void performReset(R resetContext) {
if (allowReset) {
queue.clear(processingGroup);
}
delegate.performReset(resetContext);
}

@Override
public Object sequenceIdentifier(EventMessage<?> event) {
return delegate.sequenceIdentifier(event);
super.performReset(resetContext);
}

/**
*
*/
public static class Builder {
public static class Builder extends SimpleEventHandlerInvoker.Builder<Builder> {

private EventHandlerInvoker delegate;
private DeadLetterQueue<EventMessage<?>> queue;
private String processingGroup;
private boolean allowReset = false;

/**
* The {@link EventHandlerInvoker} wrapped by this dead-lettering implementation. Operations {@link
* #canHandle(EventMessage, Segment)}, {@link #canHandleType(Class)}, {@link #supportsReset()} and {@link
* #sequenceIdentifier(EventMessage)} are completely taken care of by the given {@code delegate}.
*
* @param delegate The {@link EventHandlerInvoker} instance to delegate operations to.
* @return The current Builder instance, for fluent interfacing.
*/
public Builder delegate(EventHandlerInvoker delegate) {
assertNonNull(delegate, "The delegate EventHandlerInvoker may not be null");
this.delegate = delegate;
return this;
}

/**
* Sets the {@link DeadLetterQueue} this {@link EventHandlerInvoker} maintains dead-letters with.
*
Expand Down Expand Up @@ -202,9 +177,8 @@ public DeadLetteringEventHandlerInvoker build() {
* specifications.
*/
protected void validate() {
assertNonNull(delegate, "The delegate EventHandlerInvoker is a hard requirement and should be provided");
assertNonNull(queue, "The DeadLetterQueue is a hard requirement and should be provided");
assertNonNull(processingGroup, "The processing group is a hard requirement and should be provided");
assertNonEmpty(processingGroup, "The processing group is a hard requirement and should be provided");
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,6 @@
class SimpleEventHandlerInvokerTest {

private static final Object NO_RESET_PAYLOAD = null;
private static final String PROCESSING_GROUP = "processingGroup";

private EventMessageHandler mockHandler1;
private EventMessageHandler mockHandler2;
Expand Down

0 comments on commit 15d06fb

Please sign in to comment.