Skip to content

Commit

Permalink
Add rough evaluation task implementation
Browse files Browse the repository at this point in the history
- Add an ExecutorService to start the queue release task
- Make the invoker a Lifecycle implementation
- Set incomplete start/shutdown methods
- Set todos

#2021
  • Loading branch information
smcvb committed Jan 20, 2022
1 parent 15d06fb commit f54551e
Showing 1 changed file with 142 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,25 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.lifecycle.Phase;
import org.axonframework.messaging.deadletter.DeadLetterEntry;
import org.axonframework.messaging.deadletter.DeadLetterQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.axonframework.common.BuilderUtils.assertNonEmpty;
import static org.axonframework.common.BuilderUtils.assertNonNull;
Expand All @@ -39,13 +51,16 @@
* @author Steven van Beelen
* @since 4.6.0
*/
public class DeadLetteringEventHandlerInvoker extends SimpleEventHandlerInvoker {
public class DeadLetteringEventHandlerInvoker extends SimpleEventHandlerInvoker implements Lifecycle {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final DeadLetterQueue<EventMessage<?>> queue;
private final String processingGroup;
private final boolean allowReset;
private final ScheduledExecutorService executorService;

private final AtomicBoolean running = new AtomicBoolean(false);

/**
* Instantiate a dead-lettering {@link EventHandlerInvoker} based on the given {@link Builder builder}. Uses a
Expand All @@ -58,17 +73,52 @@ protected DeadLetteringEventHandlerInvoker(Builder builder) {
this.queue = builder.queue;
this.processingGroup = builder.processingGroup;
this.allowReset = builder.allowReset;
this.executorService = builder.executorService;
}

/**
* Instantiate a builder to construct a {@link DeadLetteringEventHandlerInvoker}.
* <p>
* TODO add requirements and defaults
*
* @return A builder that can construct a {@link DeadLetteringEventHandlerInvoker}.
*/
public static Builder builder() {
return new Builder();
}

@Override
public void registerLifecycleHandlers(LifecycleRegistry lifecycle) {
lifecycle.onStart(Phase.INBOUND_EVENT_CONNECTORS + 1, this::start);
lifecycle.onShutdown(Phase.INBOUND_EVENT_CONNECTORS + 1, this::shutdown);
}

/**
* Starts the {@link ScheduledExecutorService} used by this invoker for evaluating dead-lettered events.
*/
public void start() {
// TODO: 03-12-21 replace for trigger mechanism instead of fixed delays
// TODO: 03-12-21 or, do this as a follow up?

// TODO: 06-12-21 introduce a 'release' mechanism for the queue as a solution for triggering evaluation
executorService.scheduleWithFixedDelay(
new EvaluationTask(super::invokeHandlers/*, add predicate*/), 300, 300, TimeUnit.SECONDS
);
}

/**
* Shuts down the {@link ScheduledExecutorService} used by this invoker for evaluating dead-lettered events in a
* friendly manner.
*
* @return A future that completes once this invoker's {@link ScheduledExecutorService} is properly shut down.
*/
public CompletableFuture<Void> shutdown() {
// TODO: 03-12-21 add state to stop the task
// TODO: 17-01-22 should this invoker stop if it's event processors is stopped?
executorService.shutdown();
return CompletableFuture.completedFuture(null);
}

@Override
public void handle(EventMessage<?> message, Segment segment) throws Exception {
if (!super.sequencingPolicyMatchesSegment(message, segment)) {
Expand Down Expand Up @@ -115,13 +165,16 @@ public <R> void performReset(R resetContext) {
}

/**
*
* Builder class to instantiate a {@link DeadLetteringEventHandlerInvoker}.
* <p>
* TODO add requirements and defaults
*/
public static class Builder extends SimpleEventHandlerInvoker.Builder<Builder> {

private DeadLetterQueue<EventMessage<?>> queue;
private String processingGroup;
private boolean allowReset = false;
private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

/**
* Sets the {@link DeadLetterQueue} this {@link EventHandlerInvoker} maintains dead-letters with.
Expand Down Expand Up @@ -161,6 +214,20 @@ public Builder allowReset(boolean allowReset) {
return this;
}

/**
* Sets the {@link ScheduledExecutorService} this invoker uses to evaluated dead-letters. Defaults to a {@link
* Executors#newSingleThreadScheduledExecutor()}.
*
* @param executorService The scheduled executor used to evaluate dead-letters from the {@link
* DeadLetterQueue}.
* @return The current Builder instance, for fluent interfacing.
*/
public Builder executorService(ScheduledExecutorService executorService) {
assertNonNull(executorService, "The ScheduledExecutorService may not be null");
this.executorService = executorService;
return this;
}

/**
* Initializes a {@link DeadLetteringEventHandlerInvoker} as specified through this Builder.
*
Expand All @@ -181,4 +248,77 @@ protected void validate() {
assertNonEmpty(processingGroup, "The processing group is a hard requirement and should be provided");
}
}

/**
*
*/
private class EvaluationTask implements Runnable {

private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final HandlerInvoker handle;

private EvaluationTask(HandlerInvoker handle) {
this.handle = handle;
}

@Override
public void run() {
Instant now = GenericEventMessage.clock.instant();

boolean done = false;
while (!done) {
Optional<DeadLetterEntry<EventMessage<?>>> optionalLetter = queue.peek(processingGroup);
if (!optionalLetter.isPresent()) {
logger.debug("Ending the evaluation task as there are no dead-letters for queue [{}] present.",
processingGroup);
done = true;
continue;
}

DeadLetterEntry<EventMessage<?>> letter = optionalLetter.get();
if (letter.expiresAt().isAfter(now)) {
logger.debug("Ending the evaluation task as there are no expired dead-letters for queue [{}].",
processingGroup);
done = true;
continue;
}

try {
// TODO: 03-12-21 This will require us to store the segment with the entry too
//delegate.handle(letter.message(), letter.queueIdentifier().segment());
// TODO: 17-01-22 Do we care about the Segment at this stage?
// A different thread, separate from regular event handling, is performing this task any how.
// From that end, I anticipate it to be okay if anyone just picks it up, regardless of the Segment.
handle.invokerHandlers(letter.message());
letter.release();
logger.info("Dead-letter [{}] is released as it is successfully handled for processing group [{}].",
letter.message().getIdentifier(), processingGroup);
} catch (Exception e) {
logger.info("Failed handling dead-letter [{}] for processing group [{}].",
letter.message().getIdentifier(), processingGroup);
done = true;
}
}
}
}

/**
* A wrapper around a {@link Consumer} of the {@link EventMessage} to pass to the {@link EvaluationTask}. This
* solutions allows rethrowing any exceptions thrown by the event handlers.
*/
@FunctionalInterface
private interface HandlerInvoker extends Consumer<EventMessage<?>> {

void invokerHandlers(EventMessage<?> event) throws Exception;

@Override
default void accept(EventMessage<?> eventMessage) {
try {
invokerHandlers(eventMessage);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit f54551e

Please sign in to comment.