Skip to content

Commit

Permalink
[#2154] Fix missing propagation of ReplayToken to UnitOfWork for Pool…
Browse files Browse the repository at this point in the history
…edStreamingEventProcessor
  • Loading branch information
CodeDrivenMitch committed Mar 28, 2022
1 parent ab61170 commit a81b827
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,10 @@ private void processEvents() throws Exception {
ProcessingEntry entry = processingQueue.poll();
lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.eventMessage().trackingToken());
if (entry.canHandle()) {
eventBatch.add(entry.eventMessage());
eventBatch.add(
entry.eventMessage()
.withTrackingToken(lastConsumedToken)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
Expand Down Expand Up @@ -171,7 +172,7 @@ void testScheduleEventFailsOnBatchProcessor() throws ExecutionException, Interru
TrackedEventMessage<String> testEvent =
new GenericTrackedEventMessage<>(testToken, GenericEventMessage.asEventMessage("some-event"));
batchProcessorPredicate = event -> {
if (event.contains(testEvent)) {
if (event.stream().anyMatch(e -> ((TrackedEventMessage<?>) e).trackingToken().equals(testToken))) {
throw new IllegalStateException("Some exception");
}
return true;
Expand Down Expand Up @@ -207,7 +208,7 @@ void testScheduleEventRunsSuccessfully() {

List<EventMessage<?>> processedEvents = batchProcessor.getProcessedEvents();
assertWithin(500, TimeUnit.MILLISECONDS, () -> assertEquals(1, processedEvents.size()));
assertEquals(expectedEvent, processedEvents.get(0));
assertEquals(expectedEvent.trackingToken(), ((TrackedEventMessage<?>) processedEvents.get(0)).trackingToken());

ArgumentCaptor<TrackingToken> tokenCaptor = ArgumentCaptor.forClass(TrackingToken.class);
verify(tokenStore).storeToken(tokenCaptor.capture(), eq(PROCESSOR_NAME), eq(segment.getSegmentId()));
Expand All @@ -219,6 +220,48 @@ void testScheduleEventRunsSuccessfully() {
assertEquals(1L, resultPosition.getAsLong());
}

@Test
void testReplayTokenIsPropagatedAndAdvancedWithoutCurrent() {
testSubjectBuilder.initialToken(new ReplayToken(new GlobalSequenceTrackingToken(1L)));
testSubject = testSubjectBuilder.build();
TrackingToken expectedToken = new GlobalSequenceTrackingToken(1L);
TrackedEventMessage<String> expectedEvent =
new GenericTrackedEventMessage<>(expectedToken, GenericEventMessage.asEventMessage("some-event"));

testSubject.scheduleEvent(expectedEvent);

List<EventMessage<?>> processedEvents = batchProcessor.getProcessedEvents();
assertWithin(500, TimeUnit.MILLISECONDS, () -> assertEquals(1, processedEvents.size()));

ReplayToken expectedAdvancedToken = new ReplayToken(
new GlobalSequenceTrackingToken(1L),
new GlobalSequenceTrackingToken(1L)
);
assertEquals(expectedAdvancedToken, ((TrackedEventMessage<?>) processedEvents.get(0)).trackingToken());
}


@Test
void testReplayTokenIsPropagatedAndAdvancedWithCurrent() {
testSubjectBuilder.initialToken(new ReplayToken(new GlobalSequenceTrackingToken(1L),
new GlobalSequenceTrackingToken(0L)));
testSubject = testSubjectBuilder.build();
TrackingToken expectedToken = new GlobalSequenceTrackingToken(1L);
TrackedEventMessage<String> expectedEvent =
new GenericTrackedEventMessage<>(expectedToken, GenericEventMessage.asEventMessage("some-event"));

testSubject.scheduleEvent(expectedEvent);

List<EventMessage<?>> processedEvents = batchProcessor.getProcessedEvents();
assertWithin(500, TimeUnit.MILLISECONDS, () -> assertEquals(1, processedEvents.size()));

ReplayToken expectedAdvancedToken = new ReplayToken(
new GlobalSequenceTrackingToken(1L),
new GlobalSequenceTrackingToken(1L)
);
assertEquals(expectedAdvancedToken, ((TrackedEventMessage<?>) processedEvents.get(0)).trackingToken());
}

/**
* The "last delivered token" is configured as the initialToken for a fresh WorkPackage.
*/
Expand All @@ -236,7 +279,7 @@ void testScheduleEventIsScheduledIfTheLastDeliveredTokenEqualsTheEventsToken() {

List<EventMessage<?>> processedEvents = batchProcessor.getProcessedEvents();
assertWithin(500, TimeUnit.MILLISECONDS, () -> assertEquals(1, processedEvents.size()));
assertEquals(expectedEvent, processedEvents.get(0));
assertEquals(expectedEvent.trackingToken(), ((TrackedEventMessage<?>) processedEvents.get(0)).trackingToken());

ArgumentCaptor<TrackingToken> tokenCaptor = ArgumentCaptor.forClass(TrackingToken.class);
verify(tokenStore).storeToken(tokenCaptor.capture(), eq(PROCESSOR_NAME), eq(segment.getSegmentId()));
Expand Down Expand Up @@ -264,7 +307,7 @@ void testScheduleEventExtendsTokenClaimAfterClaimThresholdExtension() {
// Should have handled one event, so a subsequent run of WorkPackage#processEvents will extend the claim.
List<EventMessage<?>> processedEvents = batchProcessor.getProcessedEvents();
assertWithin(500, TimeUnit.MILLISECONDS, () -> assertEquals(1, processedEvents.size()));
assertEquals(expectedEvent, processedEvents.get(0));
assertEquals(expectedEvent.trackingToken(), ((TrackedEventMessage<?>) processedEvents.get(0)).trackingToken());
// We need to verify the TokenStore#storeToken operation, otherwise the extendClaim verify will not succeed.
ArgumentCaptor<TrackingToken> tokenCaptor = ArgumentCaptor.forClass(TrackingToken.class);
verify(tokenStore).storeToken(tokenCaptor.capture(), eq(PROCESSOR_NAME), eq(segment.getSegmentId()));
Expand Down Expand Up @@ -416,4 +459,4 @@ public List<EventMessage<?>> getProcessedEvents() {
return processedEvents;
}
}
}
}

0 comments on commit a81b827

Please sign in to comment.