Skip to content

Commit

Permalink
Implement reset/clear support
Browse files Browse the repository at this point in the history
Implement reset/clear support

#2021
  • Loading branch information
smcvb committed Jan 11, 2022
1 parent ddbc606 commit 03ee261
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,26 @@ public class DeadLetteringEventHandlerInvoker implements EventHandlerInvoker {
private final EventHandlerInvoker delegate;
private final DeadLetterQueue<EventMessage<?>> queue;
private final String processingGroup;
private final boolean allowReset;

/**
* @param builder
* Instantiate a dead-lettering {@link EventHandlerInvoker} based on the given {@link Builder builder}. Uses a
* {@link DeadLetterQueue} to maintain and retrieve dead-letters from.
*
* @param builder The {@link Builder} used to instantiate a {@link DeadLetteringEventHandlerInvoker} instance.
*/
protected DeadLetteringEventHandlerInvoker(Builder builder) {
builder.validate();
this.delegate = builder.delegate;
this.queue = builder.queue;
this.processingGroup = builder.processingGroup;
this.allowReset = builder.allowReset;
}

/**
* @return
* Instantiate a builder to construct a {@link DeadLetteringEventHandlerInvoker}.
*
* @return A builder that can consturct a {@link DeadLetteringEventHandlerInvoker}.
*/
public static Builder builder() {
return new Builder();
Expand Down Expand Up @@ -98,11 +105,17 @@ public boolean supportsReset() {

@Override
public void performReset() {
if (allowReset) {
queue.clear(processingGroup);
}
delegate.performReset();
}

@Override
public <R> void performReset(R resetContext) {
if (allowReset) {
queue.clear(processingGroup);
}
delegate.performReset(resetContext);
}

Expand All @@ -119,10 +132,15 @@ public static class Builder {
private EventHandlerInvoker delegate;
private DeadLetterQueue<EventMessage<?>> queue;
private String processingGroup;
private boolean allowReset = false;

/**
* @param delegate
* @return
* The {@link EventHandlerInvoker} wrapped by this dead-lettering implementation. Operations {@link
* #canHandle(EventMessage, Segment)}, {@link #canHandleType(Class)}, {@link #supportsReset()} and {@link
* #sequenceIdentifier(EventMessage)} are completely taken care of by the given {@code delegate}.
*
* @param delegate The {@link EventHandlerInvoker} instance to delegate operations to.
* @return The current Builder instance, for fluent interfacing.
*/
public Builder delegate(EventHandlerInvoker delegate) {
assertNonNull(delegate, "The delegate EventHandlerInvoker may not be null");
Expand All @@ -131,8 +149,10 @@ public Builder delegate(EventHandlerInvoker delegate) {
}

/**
* @param queue
* @return
* Sets the {@link DeadLetterQueue} this {@link EventHandlerInvoker} maintains dead-letters with.
*
* @param queue The {@link DeadLetterQueue} this {@link EventHandlerInvoker} maintains dead-letters with.
* @return The current Builder instance, for fluent interfacing.
*/
public Builder queue(DeadLetterQueue<EventMessage<?>> queue) {
assertNonNull(queue, "The DeadLetterQueue may not be null");
Expand All @@ -141,10 +161,10 @@ public Builder queue(DeadLetterQueue<EventMessage<?>> queue) {
}

/**
* Sets the name of this invoker.
* Sets the processing group name of this invoker.
*
* @param processingGroup the name of this {@link EventHandlerInvoker}
* @return the current Builder instance, for fluent interfacing
* @param processingGroup The processing group name of this {@link EventHandlerInvoker}.
* @return The current Builder instance, for fluent interfacing.
*/
public Builder processingGroup(String processingGroup) {
assertNonEmpty(processingGroup, "The processing group may not be null or empty");
Expand All @@ -153,17 +173,33 @@ public Builder processingGroup(String processingGroup) {
}

/**
* @return
* Sets whether this {@link DeadLetteringEventHandlerInvoker} supports resets of the provided {@link
* DeadLetterQueue}. If set to {@code true}, {@link DeadLetterQueue#clear(String)} will be invoked upon a {@link
* #performReset()}/{@link #performReset(Object)} invocation. Defaults to {@code false}.
*
* @param allowReset A toggle dictating whether this {@link DeadLetteringEventHandlerInvoker} supports resets of
* the provided {@link DeadLetterQueue}.
* @return The current Builder instance, for fluent interfacing.
*/
public Builder allowReset(boolean allowReset) {
this.allowReset = allowReset;
return this;
}

/**
* Initializes a {@link DeadLetteringEventHandlerInvoker} as specified through this Builder.
*
* @return A {@link DeadLetteringEventHandlerInvoker} as specified through this Builder.
*/
public DeadLetteringEventHandlerInvoker build() {
return new DeadLetteringEventHandlerInvoker(this);
}

/**
* Validates whether the fields contained in this Builder are set accordingly.
* Validate whether the fields contained in this Builder as set accordingly.
*
* @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
* specifications
* @throws AxonConfigurationException If one field is asserted to be incorrect according to the Builder's
* specifications.
*/
protected void validate() {
assertNonNull(delegate, "The delegate EventHandlerInvoker is a hard requirement and should be provided");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.jupiter.api.Test;

import java.util.Optional;
import java.util.function.UnaryOperator;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
Expand Down Expand Up @@ -55,11 +56,26 @@ void setUp() {
//noinspection unchecked
queue = mock(DeadLetterQueue.class);

testSubject = DeadLetteringEventHandlerInvoker.builder()
.delegate(delegate)
.queue(queue)
.processingGroup(TEST_PROCESSING_GROUP)
.build();
setTestSubject(createTestSubject());
}

private void setTestSubject(DeadLetteringEventHandlerInvoker testSubject) {
this.testSubject = testSubject;
}

private DeadLetteringEventHandlerInvoker createTestSubject() {
return createTestSubject(builder -> builder);
}

private DeadLetteringEventHandlerInvoker createTestSubject(
UnaryOperator<DeadLetteringEventHandlerInvoker.Builder> customization
) {
DeadLetteringEventHandlerInvoker.Builder invokerBuilder =
DeadLetteringEventHandlerInvoker.builder()
.delegate(delegate)
.queue(queue)
.processingGroup(TEST_PROCESSING_GROUP);
return customization.apply(invokerBuilder).build();
}

@Test
Expand Down Expand Up @@ -130,18 +146,46 @@ void testSupportsResetIsDelegated() {
}

@Test
void testPerformResetIsDelegated() {
void testPerformResetOnlyDelegatesForAllowResetSetToFalse() {
setTestSubject(createTestSubject(builder -> builder.allowReset(false)));

testSubject.performReset();

verifyNoInteractions(queue);
verify(delegate).performReset();
}

@Test
void testPerformResetClearsOutTheQueueForAllowResetSetToTrue() {
setTestSubject(createTestSubject(builder -> builder.allowReset(true)));

testSubject.performReset();

verify(queue).clear(TEST_PROCESSING_GROUP);
verify(delegate).performReset();
}

@Test
void testPerformResetWithContextIsDelegated() {
void testPerformResetWithContextOnlyDelegatesForAllowResetSetToFalse() {
setTestSubject(createTestSubject(builder -> builder.allowReset(false)));

String testContext = "some-reset-context";

testSubject.performReset(testContext);

verifyNoInteractions(queue);
verify(delegate).performReset(testContext);
}

@Test
void testPerformResetWithContextClearsOutTheQueueForAllowResetSetToTrue() {
setTestSubject(createTestSubject(builder -> builder.allowReset(true)));

String testContext = "some-reset-context";

testSubject.performReset(testContext);

verify(queue).clear(TEST_PROCESSING_GROUP);
verify(delegate).performReset(testContext);
}

Expand Down

0 comments on commit 03ee261

Please sign in to comment.