Skip to content

Commit

Permalink
Rename EnqueuePolicy containing methods to reference DeadLetterPolicy
Browse files Browse the repository at this point in the history
Rename EnqueuePolicy containing methods to reference DeadLetterPolicy
instead. This clarifies the intent for the users, as an EnqueuePolicy
doesn't state anything among event processor configuration.

#2021
  • Loading branch information
smcvb committed Aug 8, 2022
1 parent 7353059 commit d030c5b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,14 @@ default Optional<SequencedDeadLetterQueue<EventMessage<?>>> deadLetterQueue(
}

/**
* Returns the {@link EnqueuePolicy} tied to the given {@code processingGroup} in an {@link Optional}. May return an
* {@link Optional} containing the
* {@link EventProcessingConfigurer#registerDefaultEnqueuePolicy(Function) default policy} if present.
* Returns the {@link EnqueuePolicy dead-letter policy} tied to the given {@code processingGroup} in an
* {@link Optional}. May return an {@link Optional} containing the
* {@link EventProcessingConfigurer#registerDefaultDeadLetterPolicy(Function) default policy} if present.
*
* @param processingGroup The name of the processing group for which to return an {@link EnqueuePolicy}.
* @return The {@link EnqueuePolicy} belonging to the given {@code processingGroup}.
*/
default Optional<EnqueuePolicy<EventMessage<?>>> enqueuePolicy(@Nonnull String processingGroup) {
default Optional<EnqueuePolicy<EventMessage<?>>> deadLetterPolicy(@Nonnull String processingGroup) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,25 +689,25 @@ default EventProcessingConfigurer registerDeadLetterQueue(
}

/**
* Register a default {@link EnqueuePolicy} for any processing group using a
* Register a default {@link EnqueuePolicy dead-letter policy} for any processing group using a
* {@link #registerDeadLetterQueue(String, Function) dead-letter queue}. The processing group uses the policy to
* deduce whether a failed {@link EventMessage} should be
* {@link SequencedDeadLetterQueue#enqueue(Object, DeadLetter) enqueued} for later evaluation.
* <p>
* Note that the configured component will not be used if the processing group <em>does not</em> have a dead-letter
* queue.
*
* @param policyBuilder A builder method to construct a default {@link EnqueuePolicy}.
* @param policyBuilder A builder method to construct a default {@link EnqueuePolicy dead-letter policy}.
* @return The current {@link EventProcessingConfigurer} instance, for fluent interfacing.
*/
default EventProcessingConfigurer registerDefaultEnqueuePolicy(
default EventProcessingConfigurer registerDefaultDeadLetterPolicy(
@Nonnull Function<Configuration, EnqueuePolicy<EventMessage<?>>> policyBuilder
) {
return this;
}

/**
* Register an {@link EnqueuePolicy} for the given {@code processingGroup} using a
* Register a {@link EnqueuePolicy dead-letter policy} for the given {@code processingGroup} using a
* {@link #registerDeadLetterQueue(String, Function) dead-letter queue}. The processing group uses the policy to
* deduce whether a failed {@link EventMessage} should be
* {@link SequencedDeadLetterQueue#enqueue(Object, DeadLetter) enqueued} for later evaluation.
Expand All @@ -716,11 +716,11 @@ default EventProcessingConfigurer registerDefaultEnqueuePolicy(
* queue.
*
* @param processingGroup The name of the processing group to build an {@link EnqueuePolicy} for.
* @param policyBuilder A builder method to construct an {@link EnqueuePolicy} for the given
* @param policyBuilder A builder method to construct a {@link EnqueuePolicy dead-letter policy} for the given
* {@code processingGroup}.
* @return The current {@link EventProcessingConfigurer} instance, for fluent interfacing.
*/
default EventProcessingConfigurer registerEnqueuePolicy(
default EventProcessingConfigurer registerDeadLetterPolicy(
@Nonnull String processingGroup,
@Nonnull Function<Configuration, EnqueuePolicy<EventMessage<?>>> policyBuilder
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class EventProcessingModule
protected final Map<String, Component<RollbackConfiguration>> rollbackConfigurations = new HashMap<>();
protected final Map<String, Component<TransactionManager>> transactionManagers = new HashMap<>();
protected final Map<String, Component<SequencedDeadLetterQueue<EventMessage<?>>>> deadLetterQueues = new HashMap<>();
protected final Map<String, Component<EnqueuePolicy<EventMessage<?>>>> enqueuePolicies = new HashMap<>();
protected final Map<String, Component<EnqueuePolicy<EventMessage<?>>>> deadLetterPolicies = new HashMap<>();

protected final Map<String, Component<TrackingEventProcessorConfiguration>> tepConfigs = new HashMap<>();
protected final Map<String, PooledStreamingProcessorConfiguration> psepConfigs = new HashMap<>();
Expand Down Expand Up @@ -164,9 +164,9 @@ public class EventProcessingModule
c -> c.getComponent(TransactionManager.class, NoTransactionManager::instance)
);
@SuppressWarnings({"unchecked", "rawtypes"})
private final Component<EnqueuePolicy<EventMessage<?>>> defaultEnqueuePolicy = new Component<>(
private final Component<EnqueuePolicy<EventMessage<?>>> defaultDeadLetterPolicy = new Component<>(
() -> configuration,
"enqueuePolicy",
"deadLetterPolicy",
c -> c.getComponent(EnqueuePolicy.class, () -> (letter, cause) -> Decisions.enqueue(cause))
);
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -295,8 +295,8 @@ private DeadLetteringEventHandlerInvoker deadLetteringInvoker(String processorNa
"Cannot find a Dead Letter Queue for processing group [" + processingGroup + "]."
));
EnqueuePolicy<EventMessage<?>> enqueuePolicy =
enqueuePolicy(processingGroup).orElseThrow(() -> new IllegalStateException(
"Cannot find a Enqueue Policy for processing group [" + processingGroup + "]."
deadLetterPolicy(processingGroup).orElseThrow(() -> new IllegalStateException(
"Cannot find a Dead Letter Policy for processing group [" + processingGroup + "]."
));
DeadLetteringEventHandlerInvoker.Builder builder =
DeadLetteringEventHandlerInvoker.builder()
Expand Down Expand Up @@ -493,11 +493,11 @@ public Optional<SequencedDeadLetterQueue<EventMessage<?>>> deadLetterQueue(@Nonn
}

@Override
public Optional<EnqueuePolicy<EventMessage<?>>> enqueuePolicy(@Nonnull String processingGroup) {
public Optional<EnqueuePolicy<EventMessage<?>>> deadLetterPolicy(@Nonnull String processingGroup) {
validateConfigInitialization();
return enqueuePolicies.containsKey(processingGroup)
? Optional.ofNullable(enqueuePolicies.get(processingGroup).get())
: Optional.ofNullable(defaultEnqueuePolicy.get());
return deadLetterPolicies.containsKey(processingGroup)
? Optional.ofNullable(deadLetterPolicies.get(processingGroup).get())
: Optional.ofNullable(defaultDeadLetterPolicy.get());
}

@Override
Expand Down Expand Up @@ -829,19 +829,20 @@ public EventProcessingConfigurer registerDeadLetterQueue(
}

@Override
public EventProcessingConfigurer registerDefaultEnqueuePolicy(
public EventProcessingConfigurer registerDefaultDeadLetterPolicy(
@Nonnull Function<Configuration, EnqueuePolicy<EventMessage<?>>> policyBuilder
) {
this.defaultEnqueuePolicy.update(policyBuilder);
this.defaultDeadLetterPolicy.update(policyBuilder);
return this;
}

@Override
public EventProcessingConfigurer registerEnqueuePolicy(
public EventProcessingConfigurer registerDeadLetterPolicy(
@Nonnull String processingGroup,
@Nonnull Function<Configuration, EnqueuePolicy<EventMessage<?>>> policyBuilder
) {
enqueuePolicies.put(processingGroup, new Component<>(() -> configuration, "enqueuePolicy", policyBuilder));
deadLetterPolicies.put(processingGroup,
new Component<>(() -> configuration, "deadLetterPolicy", policyBuilder));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ void testRegisterDeadLetterQueueConstructsDeadLetteringEventHandlerInvoker(
Configuration config = configurer.start();

Optional<EnqueuePolicy<EventMessage<?>>> optionalPolicy = config.eventProcessingConfiguration()
.enqueuePolicy(processingGroup);
.deadLetterPolicy(processingGroup);
assertTrue(optionalPolicy.isPresent());
EnqueuePolicy<EventMessage<?>> expectedPolicy = optionalPolicy.get();

Expand Down Expand Up @@ -1154,7 +1154,7 @@ void testRegisterDeadLetterQueueConstructsDeadLetteringEventHandlerInvoker(
}

@Test
void testRegisterDefaultEnqueuePolicy(
void testRegisterDefaultDeadLetterPolicy(
@Mock SequencedDeadLetterQueue<EventMessage<?>> deadLetterQueue
) throws NoSuchFieldException, IllegalAccessException {
String processingGroup = "pooled-streaming";
Expand All @@ -1165,12 +1165,12 @@ void testRegisterDefaultEnqueuePolicy(
.registerPooledStreamingEventProcessor(processingGroup)
.registerEventHandler(config -> new PooledStreamingEventHandler())
.registerDeadLetterQueue(processingGroup, c -> deadLetterQueue)
.registerDefaultEnqueuePolicy(c -> expectedPolicy)
.registerDefaultDeadLetterPolicy(c -> expectedPolicy)
.registerTransactionManager(processingGroup, c -> NoTransactionManager.INSTANCE);
Configuration config = configurer.start();

Optional<EnqueuePolicy<EventMessage<?>>> optionalPolicy = config.eventProcessingConfiguration()
.enqueuePolicy(processingGroup);
.deadLetterPolicy(processingGroup);
assertTrue(optionalPolicy.isPresent());
EnqueuePolicy<EventMessage<?>> resultPolicy = optionalPolicy.get();
assertEquals(expectedPolicy, resultPolicy);
Expand Down Expand Up @@ -1200,7 +1200,7 @@ void testRegisterDefaultEnqueuePolicy(
}

@Test
void testRegisterEnqueuePolicy(
void testRegisterDeadLetterPolicy(
@Mock SequencedDeadLetterQueue<EventMessage<?>> deadLetterQueue
) throws NoSuchFieldException, IllegalAccessException {
String processingGroup = "pooled-streaming";
Expand All @@ -1212,13 +1212,13 @@ void testRegisterEnqueuePolicy(
.registerPooledStreamingEventProcessor(processingGroup)
.registerEventHandler(config -> new PooledStreamingEventHandler())
.registerDeadLetterQueue(processingGroup, c -> deadLetterQueue)
.registerEnqueuePolicy(processingGroup, c -> expectedPolicy)
.registerEnqueuePolicy("unused-processing-group", c -> unexpectedPolicy)
.registerDeadLetterPolicy(processingGroup, c -> expectedPolicy)
.registerDeadLetterPolicy("unused-processing-group", c -> unexpectedPolicy)
.registerTransactionManager(processingGroup, c -> NoTransactionManager.INSTANCE);
Configuration config = configurer.start();

Optional<EnqueuePolicy<EventMessage<?>>> optionalPolicy = config.eventProcessingConfiguration()
.enqueuePolicy(processingGroup);
.deadLetterPolicy(processingGroup);
assertTrue(optionalPolicy.isPresent());
EnqueuePolicy<EventMessage<?>> resultPolicy = optionalPolicy.get();
assertEquals(expectedPolicy, resultPolicy);
Expand Down

0 comments on commit d030c5b

Please sign in to comment.