Skip to content

Commit

Permalink
Validate processing id to insert new dead-letter in order
Browse files Browse the repository at this point in the history
As events should be handled in order, this means dead-lettered
events should include following events in the same sequence. Doing so
ensures processing doesn't go awry. To that end, it's beneficial to wrap
 the SimpleEventHandlerInvoker in a dead-letter aware variant that can
 validate the contents of the queue for a new event it receives. If it's
 present, the event should be added. If it's not present, regular
 handling should proceed.

#2021
  • Loading branch information
smcvb committed Nov 26, 2021
1 parent 57d6586 commit 1bab855
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventhandling.deadletter.DeadLetterErrorHandler;
import org.axonframework.eventhandling.deadletter.DeadLetterEventHandlerInvoker;
import org.axonframework.messaging.annotation.HandlerDefinition;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.deadletter.DeadLetterQueue;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -46,7 +49,7 @@ public class SimpleEventHandlerInvoker implements EventHandlerInvoker {
private final List<EventMessageHandler> wrappedEventHandlers;
private final ListenerInvocationErrorHandler listenerInvocationErrorHandler;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
private final String processingGroup;
protected final String processingGroup;

/**
* Instantiate a {@link SimpleEventHandlerInvoker} based on the fields contained in the {@link Builder}.
Expand Down Expand Up @@ -156,7 +159,7 @@ private boolean sequencingPolicyMatchesSegment(EventMessage<?> message, Segment
return segment.matches(Objects.hashCode(sequenceIdentifier(message)));
}

private Object sequenceIdentifier(EventMessage<?> event) {
protected Object sequenceIdentifier(EventMessage<?> event) {
return getOrDefault(sequencingPolicy.getSequenceIdentifierFor(event), event::getIdentifier);
}

Expand All @@ -172,10 +175,20 @@ public <R> void performReset(R resetContext) {
}
}

/**
* Return the {@link ListenerInvocationErrorHandler} as configured for this {@link EventHandlerInvoker}.
*
* @return the {@link ListenerInvocationErrorHandler} as configured for this {@link EventHandlerInvoker}
*/
public ListenerInvocationErrorHandler getListenerInvocationErrorHandler() {
return listenerInvocationErrorHandler;
}

/**
* Return the {@link SequencingPolicy} as configured for this {@link EventHandlerInvoker}.
*
* @return the {@link SequencingPolicy} as configured for this {@link EventHandlerInvoker}
*/
public SequencingPolicy<? super EventMessage<?>> getSequencingPolicy() {
return sequencingPolicy;
}
Expand All @@ -184,8 +197,8 @@ public SequencingPolicy<? super EventMessage<?>> getSequencingPolicy() {
* Builder class to instantiate a {@link SimpleEventHandlerInvoker}.
* <p>
* The {@link ListenerInvocationErrorHandler} is defaulted to a {@link LoggingErrorHandler} and the {@link
* SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler is a
* <b>hard requirement</b> and thus should be accounted for.
* SequencingPolicy} to a {@link SequentialPerAggregatePolicy}. Providing at least one Event Handler and a
* processing group name are a <b>hard requirements</b> and thus should be accounted for.
*/
public static class Builder {

Expand All @@ -195,6 +208,7 @@ public static class Builder {
private ListenerInvocationErrorHandler listenerInvocationErrorHandler = new LoggingErrorHandler();
private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private String processingGroup;
private DeadLetterQueue<EventMessage<?>> deadLetterQueue;

/**
* Sets the {@code eventHandlers} this {@link EventHandlerInvoker} will forward all its events to. If an event
Expand Down Expand Up @@ -297,12 +311,37 @@ public Builder processingGroup(String processingGroup) {
}

/**
* Initializes a {@link SimpleEventHandlerInvoker} as specified through this Builder.
* Sets the {@link DeadLetterQueue} used by this invoker. todo fine-tune documentation
*
* @return a {@link SimpleEventHandlerInvoker} as specified through this Builder
* @param deadLetterQueue the name of this {@link EventHandlerInvoker}
* @return the current Builder instance, for fluent interfacing
*/
public SimpleEventHandlerInvoker build() {
return new SimpleEventHandlerInvoker(this);
public Builder deadLetterQueue(DeadLetterQueue<EventMessage<?>> deadLetterQueue) {
assertNonNull(this.deadLetterQueue, "The DeadLetterQueue may not be null");
this.deadLetterQueue = deadLetterQueue;
return this;
}

/**
* Initializes a {@link SimpleEventHandlerInvoker} or {@link DeadLetterEventHandlerInvoker} as specified through
* this Builder. Will return a {@code DeadLetterEventHandlerInvoker} if {@link
* #deadLetterQueue(DeadLetterQueue)} has been set. Otherwise, builds a regular {@code
* SimpleEventHandlerInvoker}.
*
* @param <R> a generic extending {@link SimpleEventHandlerInvoker}, to allow both an {@code
* SimpleEventHandlerInvoker} and {@link DeadLetterEventHandlerInvoker} return type
* @return a {@link SimpleEventHandlerInvoker} or {@link DeadLetterEventHandlerInvoker} (if {@link
* #deadLetterQueue(DeadLetterQueue)} has been set) as specified through this Builder
*/
@SuppressWarnings("unchecked")
public <R extends SimpleEventHandlerInvoker> R build() {
if (deadLetterQueue != null) {
this.listenerInvocationErrorHandler = DeadLetterErrorHandler.builder()
.deadLetterQueue(deadLetterQueue)
.build();
return (R) new DeadLetterEventHandlerInvoker(this);
}
return (R) new SimpleEventHandlerInvoker(this);
}

/**
Expand Down Expand Up @@ -336,5 +375,14 @@ protected void validate() throws AxonConfigurationException {
"At least one EventMessageHandler should be provided");
assertNonNull(processingGroup, "The processing group is a hard requirement and should be provided");
}

/**
* Return the {@link DeadLetterQueue} configured in this Builder.
*
* @return the {@link DeadLetterQueue} configured in this Builder
*/
public DeadLetterQueue<EventMessage<?>> deadLetterQueue() {
return deadLetterQueue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void onError(Exception exception, EventMessage<?> event, EventMessageHand
* <p>
* The {@link DeadLetterQueue} is a <b>hard requirement</b> and as such should be provided.
*/
protected static class Builder {
public static class Builder {

private DeadLetterQueue<EventMessage<?>> deadLetterQueue;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventhandling.deadletter;

import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.messaging.deadletter.DeadLetterQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;

/**
* @author Steven van Beelen
* @since 4.6.0
*/
public class DeadLetterEventHandlerInvoker extends SimpleEventHandlerInvoker {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final DeadLetterQueue<EventMessage<?>> queue;

/**
* Instantiate a {@link SimpleEventHandlerInvoker} based on the fields contained in the {@link Builder}.
* <p>
* Will assert that at least one {@link org.axonframework.eventhandling.EventMessageHandler} is provided, and will
* throw an {@link org.axonframework.common.AxonConfigurationException} if this is not the case.
*
* @param builder the {@link Builder} used to instantiate a {@link SimpleEventHandlerInvoker} instance
*/
public DeadLetterEventHandlerInvoker(Builder builder) {
super(builder);
this.queue = builder.deadLetterQueue();
}

@Override
public void handle(EventMessage<?> message, Segment segment) throws Exception {
String sequenceIdentifier = super.sequenceIdentifier(message).toString();
if (queue.addIfPresent(sequenceIdentifier, () -> new GenericEventDeadLetter(sequenceIdentifier, message))) {
logger.info("Event [{}] is added to the dead-letter queue since its processing id [{}] was already present.",
message, sequenceIdentifier);
} else {
logger.debug("Event [{}] with processing id [{}] is not present in the dead-letter queue present."
+ "Handle operation is delegate to the parent.",
message, sequenceIdentifier);
super.handle(message, segment);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.axonframework.messaging.Message;

import java.util.function.Supplier;
import java.util.stream.Stream;

/**
Expand All @@ -39,11 +40,27 @@ public interface DeadLetterQueue<T extends Message<?>> {
* DeadLetter#sequenceIdentifier()}. If there's no queue for the {@code deadLetter} it is ignored.
*
* @param deadLetter the {@link DeadLetter} to attached in FIFO ordering for the given {@code sequenceIdentifier}
* @return {@code true} if the {@code deadLetter} is added, {@code false} otherwise
*/
default void addIfPresent(DeadLetter<T> deadLetter) {
if (!isEmpty() && contains(deadLetter.sequenceIdentifier())) {
add(deadLetter);
default boolean addIfPresent(DeadLetter<T> deadLetter) {
return addIfPresent(deadLetter.sequenceIdentifier(), () -> deadLetter);
}

/**
* Adds the result of the given {@code deadLetterSupplier} if this queue contains the given {@code
* sequenceIdentifier}. If there's no queue for the {@code deadLetter} it is ignored.
*
* @param sequenceIdentifier the identifier used to validate for contained {@link DeadLetter} instances
* @param deadLetterSupplier the {@link Supplier} of the {@link DeadLetter}. Only invoked if the given {@code
* sequenceIdentifier} is contained in this queue
* @return {@code true} if the {@code deadLetter} is added, {@code false} otherwise
*/
default boolean addIfPresent(String sequenceIdentifier, Supplier<DeadLetter<T>> deadLetterSupplier) {
boolean canAdd = !isEmpty() && contains(sequenceIdentifier);
if (canAdd) {
add(deadLetterSupplier.get());
}
return canAdd;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.axonframework.eventhandling.deadletter.GenericEventDeadLetter;
import org.junit.jupiter.api.*;

import java.time.Instant;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -64,8 +63,9 @@ void testAdd() {
void testAddIfPresentDoesNotAddForEmptyQueue() {
String testSequenceId = generateSequenceId();

testSubject.addIfPresent(generateDeadLetter(testSequenceId));
boolean result = testSubject.addIfPresent(generateDeadLetter(testSequenceId));

assertFalse(result);
assertFalse(testSubject.contains(testSequenceId));
assertTrue(testSubject.isEmpty());
}
Expand All @@ -77,8 +77,9 @@ void testAddIfPresentDoesNotAddForNonExistentSequenceId() {

String testSecondSequenceId = generateSequenceId();

testSubject.addIfPresent(generateDeadLetter(testFirstSequenceId));
boolean result = testSubject.addIfPresent(generateDeadLetter(testFirstSequenceId));

assertTrue(result);
assertTrue(testSubject.contains(testFirstSequenceId));
assertFalse(testSubject.contains(testSecondSequenceId));
assertFalse(testSubject.isEmpty());
Expand All @@ -91,13 +92,14 @@ void testAddIfPresentAddsForExistingSequence() {
testSubject.add(testFirstDeadLetter);
DeadLetter<EventMessage<?>> testSecondDeadLetter = generateDeadLetter(testSequenceId);

testSubject.addIfPresent(testSecondDeadLetter);
boolean result = testSubject.addIfPresent(testSecondDeadLetter);

assertTrue(result);
assertTrue(testSubject.contains(testSequenceId));
assertFalse(testSubject.isEmpty());
List<DeadLetter<EventMessage<?>>> result = testSubject.peek().collect(Collectors.toList());
assertTrue(result.contains(testFirstDeadLetter));
assertTrue(result.contains(testSecondDeadLetter));
List<DeadLetter<EventMessage<?>>> resultQueue = testSubject.peek().collect(Collectors.toList());
assertTrue(resultQueue.contains(testFirstDeadLetter));
assertTrue(resultQueue.contains(testSecondDeadLetter));
}

@Test
Expand Down Expand Up @@ -234,8 +236,6 @@ private static DeadLetter<EventMessage<?>> generateDeadLetter() {
}

private static DeadLetter<EventMessage<?>> generateDeadLetter(String sequenceIdentifier) {
return new GenericEventDeadLetter(sequenceIdentifier,
Instant.now(),
GenericEventMessage.asEventMessage(generateSequenceId()));
return new GenericEventDeadLetter(sequenceIdentifier, GenericEventMessage.asEventMessage(generateSequenceId()));
}
}

0 comments on commit 1bab855

Please sign in to comment.