Skip to content

Commit

Permalink
Implement letter evaluation change and start/shutdown solution (WIP)
Browse files Browse the repository at this point in the history
Introduce a dedicated letter evaluation trigger. This trigger should
be able to run the evaluation task from the
DeadLetteringEventHandlerInvoker. Also, it should be "triggerable" on
various scenarios, like manual, fixed delay and on enqueue count. For it
 to be able to run the evaluation task it'll likely require moving the
 Executor to the "evaluator" implementation(s) in favor of the
 DeadLetteringEventHandlerInvoker. With this move, the start/shutdown
 process of the invoker should change to accommodate correct
 start/shutdown of the evaluation tasks.

#2021
  • Loading branch information
smcvb committed Jan 24, 2022
1 parent 4d2f353 commit df021a4
Show file tree
Hide file tree
Showing 6 changed files with 520 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,30 @@
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.lifecycle.Phase;
import org.axonframework.messaging.deadletter.DeadLetterEntry;
import org.axonframework.messaging.deadletter.DeadLetterEvaluator;
import org.axonframework.messaging.deadletter.DeadLetterQueue;
import org.axonframework.messaging.deadletter.FixedDelayLetterEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import static org.axonframework.common.BuilderUtils.assertNonEmpty;
import static org.axonframework.common.BuilderUtils.assertNonNull;

/**
* Implementation of the {@link SimpleEventHandlerInvoker} utilizing a {@link DeadLetterQueue} to enqueue {@link
* EventMessage} for which event handling failed. This dead-lettering {@link EventHandlerInvoker} takes into account
* that events part of the same sequence (as according to the {@link org.axonframework.eventhandling.async.SequencingPolicy})
* should be enqueued in order too.
* EventMessage} for which event handling failed.
* <p>
* This dead-lettering {@link EventHandlerInvoker} takes into account that events part of the same sequence (as
* according to the {@link org.axonframework.eventhandling.async.SequencingPolicy}) should be enqueued in order.
* TODO - add a bit about the evaluation process...
*
* @author Steven van Beelen
* @since 4.6.0
Expand All @@ -58,9 +60,8 @@ public class DeadLetteringEventHandlerInvoker extends SimpleEventHandlerInvoker
private final DeadLetterQueue<EventMessage<?>> queue;
private final String processingGroup;
private final boolean allowReset;
private final ScheduledExecutorService executorService;

private final AtomicBoolean running = new AtomicBoolean(false);
private final DeadLetterEvaluator letterEvaluator;
private final AtomicReference<RunState> runState;

/**
* Instantiate a dead-lettering {@link EventHandlerInvoker} based on the given {@link Builder builder}. Uses a
Expand All @@ -73,7 +74,8 @@ protected DeadLetteringEventHandlerInvoker(Builder builder) {
this.queue = builder.queue;
this.processingGroup = builder.processingGroup;
this.allowReset = builder.allowReset;
this.executorService = builder.executorService;
this.letterEvaluator = builder.letterEvaluator;
this.runState = new AtomicReference<>(RunState.initial(letterEvaluator::shutdown));
}

/**
Expand All @@ -97,13 +99,13 @@ public void registerLifecycleHandlers(LifecycleRegistry lifecycle) {
* Starts the {@link ScheduledExecutorService} used by this invoker for evaluating dead-lettered events.
*/
public void start() {
// TODO: 03-12-21 replace for trigger mechanism instead of fixed delays
// TODO: 03-12-21 or, do this as a follow up?

// TODO: 06-12-21 introduce a 'release' mechanism for the queue as a solution for triggering evaluation
executorService.scheduleWithFixedDelay(
new EvaluationTask(super::invokeHandlers/*, add predicate*/), 300, 300, TimeUnit.SECONDS
);
RunState currentState = this.runState.updateAndGet(RunState::start);
if (currentState.isRunning()) {
letterEvaluator.start(() -> new EvaluationTask(super::invokeHandlers/*, add predicate*/));
} else {
// TODO: 21-01-22 fine tune exception
throw new IllegalStateException("Starting this invoker is blocked somehow");
}
}

/**
Expand All @@ -113,10 +115,8 @@ public void start() {
* @return A future that completes once this invoker's {@link ScheduledExecutorService} is properly shut down.
*/
public CompletableFuture<Void> shutdown() {
// TODO: 03-12-21 add state to stop the task
// TODO: 17-01-22 should this invoker stop if it's event processors is stopped?
executorService.shutdown();
return CompletableFuture.completedFuture(null);
return runState.updateAndGet(RunState::attemptStop)
.shutdownHandle();
}

@Override
Expand All @@ -128,7 +128,9 @@ public void handle(EventMessage<?> message, Segment segment) throws Exception {

Object sequenceId = super.sequenceIdentifier(message);
EventHandlingQueueIdentifier identifier = new EventHandlingQueueIdentifier(sequenceId, processingGroup);
if (queue.enqueueIfPresent(identifier, message).isPresent()) {
Optional<DeadLetterEntry<EventMessage<?>>> optionalEntry = queue.enqueueIfPresent(identifier, message);
if (optionalEntry.isPresent()) {
letterEvaluator.enqueued(optionalEntry.get());
logger.info("Event [{}] is added to the dead-letter queue since its queue id [{}] was already present.",
message, identifier.combinedIdentifier());
} else {
Expand All @@ -143,7 +145,8 @@ public void handle(EventMessage<?> message, Segment segment) throws Exception {
// TODO: 14-01-22 We could (1) move the errorHandler invocation to a protected method to override,
// ensuring enqueue is invoked at all times with a try-catch block
// or (2) enforce a PropagatingErrorHandler at all times or (3) do nothing.
queue.enqueue(identifier, message, e);
DeadLetterEntry<EventMessage<?>> letter = queue.enqueue(identifier, message, e);
letterEvaluator.enqueued(letter);
}
}
}
Expand Down Expand Up @@ -174,7 +177,8 @@ public static class Builder extends SimpleEventHandlerInvoker.Builder<Builder> {
private DeadLetterQueue<EventMessage<?>> queue;
private String processingGroup;
private boolean allowReset = false;
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// TODO: 24-01-22 should we even do a default?
private DeadLetterEvaluator letterEvaluator = FixedDelayLetterEvaluator.defaultEvaluator();

/**
* Sets the {@link DeadLetterQueue} this {@link EventHandlerInvoker} maintains dead-letters with.
Expand Down Expand Up @@ -215,16 +219,12 @@ public Builder allowReset(boolean allowReset) {
}

/**
* Sets the {@link ScheduledExecutorService} this invoker uses to evaluated dead-letters. Defaults to a {@link
* Executors#newSingleThreadScheduledExecutor()}.
*
* @param executorService The scheduled executor used to evaluate dead-letters from the {@link
* DeadLetterQueue}.
* @param letterEvaluator
* @return The current Builder instance, for fluent interfacing.
*/
public Builder executorService(ScheduledExecutorService executorService) {
assertNonNull(executorService, "The ScheduledExecutorService may not be null");
this.executorService = executorService;
public Builder letterEvaluator(DeadLetterEvaluator letterEvaluator) {
assertNonNull(letterEvaluator, "The DeadLetterEvaluator may not be null");
this.letterEvaluator = letterEvaluator;
return this;
}

Expand Down Expand Up @@ -259,15 +259,26 @@ private class EvaluationTask implements Runnable {
private final HandlerInvoker handle;

private EvaluationTask(HandlerInvoker handle) {
// TODO: 06-12-21 introduce a 'release' mechanism for the queue/task as a solution for triggering actual evaluation
this.handle = handle;
}

@Override
public void run() {
Instant now = GenericEventMessage.clock.instant();
// TODO: 24-01-22 correctly remove the running task boolean when done
if (runState.updateAndGet(RunState::attemptTaskStart).hasRunningTask()) {
logger.debug("This runnable is stopped since an Evaluation Task is already running.");
return;
}

Instant now = GenericEventMessage.clock.instant();
boolean done = false;
while (!done) {
if (!runState.get().isRunning()) {
runState.get().shutdownHandle().complete(null);
return;
}

Optional<DeadLetterEntry<EventMessage<?>>> optionalLetter = queue.peek(processingGroup);
if (!optionalLetter.isPresent()) {
logger.debug("Ending the evaluation task as there are no dead-letters for queue [{}] present.",
Expand Down Expand Up @@ -321,4 +332,68 @@ default void accept(EventMessage<?> eventMessage) {
}
}
}

/**
* Run state container for this invoker. Maintains whether this invoker {@link #isRunning()}, has a {@link
* #taskRunning}
*/
private static class RunState {

private final boolean isRunning;
private final boolean taskRunning;
private final CompletableFuture<Void> shutdownHandle;
private final Runnable shutdownAction;

private RunState(boolean isRunning,
boolean taskRunning,
CompletableFuture<Void> shutdownHandle,
Runnable shutdownAction) {
this.isRunning = isRunning;
this.taskRunning = taskRunning;
this.shutdownHandle = shutdownHandle;
this.shutdownAction = shutdownAction;
}

public static RunState initial(Runnable shutdownAction) {
return new RunState(false, false, CompletableFuture.completedFuture(null), shutdownAction);
}

public RunState start() {
return new RunState(true, taskRunning, null, shutdownAction);
}

public RunState attemptTaskStart() {
// Starts a task if the invoker is running and no task is active. Otherwise, return the state as is.
return isRunning && !taskRunning
? new RunState(true, true, shutdownHandle, shutdownAction)
: this;
}

public RunState attemptStop() {
if (!isRunning || shutdownHandle != null) {
// It's already stopped
return this;
} else if (taskRunning) {
// A task is active, so we wait for it to complete the shutdown handle
CompletableFuture<Void> handle = new CompletableFuture<>();
handle.whenComplete((r, e) -> shutdownAction.run());
return new RunState(false, false, handle, shutdownAction);
} else {
// There's no task active, so we can immediately complete the future and invoke the shutdown action.
return new RunState(false, false, CompletableFuture.runAsync(shutdownAction), shutdownAction);
}
}

public boolean isRunning() {
return isRunning;
}

public boolean hasRunningTask() {
return taskRunning;
}

public CompletableFuture<Void> shutdownHandle() {
return shutdownHandle;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.messaging.deadletter;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static org.axonframework.common.BuilderUtils.assertNonNull;

/**
* TODO JavaDoc
* @author Steven van Beelen
* @since 4.6.0
*/
public abstract class AbstractDeadLetterEvaluator implements DeadLetterEvaluator {

protected final ScheduledExecutorService executor;
private final boolean customExecutorService;

protected Supplier<Runnable> taskBuilder;

/**
*
* @param builder
*/
public AbstractDeadLetterEvaluator(Builder<?> builder) {
builder.validate();
this.executor = builder.scheduledExecutorService;
this.customExecutorService = builder.customExecutorService;
}

@Override
public void start(Supplier<Runnable> evaluationTaskBuilder) {
this.taskBuilder = evaluationTaskBuilder;
}

@Override
public CompletableFuture<Void> shutdown() {
// If the Executor is customized we can expect the user to shut it down properly.
return customExecutorService
? CompletableFuture.completedFuture(null)
: CompletableFuture.runAsync(executor::shutdown);
}

/**
*
* @param <B>
*/
public static class Builder<B extends Builder<?>> {

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private boolean customExecutorService = false;

/**
* Sets the {@link ScheduledExecutorService} this {@link DeadLetterEvaluator evaluator} uses to evaluate
* dead-letters. Defaults to a {@link Executors#newSingleThreadScheduledExecutor()}.
*
* @param scheduledExecutorService The scheduled executor used to evaluate dead-letters from a {@link
* DeadLetterQueue}.
* @return The current Builder instance, for fluent interfacing.
*/
public B scheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
assertNonNull(scheduledExecutorService, "The ScheduledExecutorService may not be null");
this.scheduledExecutorService = scheduledExecutorService;
this.customExecutorService = true;
//noinspection unchecked
return (B) this;
}

public void validate() {
// Method kept for overriding
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.messaging.deadletter;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// TODO: 24-01-22 the only thing making this dead letter specific is the enqueued method...is that enough?
/**
* TODO JavaDoc
* @author Steven van Beelen
* @since 4.6.0
*/
public interface DeadLetterEvaluator {

/**
* TODO: 21-01-22 original intent to construct new tasks, is to potentially allow parallel tasks
* TODO: 24-01-22 replace the Runnable for an interface?
* @param evaluationTaskBuilder
*/
void start(Supplier<Runnable> evaluationTaskBuilder);

/**
*
* @return
*/
CompletableFuture<Void> shutdown();

/**
*
* @param letter
*/
void enqueued(DeadLetterEntry<?> letter);
}

0 comments on commit df021a4

Please sign in to comment.