Skip to content

Commit

Permalink
Release and onAvailable implementation
Browse files Browse the repository at this point in the history
Instead of the DeadLetterEvaluator, the DeadLetterQueue should be able
to invoke callbacks/runnables once entries are ready to be taken. Or,
when they are released. To that end, callbacks should be registered with
 the DeadLetterQueue. These callbacks are scheduled for invocation
 whenever an entry matching the provided group expires through the
 expireThreshold. These callbacks are also invoked upon invocation of
 release. Release should be invokable with a predicate, filtering out
 entries that should be released. Releasing means that the expireAt time
 is set to now, thus making them available for "the taking." The
 InMemoryDeadLetterQueue implementation should have a configurable
 ScheduledExecutorService, and allow for updating the expireAt time on
 release invocation.

#2021
  • Loading branch information
smcvb committed Feb 1, 2022
1 parent 833b423 commit 4e14870
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public interface DeadLetterEntry<T extends Message<?>> {
Instant deadLettered();

/**
* The moment in time when this letter may be evaluated.
* The moment in time when this letter may be evaluated again. Will equal the {@link #deadLettered()} value if this
* entry is enqueued as part of a sequence.
*
* @return The moment in time when this letter may be evaluated.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@
* several FIFO-ordered queues.
* <p>
* The contained queues are uniquely identifiable through the {@link QueueIdentifier}. Dead-letters are kept in the form
* of a {@link DeadLetterEntry DeadLetterEntries}. When retrieving letters through {@link #peek(String)} for evaluation,
* they can be removed with {@link DeadLetterEntry#acknowledge()} to clear them from this queue.
* of a {@link DeadLetterEntry DeadLetterEntries}. When retrieving letters through {@link #take(String)} for evaluation,
* they can be removed with {@link DeadLetterEntry#acknowledge()} or {@link DeadLetterEntry#evict()} to clear them from
* this queue.
* <p>
* A callback can be configured through {@link #onAvailable(String, Runnable)} that is automatically invoked when
* dead-letters are released and thus ready to be taken. Entries may be released earlier by invoking {@link
* #release(Predicate)}.
*
* @param <T> An implementation of {@link Message} that represent the dead-letter.
* @author Steven van Beelen
Expand Down Expand Up @@ -135,18 +140,52 @@ default Optional<DeadLetterEntry<T>> enqueueIfPresent(QueueIdentifier identifier
long maxQueueSize();

/**
* Peeks the oldest {@link DeadLetterEntry} from this dead-letter queue for the given {@code group}.
* Take the oldest {@link DeadLetterEntry} from this dead-letter queue for the given {@code group} that is ready to
* be released. Entries can be made available earlier through {@link #release(Predicate)} when necessary.
* <p>
* Upon peeking, the returned {@link DeadLetterEntry dead-letter} is automatically reentered into the queue with an
* updated {@link DeadLetterEntry#expiresAt()}. Doing so guards the queue against concurrent peek operations
* accidentally retrieving (and thus handling) the same letter.
* Upon taking, the returned {@link DeadLetterEntry dead-letter} is kept in the queue with an updated {@link
* DeadLetterEntry#expiresAt()} and {@link DeadLetterEntry#numberOfRetries()}. Doing so guards the queue against
* concurrent take operations accidentally retrieving (and thus handling) the same letter.
* <p>
* Will return an {@link Optional#empty()} if there are no entries present for the given {@code group}.
* Will return an {@link Optional#empty()} if there are no entries ready to be released or present for the given
* {@code group}.
*
* @param group The group descriptor of a {@link QueueIdentifier} to peek an entry for.
* @return The oldest {@link DeadLetterEntry} belonging to the given {@code group} from this dead-letter queue.
* @see #release(Predicate)
*/
Optional<DeadLetterEntry<T>> take(String group);

/**
* Release all {@link DeadLetterEntry dead-letters} within this queue that match the given {@code entryFilter}.
* <p>
* This makes the matching letters ready to be {@link #take(String) taken}. Furthermore, it signals any matching
* (based on the {@code group} name) callbacks registered through {@link #onAvailable(String, Runnable)}.
*
* @param entryFilter A lambda selecting the entries within this queue to be released.
*/
void release(Predicate<DeadLetterEntry<T>> entryFilter);

/**
* Release all {@link DeadLetterEntry dead-letters} within this queue.
* <p>
* This makes the letters ready to be {@link #take(String) taken}. Furthermore, it signals any callbacks registered
* through {@link #onAvailable(String, Runnable)}.
*/
default void release() {
release(entry -> true);
}

/**
* Set the given {@code callback} for the given {@code group} to be invoked when {@link DeadLetterEntry
* dead-letters} are ready to be {@link #take(String) taken} from the queue. Dead-letters may be released earlier
* through {@link #release(Predicate)} to automatically trigger the {@code callback} if the {@code group} matches.
*
* @param group The group descriptor of a {@link QueueIdentifier} to register a {@code callback} for.
* @param callback The operation to run whenever {@link DeadLetterEntry dead-letters} are released and ready to be
* taken.
*/
Optional<DeadLetterEntry<T>> peek(String group);
void onAvailable(String group, Runnable callback);

/**
* Clears out all {@link DeadLetterEntry dead-letters} matching the given {@link Predicate queueFilter}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,35 @@
package org.axonframework.messaging.deadletter;

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.axonframework.common.BuilderUtils.assertStrictPositive;
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;

/**
Expand All @@ -57,11 +70,13 @@ public class InMemoryDeadLetterQueue<T extends Message<?>> implements DeadLetter
*/
public static Clock clock = Clock.systemUTC();

private final ConcurrentNavigableMap<QueueIdentifier, Deque<DeadLetterEntry<T>>> deadLetters = new ConcurrentSkipListMap<>();
private final Map<QueueIdentifier, Deque<DeadLetterEntry<T>>> deadLetters = new ConcurrentSkipListMap<>();
private final Map<String, Runnable> availabilityCallbacks = new ConcurrentSkipListMap<>();

private final int maxQueues;
private final int maxQueueSize;
private final long expireThreshold;
private final Duration expireThreshold;
private final ScheduledExecutorService scheduledExecutorService;

/**
* Instantiate an in-memory {@link DeadLetterQueue} based on the given {@link Builder builder}.
Expand All @@ -73,13 +88,16 @@ protected InMemoryDeadLetterQueue(Builder<T> builder) {
this.maxQueues = builder.maxQueues;
this.maxQueueSize = builder.maxQueueSize;
this.expireThreshold = builder.expireThreshold;
this.scheduledExecutorService = builder.scheduledExecutorService;
}

/**
* Instantiate a builder to construct an {@link InMemoryDeadLetterQueue}.
* <p>
* The maximum number of queues defaults to {@code 1024}, the maximum amount of dead letters inside a queue defaults
* to {@code 1024}, and the dead letter expire threshold defaults to {@code 5000} milliseconds.
* to {@code 1024}, the dead letter expire threshold defaults to a {@link Duration} of 5000 milliseconds, and the {@link
* ScheduledExecutorService} defaults to a {@link Executors#newSingleThreadScheduledExecutor(ThreadFactory)}, using
* an {@link AxonThreadFactory}.
*
* @param <T> The type of {@link Message} maintained in this {@link DeadLetterQueue}.
* @return A Builder that can construct an {@link InMemoryDeadLetterQueue}.
Expand All @@ -92,7 +110,9 @@ public static <T extends Message<?>> Builder<T> builder() {
* Construct a default {@link InMemoryDeadLetterQueue}.
* <p>
* The maximum number of queues defaults to {@code 1024}, the maximum amount of dead letters inside a queue defaults
* to {@code 1024}, and the dead letter expire threshold defaults to {@code 5000} milliseconds.
* to {@code 1024}, the dead letter expire threshold defaults to a {@link Duration} of 5000 milliseconds, and the {@link
* ScheduledExecutorService} defaults to a {@link Executors#newSingleThreadScheduledExecutor(ThreadFactory)}, using
* an {@link AxonThreadFactory}.
*
* @return A default {@link InMemoryDeadLetterQueue}.
*/
Expand All @@ -112,19 +132,27 @@ public DeadLetterEntry<T> enqueue(QueueIdentifier identifier,
cause
);
}
logger.debug("Adding dead letter [{}] because [{}].", deadLetter, cause);

if (cause != null) {
logger.debug("Adding dead letter [{}] because [{}].", deadLetter, cause);
} else {
logger.debug("Adding dead letter [{}] because the queue identifier [{}] is already present.",
deadLetter, identifier);
}

DeadLetterEntry<T> entry = buildEntry(identifier, deadLetter, cause);

deadLetters.computeIfAbsent(identifier, id -> new ConcurrentLinkedDeque<>())
.add(entry);

scheduleAvailabilityCallbacks(identifier);

return entry;
}

private DeadLetterEntry<T> buildEntry(QueueIdentifier identifier, T deadLetter, Throwable cause) {
Instant deadLettered = clock.instant();
Instant expiresAt = deadLettered.plusMillis(expireThreshold);
Instant expiresAt = cause == null ? deadLettered : deadLettered.plus(expireThreshold);
return new GenericDeadLetterMessage(identifier,
deadLetter,
cause,
Expand All @@ -142,6 +170,16 @@ private void releaseOperation(DeadLetterEntry<T> deadLetter) {
}
}

private void scheduleAvailabilityCallbacks(QueueIdentifier identifier) {
availabilityCallbacks.entrySet()
.stream()
.filter(callbackEntry -> callbackEntry.getKey().equals(identifier.group()))
.map(Map.Entry::getValue)
.forEach(callback -> scheduledExecutorService.schedule(
callback, expireThreshold.toMillis(), TimeUnit.MILLISECONDS
));
}

@Override
public boolean contains(QueueIdentifier identifier) {
logger.debug("Validating existence of sequence identifier [{}].", identifier.combinedIdentifier());
Expand All @@ -155,7 +193,7 @@ public boolean isEmpty() {

@Override
public boolean isFull(QueueIdentifier queueIdentifier) {
return maximumNumberOfQueuesReached(queueIdentifier) || maximumQueueSizeReached(queueIdentifier);
return maximumNumberOfQueuesReached(queueIdentifier) || maximumQueueSizeReached(queueIdentifier);
}

private boolean maximumNumberOfQueuesReached(QueueIdentifier queueIdentifier) {
Expand All @@ -177,7 +215,7 @@ public long maxQueueSize() {
}

@Override
public Optional<DeadLetterEntry<T>> peek(String group) {
public synchronized Optional<DeadLetterEntry<T>> take(String group) {
if (deadLetters.isEmpty()) {
logger.debug("Queue is empty while peeking. Returning an empty optional.");
return Optional.empty();
Expand Down Expand Up @@ -224,13 +262,42 @@ private DeadLetterEntry<T> peekEarliestLetter(List<QueueIdentifier> queueIds) {

private void updateAndReinsert(DeadLetterEntry<T> letter) {
// Reinsert the entry with an updated expireAt. This solves concurrent access of the same letter.
Instant newExpiresAt = clock.instant().plusMillis(expireThreshold);
// And ensures letters that have reached the top of the queue are considered at the top.
Instant newExpiresAt = clock.instant().plus(expireThreshold);
DeadLetterEntry<T> updatedLetter = new GenericDeadLetterMessage(letter, newExpiresAt, this::releaseOperation);
Queue<DeadLetterEntry<T>> queue = deadLetters.get(updatedLetter.queueIdentifier());
Deque<DeadLetterEntry<T>> queue = deadLetters.get(updatedLetter.queueIdentifier());
queue.remove(letter);
queue.add(updatedLetter);
}

@Override
public void release(Predicate<DeadLetterEntry<T>> entryFilter) {
Instant expiresAt = clock.instant();
Set<String> releasedGroups = new HashSet<>();
logger.debug("Received a request to release matching dead-letters for evaluation.");

deadLetters.values()
.stream()
.flatMap(Collection::stream)
.filter(entryFilter)
.map(entry -> (GenericDeadLetterMessage) entry)
.forEach(entry -> {
entry.setExpiresAt(expiresAt);
releasedGroups.add(entry.queueIdentifier().group());
});
// todo task should be repeated as long as there are entries
releasedGroups.stream()
.map(availabilityCallbacks::get)
.forEach(scheduledExecutorService::submit);
}

@Override
public void onAvailable(String group, Runnable callback) {
if (availabilityCallbacks.put(group, callback) != null) {
logger.info("Replaced the availability callback for group [{}].", group);
}
}

@Override
public void clear(Predicate<QueueIdentifier> queueFilter) {
List<QueueIdentifier> queuesToClear = deadLetters.keySet()
Expand All @@ -249,15 +316,20 @@ public void clear(Predicate<QueueIdentifier> queueFilter) {
* Builder class to instantiate an {@link InMemoryDeadLetterQueue}.
* <p>
* The maximum number of queues defaults to {@code 1024}, the maximum amount of dead letters inside a queue defaults
* to {@code 1024}, and the dead letter expire threshold defaults to {@code 5000} milliseconds.
* to {@code 1024}, the dead letter expire threshold defaults to a {@link Duration} of 5000 milliseconds, and the {@link
* ScheduledExecutorService} defaults to a {@link Executors#newSingleThreadScheduledExecutor(ThreadFactory)}, using
* an {@link AxonThreadFactory}.
*
* @param <T> The type of {@link Message} maintained in this {@link DeadLetterQueue}.
*/
public static class Builder<T extends Message<?>> {

private int maxQueues = 1024;
private int maxQueueSize = 1024;
private long expireThreshold = 5000;
// TODO: 01-02-22 should we increase this value to something higher to not trigger to many checks?
private Duration expireThreshold = Duration.ofMillis(5000);
private ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new AxonThreadFactory("InMemoryDeadLetterQueue"));

/**
* Sets the maximum number of queues this {@link DeadLetterQueue} may contain. This requirement reflects itself
Expand Down Expand Up @@ -296,20 +368,38 @@ public Builder<T> maxQueueSize(int maxQueueSize) {

/**
* Sets the threshold when newly {@link #enqueue(QueueIdentifier, Message, Throwable) enqueued} letters are
* considered expired.
* considered ready to be {@link #take(String) taken}.
* <p>
* The earlier the {@link DeadLetterEntry#expiresAt()} date, the earlier it is returned from {@link
* #peek(String)}. Defaults to {@code 5000} milliseconds.
* The provided threshold is also used to schedule {@link #onAvailable(String, Runnable) configured availability
* checks}. Defaults to a {@link Duration} of 5000 milliseconds.
*
* @param expireThreshold The threshold for enqueued {@link DeadLetterEntry letters} to be considered expired.
* @param expireThreshold The threshold for enqueued {@link DeadLetterEntry letters} to be considered ready to
* be {@link #take(String) taken}.
* @return The current Builder, for fluent interfacing.
*/
public Builder<T> expireThreshold(long expireThreshold) {
assertStrictPositive(expireThreshold, "the expire threshold should be strictly positive");
public Builder<T> expireThreshold(Duration expireThreshold) {
assertThat(expireThreshold,
threshold -> threshold != null && (!threshold.isZero() || !threshold.isNegative()),
"The expire threshold should be strictly positive");
this.expireThreshold = expireThreshold;
return this;
}

/**
* Sets the {@link ScheduledExecutorService} this queue uses to invoke {@link #onAvailable(String, Runnable)
* configured availability callbacks}. Defaults to a {@link Executors#newSingleThreadScheduledExecutor(ThreadFactory)},
* using an {@link AxonThreadFactory}.
*
* @param scheduledExecutorService The {@link ScheduledExecutorService} this queue uses to invoke {@link
* #onAvailable(String, Runnable) configured availability callbacks}.
* @return The current Builder, for fluent interfacing.
*/
public Builder<T> scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
assertNonNull(scheduledExecutorService, "The ScheduledExecutorService should be non null");
this.scheduledExecutorService = scheduledExecutorService;
return this;
}

/**
* Initializes a {@link InMemoryDeadLetterQueue} as specified through this Builder.
*
Expand Down Expand Up @@ -338,7 +428,7 @@ private class GenericDeadLetterMessage implements DeadLetterEntry<T> {
private final QueueIdentifier queueIdentifier;
private final T message;
private final Throwable cause;
private final Instant expiresAt;
private Instant expiresAt;
private final int numberOfRetries;
private final Instant deadLettered;
private final Consumer<GenericDeadLetterMessage> releaseOperation;
Expand Down Expand Up @@ -405,6 +495,10 @@ public Instant expiresAt() {
return expiresAt;
}

public void setExpiresAt(Instant expiresAt) {
this.expiresAt = expiresAt;
}

@Override
public int numberOfRetries() {
return numberOfRetries;
Expand Down

0 comments on commit 4e14870

Please sign in to comment.