Skip to content

Commit

Permalink
Remove logic from SimpleEventHandlerInvoker
Browse files Browse the repository at this point in the history
Instead of the SimpleEventHandlerInvoker, a distinct EventHandlerInvoker
 implementation should be constructed to deal with the dead letter logic
This warrants the made invoker changes obsolete, as well as the
EventExecutionException.java

#2021
  • Loading branch information
smcvb committed Dec 3, 2021
1 parent acff8aa commit 398bd83
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ private void registerSimpleEventHandlerInvokers(
processingGroup
))
.sequencingPolicy(sequencingPolicy(processingGroup))
.processingGroup(processingGroup)
.build()
);
});
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventhandling.deadletter.DeadLetterErrorHandler;
import org.axonframework.eventhandling.deadletter.DeadLetterEventHandlerInvoker;
import org.axonframework.messaging.annotation.HandlerDefinition;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.deadletter.DeadLetterQueue;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -33,7 +30,8 @@
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static org.axonframework.common.BuilderUtils.*;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;
import static org.axonframework.common.ObjectUtils.getOrDefault;

/**
Expand All @@ -49,7 +47,6 @@ public class SimpleEventHandlerInvoker implements EventHandlerInvoker {
private final List<EventMessageHandler> wrappedEventHandlers;
private final ListenerInvocationErrorHandler listenerInvocationErrorHandler;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
protected final String processingGroup;

/**
* Instantiate a {@link SimpleEventHandlerInvoker} based on the fields contained in the {@link Builder}.
Expand All @@ -70,7 +67,6 @@ protected SimpleEventHandlerInvoker(Builder builder) {
.collect(Collectors.toCollection(ArrayList::new));
this.sequencingPolicy = builder.sequencingPolicy;
this.listenerInvocationErrorHandler = builder.listenerInvocationErrorHandler;
this.processingGroup = builder.processingGroup;
}

/**
Expand Down Expand Up @@ -115,17 +111,12 @@ public void handle(EventMessage<?> message, Segment segment) throws Exception {
try {
handler.handle(message);
} catch (Exception e) {
listenerInvocationErrorHandler.onError(executionException(message, e), message, handler);
listenerInvocationErrorHandler.onError(e, message, handler);
}
}
}
}

private EventExecutionException executionException(EventMessage<?> event, Exception exception) {
String message = String.format("Handling failed for event [%s] in group [%s]", event, processingGroup);
return new EventExecutionException(message, exception, sequenceIdentifier(event).toString(), processingGroup);
}

@Override
public boolean canHandle(EventMessage<?> eventMessage, Segment segment) {
return hasHandler(eventMessage) && sequencingPolicyMatchesSegment(eventMessage, segment);
Expand Down Expand Up @@ -207,8 +198,6 @@ public static class Builder {
private HandlerDefinition handlerDefinition;
private ListenerInvocationErrorHandler listenerInvocationErrorHandler = new LoggingErrorHandler();
private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private String processingGroup;
private DeadLetterQueue<EventMessage<?>> deadLetterQueue;

/**
* Sets the {@code eventHandlers} this {@link EventHandlerInvoker} will forward all its events to. If an event
Expand Down Expand Up @@ -299,49 +288,12 @@ public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequen
}

/**
* Sets the name of this invoker.
*
* @param processingGroup the name of this {@link EventHandlerInvoker}
* @return the current Builder instance, for fluent interfacing
*/
public Builder processingGroup(String processingGroup) {
assertNonEmpty(processingGroup, "The processing group may not be null or empty");
this.processingGroup = processingGroup;
return this;
}

/**
* Sets the {@link DeadLetterQueue} used by this invoker. todo fine-tune documentation
*
* @param deadLetterQueue the name of this {@link EventHandlerInvoker}
* @return the current Builder instance, for fluent interfacing
*/
public Builder deadLetterQueue(DeadLetterQueue<EventMessage<?>> deadLetterQueue) {
assertNonNull(this.deadLetterQueue, "The DeadLetterQueue may not be null");
this.deadLetterQueue = deadLetterQueue;
return this;
}

/**
* Initializes a {@link SimpleEventHandlerInvoker} or {@link DeadLetterEventHandlerInvoker} as specified through
* this Builder. Will return a {@code DeadLetterEventHandlerInvoker} if {@link
* #deadLetterQueue(DeadLetterQueue)} has been set. Otherwise, builds a regular {@code
* SimpleEventHandlerInvoker}.
* Initializes a {@link SimpleEventHandlerInvoker} as specified through this Builder.
*
* @param <R> a generic extending {@link SimpleEventHandlerInvoker}, to allow both an {@code
* SimpleEventHandlerInvoker} and {@link DeadLetterEventHandlerInvoker} return type
* @return a {@link SimpleEventHandlerInvoker} or {@link DeadLetterEventHandlerInvoker} (if {@link
* #deadLetterQueue(DeadLetterQueue)} has been set) as specified through this Builder
* @return a {@link SimpleEventHandlerInvoker} as specified through this Builder
*/
@SuppressWarnings("unchecked")
public <R extends SimpleEventHandlerInvoker> R build() {
if (deadLetterQueue != null) {
this.listenerInvocationErrorHandler = DeadLetterErrorHandler.builder()
.deadLetterQueue(deadLetterQueue)
.build();
return (R) new DeadLetterEventHandlerInvoker(this);
}
return (R) new SimpleEventHandlerInvoker(this);
public SimpleEventHandlerInvoker build() {
return new SimpleEventHandlerInvoker(this);
}

/**
Expand Down Expand Up @@ -373,16 +325,6 @@ protected void validate() throws AxonConfigurationException {
assertThat(eventHandlers,
list -> list != null && !list.isEmpty(),
"At least one EventMessageHandler should be provided");
assertNonNull(processingGroup, "The processing group is a hard requirement and should be provided");
}

/**
* Return the {@link DeadLetterQueue} configured in this Builder.
*
* @return the {@link DeadLetterQueue} configured in this Builder
*/
public DeadLetterQueue<EventMessage<?>> deadLetterQueue() {
return deadLetterQueue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@

package org.axonframework.eventhandling;

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.junit.jupiter.api.*;
import org.mockito.*;

import java.util.List;

import static org.axonframework.utils.EventTestUtils.createEvent;
import static org.axonframework.utils.EventTestUtils.createEvents;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

/**
Expand All @@ -49,7 +46,6 @@ void setUp() {
mockHandler2 = mock(EventMessageHandler.class);
testSubject = SimpleEventHandlerInvoker.builder()
.eventHandlers("test", mockHandler1, mockHandler2)
.processingGroup(PROCESSING_GROUP)
.build();
}

Expand Down Expand Up @@ -81,46 +77,6 @@ void testRepeatedEventPublication() throws Exception {
inOrder.verifyNoMoreInteractions();
}

@Test
void testHandleWrapsExceptionInEventExecutionException() throws Exception {
// given...
ListenerInvocationErrorHandler errorHandler = mock(ListenerInvocationErrorHandler.class);
//noinspection unchecked
SequencingPolicy<EventMessage<?>> sequencingPolicy = mock(SequencingPolicy.class);

SimpleEventHandlerInvoker customTestSubject =
SimpleEventHandlerInvoker.builder()
.eventHandlers(mockHandler1)
.listenerInvocationErrorHandler(errorHandler)
.sequencingPolicy(sequencingPolicy)
.processingGroup(PROCESSING_GROUP)
.build();

EventMessage<?> testEvent = createEvent();

RuntimeException expectedException = new RuntimeException("some-exception");
String expectedSequenceIdentifier = "sequenceIdentifier";

when(mockHandler1.handle(testEvent)).thenThrow(expectedException);
when(sequencingPolicy.getSequenceIdentifierFor(testEvent)).thenReturn(expectedSequenceIdentifier);

// when...
customTestSubject.handle(testEvent, Segment.ROOT_SEGMENT);

// then...
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);

verify(errorHandler).onError(exceptionCaptor.capture(), eq(testEvent), eq(mockHandler1));

Exception result = exceptionCaptor.getValue();

assertTrue(result instanceof EventExecutionException);
EventExecutionException executionException = ((EventExecutionException) result);
assertEquals(expectedSequenceIdentifier, executionException.getSequenceIdentifier());
assertEquals(PROCESSING_GROUP, executionException.getProcessingGroup());
assertEquals(expectedException, executionException.getCause());
}

@Test
void testPerformReset() {
testSubject.performReset();
Expand All @@ -138,25 +94,4 @@ void testPerformResetWithResetContext() {
verify(mockHandler1).prepareReset(eq(resetContext));
verify(mockHandler2).prepareReset(eq(resetContext));
}

@Test
void testBuildWithNullProcessingGroupThrowsAxonConfigurationException() {
SimpleEventHandlerInvoker.Builder testSubject = SimpleEventHandlerInvoker.builder();

assertThrows(AxonConfigurationException.class, () -> testSubject.processingGroup(null));
}

@Test
void testBuildWithEmptyProcessingGroupThrowsAxonConfigurationException() {
SimpleEventHandlerInvoker.Builder testSubject = SimpleEventHandlerInvoker.builder();

assertThrows(AxonConfigurationException.class, () -> testSubject.processingGroup(""));
}

@Test
void testBuildWithoutProcessingGroupThrowsAxonConfigurationException() {
SimpleEventHandlerInvoker.Builder testSubject = SimpleEventHandlerInvoker.builder();

assertThrows(AxonConfigurationException.class, testSubject::build);
}
}

0 comments on commit 398bd83

Please sign in to comment.