Skip to content

Commit

Permalink
Merge pull request #2258 from AxonFramework/feature/dead-letter-queue
Browse files Browse the repository at this point in the history
[#2021] Dead Letter Queue for Event Processing Groups
  • Loading branch information
smcvb committed Sep 7, 2022
2 parents e905e6f + cd95e26 commit ef83538
Show file tree
Hide file tree
Showing 39 changed files with 5,684 additions and 230 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2018. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,16 +22,21 @@
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.monitoring.MessageMonitor;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nonnull;

/**
* Defines a contract for accessor methods regarding event processing configuration.
Expand Down Expand Up @@ -212,4 +217,46 @@ default <T> SagaConfiguration<T> sagaConfiguration(Class<T> sagaType) {
* @return the {@link TransactionManager}belonging to the given {@code processorName}
*/
TransactionManager transactionManager(String processorName);

/**
* Returns the {@link SequencedDeadLetterQueue} tied to the given {@code processingGroup} in an {@link Optional}.
* May return an {@link Optional#empty() empty optional} when there's no {@code SequencedDeadLetterQueue} present
* for the given {@code processingGroup}.
*
* @param processingGroup The name of the processing group for which to return a {@link SequencedDeadLetterQueue}.
* @return The {@link SequencedDeadLetterQueue} tied to the given {@code processingGroup}, {@link Optional#empty()}
* if there is none.
*/
default Optional<SequencedDeadLetterQueue<EventMessage<?>>> deadLetterQueue(
@Nonnull String processingGroup
) {
return Optional.empty();
}

/**
* Returns the {@link EnqueuePolicy dead letter policy} tied to the given {@code processingGroup} in an
* {@link Optional}. May return an {@link Optional} containing the
* {@link EventProcessingConfigurer#registerDefaultDeadLetterPolicy(Function) default policy} if present.
*
* @param processingGroup The name of the processing group for which to return an {@link EnqueuePolicy}.
* @return The {@link EnqueuePolicy} belonging to the given {@code processingGroup}.
*/
default Optional<EnqueuePolicy<EventMessage<?>>> deadLetterPolicy(@Nonnull String processingGroup) {
return Optional.empty();
}

/**
* Returns the {@link SequencedDeadLetterProcessor} tied to the given {@code processingGroup} in an
* {@link Optional}. Returns an {@link Optional#empty() empty optional} when the {@code processingGroup} does not
* have a {@link SequencedDeadLetterQueue} attached to it.
*
* @param processingGroup The name of the processing group for which to return an {@link EnqueuePolicy}.
* @return The {@link SequencedDeadLetterProcessor} tied to the given {@code processingGroup} in an
* {@link Optional}, {@link Optional#empty()} if there is none.
*/
default Optional<SequencedDeadLetterProcessor<EventMessage<?>>> sequencedDeadLetterProcessor(
@Nonnull String processingGroup
) {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.async.SequencingPolicy;
import org.axonframework.eventhandling.async.SequentialPerAggregatePolicy;
import org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.modelling.saga.repository.SagaStore;
import org.axonframework.monitoring.MessageMonitor;
Expand All @@ -43,6 +47,7 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nonnull;

/**
* Defines a contract for configuring event processing.
Expand Down Expand Up @@ -664,6 +669,84 @@ EventProcessingConfigurer registerPooledStreamingEventProcessorConfiguration(
String name, PooledStreamingProcessorConfiguration pooledStreamingProcessorConfiguration
);

/**
* Register a {@link SequencedDeadLetterQueue} for the given {@code processingGroup}. The
* {@code SequencedDeadLetterQueue} will automatically enqueue failed events and evaluate them per the queue's
* configuration.
*
* @param processingGroup A {@link String} specifying the name of the processing group to register the given
* {@link SequencedDeadLetterQueue} for.
* @param queueBuilder A builder method returning a {@link SequencedDeadLetterQueue} based on a
* {@link Configuration}. The outcome is used by the given {@code processingGroup} to enqueue
* and evaluate failed events in.
* @return The current {@link EventProcessingConfigurer} instance, for fluent interfacing.
*/
default EventProcessingConfigurer registerDeadLetterQueue(
@Nonnull String processingGroup,
@Nonnull Function<Configuration, SequencedDeadLetterQueue<EventMessage<?>>> queueBuilder
) {
return this;
}

/**
* Register a default {@link EnqueuePolicy dead letter policy} for any processing group using a
* {@link #registerDeadLetterQueue(String, Function) dead letter queue}. The processing group uses the policy to
* deduce whether a failed {@link EventMessage} should be
* {@link SequencedDeadLetterQueue#enqueue(Object, DeadLetter) enqueued} for later evaluation.
* <p>
* Note that the configured component will not be used if the processing group <em>does not</em> have a dead letter
* queue.
*
* @param policyBuilder A builder method to construct a default {@link EnqueuePolicy dead letter policy}.
* @return The current {@link EventProcessingConfigurer} instance, for fluent interfacing.
*/
default EventProcessingConfigurer registerDefaultDeadLetterPolicy(
@Nonnull Function<Configuration, EnqueuePolicy<EventMessage<?>>> policyBuilder
) {
return this;
}

/**
* Register a {@link EnqueuePolicy dead letter policy} for the given {@code processingGroup} using a
* {@link #registerDeadLetterQueue(String, Function) dead letter queue}. The processing group uses the policy to
* deduce whether a failed {@link EventMessage} should be
* {@link SequencedDeadLetterQueue#enqueue(Object, DeadLetter) enqueued} for later evaluation.
* <p>
* Note that the configured component will not be used if the processing group <em>does not</em> have a dead letter
* queue.
*
* @param processingGroup The name of the processing group to build an {@link EnqueuePolicy} for.
* @param policyBuilder A builder method to construct a {@link EnqueuePolicy dead letter policy} for the given
* {@code processingGroup}.
* @return The current {@link EventProcessingConfigurer} instance, for fluent interfacing.
*/
default EventProcessingConfigurer registerDeadLetterPolicy(
@Nonnull String processingGroup,
@Nonnull Function<Configuration, EnqueuePolicy<EventMessage<?>>> policyBuilder
) {
return this;
}

/**
* Register a {@link DeadLetteringInvokerConfiguration} for the given {@code processingGroup}. This configuration
* object allows for fine-grained customization of a
* {@link DeadLetteringEventHandlerInvoker dead lettering processing group} through its
* {@link DeadLetteringEventHandlerInvoker.Builder builder}.
* <p>
* Note that the configured component will not be used if the processing group <em>does not</em> have a dead letter
* queue.
*
* @param processingGroup The name of the processing group to attach additional configuration too.
* @param configuration The additional configuration for the dead lettering processing group.
* @return The current {@link EventProcessingConfigurer} instance, for fluent interfacing.
*/
default EventProcessingConfigurer registerDeadLetteringEventHandlerInvokerConfiguration(
@Nonnull String processingGroup,
@Nonnull DeadLetteringInvokerConfiguration configuration
) {
return this;
}

/**
* Contract which defines how to build an event processor.
*/
Expand Down Expand Up @@ -711,4 +794,33 @@ static PooledStreamingProcessorConfiguration noOp() {
return (config, builder) -> builder;
}
}

/**
* Contract defining {@link DeadLetteringEventHandlerInvoker.Builder} based configuration when constructing a
* {@link DeadLetteringEventHandlerInvoker}.
*/
@FunctionalInterface
interface DeadLetteringInvokerConfiguration extends
BiFunction<Configuration, DeadLetteringEventHandlerInvoker.Builder, DeadLetteringEventHandlerInvoker.Builder> {

/**
* Returns a configuration that applies the given {@code other} configuration after applying {@code this}. Any
* configuration set by the {@code other} will override changes by {@code this} instance.
*
* @param other The configuration to apply after applying this.
* @return A configuration that applies both this and then the other configuration.
*/
default DeadLetteringInvokerConfiguration andThen(DeadLetteringInvokerConfiguration other) {
return (config, builder) -> other.apply(config, this.apply(config, builder));
}

/**
* A {@link DeadLetteringInvokerConfiguration} which does not add any configuration.
*
* @return A {@link DeadLetteringInvokerConfiguration} which does not add any configuration.
*/
static DeadLetteringInvokerConfiguration noOp() {
return (config, builder) -> builder;
}
}
}

0 comments on commit ef83538

Please sign in to comment.