New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dead-Letter Queue for Event Processors #2021
Comments
Clarify exception message by introducing the other token's name #2021
Add the processing group to the SimpleEventHandlerInvoker. This will allow for clearer tracing of failed event message handling. #2021
Include the failure that the caused the dead letter #2021
Construct an exception dedicated for event handling failures. This exception should then carry the required information to store events as dead letters, with things like the sequenceId and processingGroup. The exception should be thrown from the SimpleEventHandlerInvoker, that wraps the original exception of the event handler in it. #2021
Construct a ListenerInvocationErrorHandler that stores EventExecutionExceptions as dead letters in a DeadLetterQueue for EventMessages. #2021
As events should be handled in order, this means dead-lettered events should include following events in the same sequence. Doing so ensures processing doesn't go awry. To that end, it's beneficial to wrap the SimpleEventHandlerInvoker in a dead-letter aware variant that can validate the contents of the queue for a new event it receives. If it's present, the event should be added. If it's not present, regular handling should proceed. #2021
Adjust the DeadLetterQueue on several perspectives, like: - Replace 'add' for 'enqueue' - Enqueue Message instances instead of DeadLetter instances - Rename sequenceId to identifier to be more generic - Add group name to allow uniqueness among e.g. processors or aggregates - Rename DeadLetter to DeadLetterEntry - Add expiresAt field to DeadLetterEntry, as a handle to trigger evaluation - Remove poll() method from DeadLetterQueue - Return a single DeadLetterEntry from peek() - Add failed/succeeded methods to the DeadLetterQueue to mark a letter as successfully or failed evaluation. Failing should allow updating the cause - Introduce notion of the queue being full - Update test suite according to the above #2021
Align the InMemoryDeadLetterQueue with the adjusted DeadLetterQueue approach. Also introduce a notion of queue ordering, so that peek() returns the right entries at the right time. Leave some todos for future work and introduce a builder #2021
Update GenericEventDeadLetter to comply with new DeadLetterEntry interface #2021
Instead of the SimpleEventHandlerInvoker, a distinct EventHandlerInvoker implementation should be constructed to deal with the dead letter logic This warrants the made invoker changes obsolete, as well as the EventExecutionException.java #2021
Remove the DeadLetterErrorHandler, since all dead lettering logic will reside in a specific EventHandlerInvoker #2021
A dead lettering EventHandlerInvoker should be able to retrieve the sequence id for a given event from its delegate. Without this, it will not be able to correctly store the events in the right order. By added EventHandlerInvoker#sequenceIdentifier, any specific implementation can retrieve the seqId for any event from its delegate. #2021
Update the DeadLetterEventHandlerInvoker to contain a delegate EventHandlerInvoker. It uses the delegate to delegate the event handling process too. This allows it to capture exceptions, making it the place to store dead letter in the queue too. As an EventHandlerInvoker implementation, it will delegate all other invoker calls to the delegate. Lastly, add a builder for configuration and a test case to validate it #2021
- Introduce the concept of a QueueIdentifier (plus concrete implementations), pairing the identifier and group fields used in the DeadLetterQueue. - Let enqueue and enqueueIfPresent return a DeadLetterEntry and Optional<DeadLetterEntry. This allows for immediate validation if operations succeeded, as well as additional referral to the entry constructed. - Separate maxSize() into maxQueues() and maxQueueSize(). This allows space for a maximum amount of queues as well as a maximum queue size. This requires the isFull method to take in a QueueIdentifier, to be able to make the correct validation. - Peek should contain the group it is peeking for. Otherwise, results from different (processing) groups may end up in the wrong handlers. Next to that, let peek return an Optional to eliminate null values. - Remove DeadLetterQueue#evaluationSucceeded, in favor of DeadLetterEntry#release. - Remove DeadLetterQueue#evaluationFailed. Instead, the implementations should update the DeadLetterEntry#expiresAt and reinsert the entry, right after performing peek. This safeguards against concurrent retrieval of entries. - Introduce a means to clear out the DeadLetterQueue. - Make all necessary adjustments to committed code (e.g., the DeadLetteringEventHandlerInvoker and InMemoryDeadLetterQueue) to comply with the above description. #2021
The DeadLetteringEventHandlerInvoker should extend the SimpleEventHandlerInvoker for a couple of reasons. One, we don't have a clean way to disregard event handling, and thus event enqueueing, for events that aren't intended for this segment. Thus, multithreaded solutions might all invoke the DeadLetterQueue#enqueueIfPresent whilst they will not handle the event to begin with. Secondly, it removes the necessity to make the sequenceIdentifier public logic of the EventHandlerInvoker. Thirdly, it allows far greater flexibility to adjust the overall handle approach of the SimpleEventHandlerInvoker, ensuring the correct flow of matching segments, enqueueing if present, handling the event, enqueuing on failures and (potentially) dealing with the ErrorHandler too. #2021
- Add an ExecutorService to start the queue release task - Make the invoker a Lifecycle implementation - Set incomplete start/shutdown methods - Set todos #2021
- Rename release() to acknowledge() - Add an evict() method. This method should be used to remove a letter if it is not required to be evaluated - Adjust implementation accordingly - Adjust JavaDoc accordingly #2021
Introduce a dedicated letter evaluation trigger. This trigger should be able to run the evaluation task from the DeadLetteringEventHandlerInvoker. Also, it should be "triggerable" on various scenarios, like manual, fixed delay and on enqueue count. For it to be able to run the evaluation task it'll likely require moving the Executor to the "evaluator" implementation(s) in favor of the DeadLetteringEventHandlerInvoker. With this move, the start/shutdown process of the invoker should change to accommodate correct start/shutdown of the evaluation tasks. #2021
Remove DeadLetterEvaluator idea entirely. Instead, we should implement onAvailable callbacks and release operations on the DLQ directly. #2021
We should adjust the SimpleEventHandlerInvoker#eventHandlers to return EventMessageHandlers directly. Doing so enables the DeadLetteringEventHandlerInvoker to retrieve these directly and use in the evaluation runnable. Added, we should validate the builder adjustments work as expected. #2021
Add a numberOfRetries method to the DeadLetterEntry. This allows users a chance to evict the entry after a certain amount if they desire so. Also, drop the GenericEventDeadLetter for now, as we're not using it at all. Lastly, fix the test entry in the DeadLetterEntryTest to comply with the compareTo change using the expiresAt field instead of the deadLettered field for comparison. #2021
Instead of the DeadLetterEvaluator, the DeadLetterQueue should be able to invoke callbacks/runnables once entries are ready to be taken. Or, when they are released. To that end, callbacks should be registered with the DeadLetterQueue. These callbacks are scheduled for invocation whenever an entry matching the provided group expires through the expireThreshold. These callbacks are also invoked upon invocation of release. Release should be invokable with a predicate, filtering out entries that should be released. Releasing means that the expireAt time is set to now, thus making them available for "the taking." The InMemoryDeadLetterQueue implementation should have a configurable ScheduledExecutorService, and allow for updating the expireAt time on release invocation. #2021
Drop the compareTo operation since the use of a Deque resolves the ordering of entries #2021
- Only throw DeadLetterQueueOverflowException on enqueue, as enqueueIfPresent already invokes enqueue - Use the sequenceIdentifier constructor for simplicity #2021
- Remove empty lines - Add missing parameter descriptions - Drop reference to EnqueueDecision#shouldEvict #2021
Return sequenceIdentifier directly if it's of type String instead of hashing it for simplification #2021
Rename EnqueuePolicy containing methods to reference DeadLetterPolicy instead. This clarifies the intent for the users, as an EnqueuePolicy doesn't state anything among event processor configuration. #2021
- Fix JavaDoc typo in DeadLetteringEventHandlerInvoker - Remove verification of TransactionManager use during DeadLetteringEventHandlerInvoker#handle - Remove last use of the TransactionManager in the DeadLetteringEventHandlerInvoker - Flip the actual and expected fields in the GenericDeadLetterTest as they do not follow the default usages of assertEquals #2021
To further protect the InMemorySequencedDeadLetterQueue against concurrent changes on the dead-letters collection, we should add some synchronization around it when accessing. Main point of attention here are the evict and requeue operations which might remove a sequence or bring it back. Due to this, contains and enqueue should also synchronize The process operation does not receive any synchronized-blocks as it works through the takenSequences set. #2021
Replace usages of Function<X, X> for UnaryOperator<X> for clarity. #2021
- Fix test that used incorrect construction start point - Remove unused exception class #2021
Adjust test names according to current style without the test prefix. Furthermore, ensure all test method names are descriptive enough. Lastly, IgnoreDecision.java should be renamed to Ignore to be inline with the other EnqueueDecision implementations. #2021
- fix javadoc inconsistency - adjust log statement #2021
Introduce an operation to check the amount of dead-letters in the queue. Introduce an operation to check the amount of dead-letters in a sequence Introduce an operation to check the amount of unique sequences in the queue. #2021
As the heavy-weight of the concurrency lies within the sequence, for which we use a ConcurrentLinkedDeque, we can drop more restrictive ConcurrentSkipListMap for the ConcurrentHashMap. It means the size values might deviate a little, but that should be fair given the nature of this in-memory DLQ. #2021
Instead of expecting the provided maxSequences / maxSequenceSize to be a value equal or above 128, we should expect a strictly positive number. We can hardly deduce all the scenarios of our end-users, so expecting anything else than strictly positive wouldn't be fair. #2021
Emphasize that only a single DeadLetter sequence is processed through the process operations on the SequencedDeadLetterProcessor and SequencedDeadLetterQueue. #2021
Remove maxSequences and maxSequenceSize from the SequencedDeadLetterQueue. The other size operations cover sufficiently for monitoring at this time. #2021
Fix merge issues, indentation, and warnings #2021
Increase test fault tolerance by removing SpanFactory usage #2021
Remove hyphen between dead and letter in the documentation and logs statements, as it's incorrect. #2021
[#2021] Dead Letter Queue for Event Processing Groups
Happy to announce, as follows from the label changes, that the base pull request introducing the Dead-Letter Queue is finalized and merged into the main branch. The JPA implementation of the Dead-Letter Queue will follow soon. Note that there are still future additions we will make to the Dead-Letter Queue support, with most notably the introduction of a JDBC implementation of the Dead-Letter Queue.
|
- Add deprecated description to deprecated VersionedAggregateIdentifier constructor - Fix typo on IdentifierMissingException class-level JavaDoc - Add copyright notice to IdentifierMissingException - Remove unused Matchers.eq invocations in AggregateAnnotationCommandHandlerTest - Rename test methods of AggregateAnnotationCommandHandlerTest to not contain test prefix - Remove public modifier from AggregateAnnotationCommandHandlerTest test methods. #2021
Feature Description
It would be beneficial if
EventProcessors
would allow the configuration of a Dead Letter Queue.This queue should be filled with
EventMessages
for which event handling failed.Furthermore, it should consider that events are handled in sequence, based on the
SequencingPolicy
.As such, events that return a
SequencingPolicy
for a dead-lettered event should not be handled and added to the queue instead.Doing so will allow for retries of the entire event order starting from a given point.
This should ensure that event execution, e.g., for query model updates, doesn't get mixed up with failed and successful operations.
The
DeadLetterQueue
implementation should provide several implementations, differing per storage medium of the letters.Logical implementations are in-memory, JPA, JDBC, Mongo, and Axon Server.
Lastly, several triggers upon which to reevaluate the letters should be supported.
The above does not mean we should provide several approaches directly, but only that there's space to offer other triggers.
Upon such a trigger, the dead-letters should be injected as a regular event stream into the processor.
Current Behaviour
There's no support for dead letters on the Event Processor.
Wanted Behaviour
Axon Framework should provide dead letter support for Event Processors.
Possible Workarounds
Entirely custom implementations done by the end user.
The text was updated successfully, but these errors were encountered: