Skip to content

Commit

Permalink
Update DeadLetterQueue approach
Browse files Browse the repository at this point in the history
Adjust the DeadLetterQueue on several perspectives, like:
- Replace 'add' for 'enqueue'
- Enqueue Message instances instead of DeadLetter instances
- Rename sequenceId to identifier to be more generic
- Add group name to allow uniqueness among e.g. processors or aggregates
- Rename DeadLetter to DeadLetterEntry
- Add expiresAt field to DeadLetterEntry, as a handle to trigger
evaluation
- Remove poll() method from DeadLetterQueue
- Return a single DeadLetterEntry from peek()
- Add failed/succeeded methods to the DeadLetterQueue to mark a letter
as successfully or failed evaluation. Failing should allow updating the
cause
- Introduce notion of the queue being full
- Update test suite according to the above

#2021
  • Loading branch information
smcvb committed Dec 3, 2021
1 parent 1bab855 commit 2c1fcfa
Show file tree
Hide file tree
Showing 5 changed files with 421 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,43 @@
* @author Steven van Beelen
* @since 4.6.0
*/
public interface DeadLetter<T extends Message<?>> {
public interface DeadLetterEntry<T extends Message<?>> {

/**
*
* @return
*/
// TODO: 26-11-21 replace for ProcessingIdentifier interface?
String sequenceIdentifier();
String identifier();

/**
*
* @return
*/
Instant deadLettered();
String group();

/**
*
* @return
*/
T deadLetter();
T message();

Throwable cause();

Throwable failure();
Instant expiresAt();

/**
*
* @return
*/
Instant deadLettered();

/**
*
* @param one
* @param two
* @return
*/
static int compare(DeadLetter<?> one, DeadLetter<?> two) {
static int compare(DeadLetterEntry<?> one, DeadLetterEntry<?> two) {
return one.deadLettered().compareTo(two.deadLettered());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,76 +18,90 @@

import org.axonframework.messaging.Message;

import java.util.function.Supplier;
import java.util.stream.Stream;

/**
* @author Steven van Beelen
* @since 4.6.0
*/
public interface DeadLetterQueue<T extends Message<?>> {

/**
* Add a {@link DeadLetter} to this queue. The {@code deadLetter} will be FIFO ordered with all other dead letters
* of the same {@link DeadLetter#sequenceIdentifier()}.
*
* @param deadLetter the {@link DeadLetter} to add to this queue
*/
void add(DeadLetter<T> deadLetter);

/**
* Adds the given {@code deadLetter} if this queue contains the given {@code deadLetter's} {@link
* DeadLetter#sequenceIdentifier()}. If there's no queue for the {@code deadLetter} it is ignored.
* Add a {@link Message} to this queue. The {@code deadLetter} will be FIFO ordered with all other dead letters
* having the same {@code identifier} and {@code group} combination.
*
* @param deadLetter the {@link DeadLetter} to attached in FIFO ordering for the given {@code sequenceIdentifier}
* @return {@code true} if the {@code deadLetter} is added, {@code false} otherwise
* @param identifier the identifier of given {@code deadLetter}
* @param group the group the {@code deadLetter} originates from
* @param deadLetter the {@link Message} to add to this queue
* @param cause the cause for enqueueing the given {@code deadLetter}
*/
default boolean addIfPresent(DeadLetter<T> deadLetter) {
return addIfPresent(deadLetter.sequenceIdentifier(), () -> deadLetter);
}
void enqueue(String identifier, String group, T deadLetter, Throwable cause) throws DeadLetterQueueFilledException;

/**
* Adds the result of the given {@code deadLetterSupplier} if this queue contains the given {@code
* sequenceIdentifier}. If there's no queue for the {@code deadLetter} it is ignored.
*
* @param sequenceIdentifier the identifier used to validate for contained {@link DeadLetter} instances
* @param deadLetterSupplier the {@link Supplier} of the {@link DeadLetter}. Only invoked if the given {@code
* sequenceIdentifier} is contained in this queue
* @return {@code true} if the {@code deadLetter} is added, {@code false} otherwise
* @param identifier the identifier of given {@code message}. Used ot validate if the given {@code message} should
* be enqueued
* @param group the group the {@code deadLetter} originates from. Used ot validate if the given {@code message}
* should be enqueued
* @param message the {@link Message} validated if it should be enqueued
* @return {@code true} if the {@code message} is added, {@code false} otherwise
*/
default boolean addIfPresent(String sequenceIdentifier, Supplier<DeadLetter<T>> deadLetterSupplier) {
boolean canAdd = !isEmpty() && contains(sequenceIdentifier);
if (canAdd) {
add(deadLetterSupplier.get());
default boolean enqueueIfPresent(String identifier, String group, T message) {
if (!isEmpty() && !isFull() && contains(identifier, group)) {
enqueue(identifier, group, message, null);
return true;
}
return canAdd;
return false;
}

/**
* Check whether there's a FIFO ordered queue of {@link DeadLetter} instances with the given {@code
* sequenceIdentifier}.
* Check whether there's a FIFO ordered queue of {@link DeadLetterEntry} instances with the given {@code
* identifier}.
*
* @param sequenceIdentifier the identifier used to validate for contained {@link DeadLetter} instances
* @return {@code true} if the identifiers has {@link DeadLetter} instances in this queue, {@code false} otherwise
* @param identifier the identifier used to validate for contained {@link DeadLetterEntry} instances
* @return {@code true} if the identifiers has {@link DeadLetterEntry} instances in this queue, {@code false}
* otherwise
*/
boolean contains(String sequenceIdentifier);
boolean contains(String identifier, String group);

/**
* Validates whether this queue is empty.
*
* @return {@code true} if this queue does not contain any {@link DeadLetter} instances, {@code false} otherwise
* @return {@code true} if this queue does not contain any {@link DeadLetterEntry} instances, {@code false}
* otherwise
*/
boolean isEmpty();

/**
* Peeks the most recently introduced {@link DeadLetter} instances.
* Validates whether this queue is full. When full, no new messages can be dead lettered.
*
* @return {@code true} if this queue is full, {@code false} otherwise
*/
boolean isFull();

/**
*
* @return
*/
Stream<DeadLetter<T>> peek();
long maxSize();

/**
* Peeks the most recently introduced {@link DeadLetterEntry} instances.
*
* @return
*/
Stream<DeadLetter<T>> poll();
DeadLetterEntry<T> peek();

/**
*
* @param deadLetter
*/
void evaluationSucceeded(DeadLetterEntry<T> deadLetter);

/**
*
* @param deadLetter
* @param cause
*/
void evaluationFailed(DeadLetterEntry<T> deadLetter, Throwable cause);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.messaging.deadletter;

import org.axonframework.common.AxonException;

/**
* @author Steven van Beelen
* @since 4.6.0
*/
public class DeadLetterQueueFilledException extends AxonException {

public DeadLetterQueueFilledException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.messaging.deadletter;

import org.axonframework.messaging.Message;
import org.junit.jupiter.api.*;

import java.time.Instant;

import static org.junit.jupiter.api.Assertions.*;

class DeadLetterEntryTest {

@Test
void testCompareReturnsValueLowerThanZeroIfFirstLetterOccurredBeforeSecond() {
Instant firstLetterTimestamp = Instant.ofEpochMilli(3000);
Instant secondLetterTimestamp = Instant.ofEpochMilli(6000);
TestDeadLetterEntry firstLetter = new TestDeadLetterEntry(firstLetterTimestamp);
TestDeadLetterEntry secondLetter = new TestDeadLetterEntry(secondLetterTimestamp);

assertTrue(DeadLetterEntry.compare(firstLetter, secondLetter) < 0);
}

@Test
void testCompareReturnsValueHigherThanZeroIfFirstLetterOccurredAfterSecond() {
Instant firstLetterTimestamp = Instant.ofEpochMilli(6000);
Instant secondLetterTimestamp = Instant.ofEpochMilli(3000);
TestDeadLetterEntry firstLetter = new TestDeadLetterEntry(firstLetterTimestamp);
TestDeadLetterEntry secondLetter = new TestDeadLetterEntry(secondLetterTimestamp);

assertTrue(DeadLetterEntry.compare(firstLetter, secondLetter) > 0);
}

@Test
void testCompareReturnsZeroIfFirstAndSecondLetterOccurredAtTheSameTime() {
Instant firstLetterTimestamp = Instant.ofEpochMilli(3000);
Instant secondLetterTimestamp = Instant.ofEpochMilli(3000);
TestDeadLetterEntry firstLetter = new TestDeadLetterEntry(firstLetterTimestamp);
TestDeadLetterEntry secondLetter = new TestDeadLetterEntry(secondLetterTimestamp);

assertEquals(0, DeadLetterEntry.compare(firstLetter, secondLetter));
}

private static class TestDeadLetterEntry implements DeadLetterEntry<Message<?>> {

private final Instant deadLettered;

private TestDeadLetterEntry(Instant deadLettered) {
this.deadLettered = deadLettered;
}

@Override
public String identifier() {
return "some-identifier";
}

@Override
public String group() {
return "some-group";
}

@Override
public Message<?> message() {
return null;
}

@Override
public Throwable cause() {
return null;
}

@Override
public Instant expiresAt() {
return null;
}

@Override
public Instant deadLettered() {
return deadLettered;
}
}
}

0 comments on commit 2c1fcfa

Please sign in to comment.