Skip to content

Commit

Permalink
Solidify API of the DeadLetterQueue
Browse files Browse the repository at this point in the history
- Introduce the concept of a QueueIdentifier (plus concrete
implementations), pairing the identifier and group fields used in the
DeadLetterQueue.
- Let enqueue and enqueueIfPresent return a DeadLetterEntry and
Optional<DeadLetterEntry. This allows for immediate validation if
operations succeeded, as well as additional referral to the entry
constructed.
- Separate maxSize() into maxQueues() and maxQueueSize(). This allows
space for a maximum amount of queues as well as a maximum queue size.
This requires the isFull method to take in a QueueIdentifier, to be able
 to make the correct validation.
- Peek should contain the group it is peeking for. Otherwise, results
from different (processing) groups may end up in the wrong handlers.
Next to that, let peek return an Optional to eliminate null values.
- Remove DeadLetterQueue#evaluationSucceeded, in favor of
DeadLetterEntry#release.
- Remove DeadLetterQueue#evaluationFailed. Instead, the implementations
should update the DeadLetterEntry#expiresAt and reinsert the entry,
right after performing peek. This safeguards against concurrent
retrieval of entries.
- Introduce a means to clear out the DeadLetterQueue.
- Make all necessary adjustments to committed code (e.g., the
DeadLetteringEventHandlerInvoker and InMemoryDeadLetterQueue) to
comply with the above description.

#2021
  • Loading branch information
smcvb committed Jan 11, 2022
1 parent 6043f85 commit e87e575
Show file tree
Hide file tree
Showing 13 changed files with 1,168 additions and 411 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,8 +70,9 @@ public boolean canHandleType(Class<?> payloadType) {

@Override
public void handle(EventMessage<?> message, Segment segment) throws Exception {
String sequenceIdentifier = Integer.toString(delegate.sequenceIdentifier(message).hashCode());
if (queue.enqueueIfPresent(sequenceIdentifier, processingGroup, message)) {
String sequenceIdentifier = Integer.toString(Objects.hashCode(delegate.sequenceIdentifier(message)));
EventHandlingQueueIdentifier identifier = new EventHandlingQueueIdentifier(sequenceIdentifier, processingGroup);
if (queue.enqueueIfPresent(identifier, message).isPresent()) {
logger.info(
"Event [{}] is added to the dead-letter queue since its processing id [{}-{}] was already present.",
message, sequenceIdentifier, processingGroup
Expand All @@ -85,7 +86,7 @@ public void handle(EventMessage<?> message, Segment segment) throws Exception {
// It is mandatory to rethrow the exception, as otherwise the message isn't enqueued.
delegate.handle(message, segment);
} catch (Exception e) {
queue.enqueue(sequenceIdentifier, processingGroup, message, e);
queue.enqueue(identifier, message, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.eventhandling.deadletter;


import org.axonframework.messaging.deadletter.QueueIdentifier;

import java.util.Objects;

/**
* Implementation of the {@link QueueIdentifier} dedicated for dead-lettering in event handling components.
* <p>
* This identifier is used to uniquely identify a sequence of events for a specific {@code processingGroup}. The
* sequence identifier is typically the result of a {@link org.axonframework.eventhandling.async.SequencingPolicy#getSequenceIdentifierFor(Object)}
* operation.
*
* @author Steven van Beelen
* @see DeadLetteringEventHandlerInvoker
* @since 4.6.0
*/
public class EventHandlingQueueIdentifier implements QueueIdentifier {

private final Object sequenceIdentifier;
private final String processingGroup;

/**
* Constructs an event handling specific {@link QueueIdentifier}.
*
* @param sequenceIdentifier The identifier of a sequence of events to enqueue.
* @param processingGroup The processing group that is required to enqueue events.
*/
public EventHandlingQueueIdentifier(Object sequenceIdentifier, String processingGroup) {
this.sequenceIdentifier = sequenceIdentifier;
this.processingGroup = processingGroup;
}

@Override
public Object identifier() {
return this.sequenceIdentifier;
}

@Override
public String group() {
return this.processingGroup;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EventHandlingQueueIdentifier that = (EventHandlingQueueIdentifier) o;
return Objects.equals(sequenceIdentifier, that.sequenceIdentifier)
&& Objects.equals(processingGroup, that.processingGroup);
}

@Override
public int hashCode() {
return Objects.hash(sequenceIdentifier, processingGroup);
}

@Override
public String toString() {
return "EventHandlingQueueIdentifier{" +
"sequenceIdentifier=" + sequenceIdentifier +
", processingGroup='" + processingGroup + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,11 @@
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.messaging.deadletter.DeadLetterEntry;
import org.axonframework.messaging.deadletter.QueueIdentifier;

import java.time.Instant;
import java.util.Objects;
import java.util.function.Consumer;

/**
* @author Steven van Beelen
Expand All @@ -31,70 +33,70 @@ public class GenericEventDeadLetter implements DeadLetterEntry<EventMessage<?>>

public static final Throwable SEQUENCED_DEAD_LETTER = null;

private final String identifier;
private final String group;
private final QueueIdentifier queueIdentifier;
private final EventMessage<?> deadLetter;
private final Throwable failure;
private final Instant expiresAt;
private final Instant deadLettered;
private final Instant expiresAt;
private final Consumer<GenericEventDeadLetter> releaseOperation;

public GenericEventDeadLetter(String identifier,
String group,
public GenericEventDeadLetter(QueueIdentifier queueIdentifier,
EventMessage<?> deadLetter,
Instant expiresAt) {
this(identifier, group, deadLetter, SEQUENCED_DEAD_LETTER, expiresAt);
Instant expiresAt,
Consumer<GenericEventDeadLetter> releaseOperation) {
this(queueIdentifier, deadLetter, SEQUENCED_DEAD_LETTER, expiresAt, releaseOperation);
}

public GenericEventDeadLetter(String identifier,
String group,
public GenericEventDeadLetter(QueueIdentifier queueIdentifier,
EventMessage<?> deadLetter,
Throwable failure,
Instant expiresAt) {
this(identifier, group, deadLetter, failure, expiresAt, GenericEventMessage.clock.instant());
Instant expiresAt,
Consumer<GenericEventDeadLetter> releaseOperation) {
this(queueIdentifier, deadLetter, failure, GenericEventMessage.clock.instant(), expiresAt, releaseOperation);
}

public GenericEventDeadLetter(String identifier,
String group,
public GenericEventDeadLetter(QueueIdentifier queueIdentifier,
EventMessage<?> deadLetter,
Throwable failure,
Instant deadLettered,
Instant expiresAt,
Instant deadLettered) {
this.identifier = identifier;
this.group = group;
Consumer<GenericEventDeadLetter> releaseOperation) {
this.queueIdentifier = queueIdentifier;
this.deadLetter = deadLetter;
this.failure = failure;
this.expiresAt = expiresAt;
this.deadLettered = deadLettered;
this.expiresAt = expiresAt;
this.releaseOperation = releaseOperation;
}

@Override
public String identifier() {
return identifier;
public QueueIdentifier queueIdentifier() {
return this.queueIdentifier;
}

@Override
public String group() {
return group;
public EventMessage<?> message() {
return this.deadLetter;
}

@Override
public EventMessage<?> message() {
return deadLetter;
public Throwable cause() {
return this.failure;
}

@Override
public Throwable cause() {
return failure;
public Instant deadLettered() {
return this.deadLettered;
}

@Override
public Instant expiresAt() {
return expiresAt;
return this.expiresAt;
}

@Override
public Instant deadLettered() {
return deadLettered;
public void release() {
releaseOperation.accept(this);
}

@Override
Expand All @@ -106,28 +108,26 @@ public boolean equals(Object o) {
return false;
}
GenericEventDeadLetter that = (GenericEventDeadLetter) o;
return Objects.equals(identifier, that.identifier)
&& Objects.equals(group, that.group)
return Objects.equals(queueIdentifier, that.queueIdentifier)
&& Objects.equals(deadLetter, that.deadLetter)
&& Objects.equals(failure, that.failure)
&& Objects.equals(expiresAt, that.expiresAt)
&& Objects.equals(deadLettered, that.deadLettered);
&& Objects.equals(deadLettered, that.deadLettered)
&& Objects.equals(expiresAt, that.expiresAt);
}

@Override
public int hashCode() {
return Objects.hash(identifier, group, deadLetter, failure, expiresAt, deadLettered);
return Objects.hash(queueIdentifier, deadLetter, failure, deadLettered, expiresAt);
}

@Override
public String toString() {
return "GenericEventDeadLetter{" +
"identifier='" + identifier + '\'' +
", group='" + group + '\'' +
"queueIdentifier=" + queueIdentifier +
", deadLetter=" + deadLetter +
", failure=" + failure +
", expiresAt=" + expiresAt +
", deadLettered=" + deadLettered +
", expiresAt=" + expiresAt +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2021. Axon Framework
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,46 +21,68 @@
import java.time.Instant;

/**
* Entry describing a dead-lettered {@link Message}.
* <p>
* The time of storing the {@link #message()} is kept through {@link #deadLettered()}. This letter can be regarded for
* evaluation once the {@link #expiresAt()} time is reached. Upon successful evaluation the entry can be cleared through
* {@link #release()}.
*
* @param <T> The type of {@link Message} represented by this entry.
* @author Steven van Beelen
* @since 4.6.0
*/
public interface DeadLetterEntry<T extends Message<?>> {

/**
* The {@link QueueIdentifier} this dead-letter belongs to.
*
* @return
* @return The {@link QueueIdentifier} this dead-letter belongs to
*/
String identifier();
QueueIdentifier queueIdentifier();

/**
* The {@link Message} of type {@code T} contained in this entry.
*
* @return
* @return The {@link Message} of type {@code T} contained in this entry.
*/
String group();
T message();

/**
* The cause for the {@link #message()} to be dead-lettered.
*
* @return
* @return The cause for the {@link #message()} to be dead-lettered
*/
T message();

Throwable cause();

Instant expiresAt();

/**
* The moment in time when the {@link #message()} was dead-lettered.
*
* @return
* @return The moment in time when the {@link #message()} was dead-lettered.
*/
Instant deadLettered();

/**
* The moment in time when this letter may be evaluated.
*
* @return The moment in time when this letter may be evaluated.
*/
Instant expiresAt();

/**
* Marks this {@link DeadLetterEntry dead-letter} as successfully evaluated. This will remove the entry from its
* queue.
*/
void release();

/**
* Compares two {@link DeadLetterEntry dead-letters} with one another, based on when they {@link #expiresAt()}.
*
* @param one
* @param two
* @return
* @param first The first {@link DeadLetterEntry dead-letter} to compare with.
* @param second The second {@link DeadLetterEntry dead-letter} to compare with.
* @return The result of {@link Instant#compareTo(Instant)} between the {@code first} and {@code second} {@link
* DeadLetterEntry dead-letter}.
*/
static int compare(DeadLetterEntry<?> one, DeadLetterEntry<?> two) {
return one.deadLettered().compareTo(two.deadLettered());
static int compare(DeadLetterEntry<?> first, DeadLetterEntry<?> second) {
return first.expiresAt().compareTo(second.expiresAt());
}
}

0 comments on commit e87e575

Please sign in to comment.