Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1967] Fetch available segements only from the TokenStore #1997

Merged
merged 18 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
aee99f1
Added a new fetchAvailableSegments method to the TokenStore, which on…
Vermorkentech Oct 19, 2021
5f81dd9
Modified Streaming event processors to now use TokenStore::fetchAvail…
Vermorkentech Nov 3, 2021
600cfa3
Update javadoc messaging/src/main/java/org/axonframework/eventhandlin…
Vermorkentech Nov 9, 2021
c9c6d0a
Added a test for TokenStore::fetchAvailableSegments default and fixed…
Vermorkentech Nov 9, 2021
06c0175
Merge branch 'master' into feature/1967
Vermorkentech Nov 29, 2021
717c64b
Refactored TokenStore::fetchAvailableSegments to return a List of Seg…
Vermorkentech Nov 30, 2021
bd1b039
Fixed some incorrect indentation
Vermorkentech Dec 1, 2021
ae5aa71
Moved fetching of segment into try-catch block
Vermorkentech Dec 21, 2021
b3989b6
Added a new fecthToken method to the TokenStore that takes an entire …
Vermorkentech Dec 28, 2021
f22f5c7
Merge branch 'master' into feature/1967
Vermorkentech Dec 28, 2021
8f18d74
Corrected indentation after merge conflict
Vermorkentech Dec 29, 2021
569fcbe
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
1ae8de8
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
205401a
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
de445fc
Calling the TokenStore in a transaction instead of directly
Vermorkentech Dec 30, 2021
8ab50d6
Merge branch 'master' into feature/1967
Vermorkentech Dec 30, 2021
38b9ac0
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 31, 2021
3e0c340
Update messaging/src/main/java/org/axonframework/eventhandling/Tracki…
Vermorkentech Dec 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
Expand All @@ -54,6 +55,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -280,7 +282,7 @@ void testPublishedEventsGetPassedToHandler() throws Exception {
}

@Test
void testBlacklist() throws Exception {
void testBlacklist() {
when(mockHandler.canHandle(any())).thenReturn(false);
when(mockHandler.canHandleType(String.class)).thenReturn(false);
Set<Class<?>> skipped = new HashSet<>();
Expand Down Expand Up @@ -729,8 +731,7 @@ void testProcessorSetsAndUnsetsErrorState() throws Exception {
fail.set(false);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS), "Expected 5 invocations on Event Handler by now");
assertEquals(5, acknowledgedEvents.size());
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(s -> s
.isErrorState()).findFirst();
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(EventTrackerStatus::isErrorState).findFirst();
assertThat("no processor in error state when open stream succeeds again", !inErrorState.isPresent());
}

Expand Down Expand Up @@ -1559,7 +1560,7 @@ void testMergeInvertedSegmentOrder() throws InterruptedException {
* This test is a follow up from issue https://github.com/AxonFramework/AxonFramework/issues/1212
*/
@Test
public void testThrownErrorBubblesUp() {
void testThrownErrorBubblesUp() {
AtomicReference<Throwable> thrownException = new AtomicReference<>();

EventHandlerInvoker eventHandlerInvoker = mock(EventHandlerInvoker.class);
Expand Down Expand Up @@ -1906,6 +1907,23 @@ void testIsReplayingWhenNotCaughtUp() throws Exception {
);
}

@Test
void testProcessorOnlyTriesToClaimAvailableSegments() {
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);
when(tokenStore.fetchAvailableSegments(testSubject.getName())).thenReturn(Collections.singletonList(Segment.computeSegment(2, 0, 1, 2, 3)));

testSubject.start();

eventBus.publish(createEvents(1));

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.processingStatus().size()));
assertWithin(1, TimeUnit.SECONDS, () -> assertTrue(testSubject.processingStatus().containsKey(2)));
verify(tokenStore, never()).fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
}

@Test
void testShutdownTerminatesWorkersAfterConfiguredWorkerTerminationTimeout() throws Exception {
int numberOfEvents = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,24 +1149,23 @@ public void run() {
int waitTime = 1;
String processorName = TrackingEventProcessor.this.getName();
while (getState().isRunning()) {
List<Segment> segmentsToClaim;
workLauncherRunning.set(true);
int[] tokenStoreCurrentSegments;

try {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
smcvb marked this conversation as resolved.
Show resolved Hide resolved
int[] tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
() -> tokenStore.fetchSegments(processorName)
);

// When in an initial stage, split segments to the requested number.
if (tokenStoreCurrentSegments.length == 0 && segmentsSize > 0) {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
transactionManager.executeInTransaction(
() -> {
TrackingToken initialToken = initialTrackingTokenBuilder.apply(messageSource);
tokenStore.initializeTokenSegments(processorName, segmentsSize, initialToken);
return tokenStore.fetchSegments(processorName);
}
);
}
segmentsToClaim = transactionManager.fetchInTransaction(() ->tokenStore.fetchAvailableSegments(processorName));
Vermorkentech marked this conversation as resolved.
Show resolved Hide resolved
waitTime = 1;
} catch (Exception e) {
if (waitTime == 1) {
Expand All @@ -1187,15 +1186,14 @@ public void run() {
// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
TrackingSegmentWorker workingInCurrentThread = null;
for (int i = 0; i < tokenStoreCurrentSegments.length && availableThreads.get() > 0; i++) {
int segmentId = tokenStoreCurrentSegments[i];
for (int i = 0; i < segmentsToClaim.size() && availableThreads.get() > 0; i++) {
Segment segment = segmentsToClaim.get(i);
int segmentId = segment.getSegmentId();

if (!activeSegments.containsKey(segmentId) && canClaimSegment(segmentId)) {
try {
transactionManager.executeInTransaction(() -> {
TrackingToken token = tokenStore.fetchToken(processorName, segmentId);
int[] segmentIds = tokenStore.fetchSegments(processorName);
Segment segment = Segment.computeSegment(segmentId, segmentIds);
TrackingToken token = tokenStore.fetchToken(processorName, segment);
logger.info("Worker assigned to segment {} for processing", segment);
TrackerStatus newStatus = new TrackerStatus(segment, token);
TrackerStatus previousStatus = activeSegments.putIfAbsent(segmentId, newStatus);
Expand All @@ -1208,7 +1206,7 @@ public void run() {
});
} catch (UnableToClaimTokenException ucte) {
// When not able to claim a token for a given segment, we skip the
logger.debug("Unable to claim the token for segment: {}. It is owned by another process",
logger.debug("Unable to claim the token for segment: {}. It is owned by another process or has been split/merged concurrently",
segmentId);

TrackerStatus removedStatus = activeSegments.remove(segmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand All @@ -50,6 +50,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.axonframework.common.io.IOUtils.closeQuietly;

Expand Down Expand Up @@ -695,16 +696,17 @@ private CompletableFuture<Void> abortWorkPackages(Exception cause) {
*/
private Map<Segment, TrackingToken> claimNewSegments() {
Map<Segment, TrackingToken> newClaims = new HashMap<>();
int[] segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchSegments(name));
List<Segment> segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchAvailableSegments(name));

// As segments are used for Segment#computeSegment, we cannot filter out the WorkPackages upfront.
int[] unClaimedSegments = Arrays.stream(segments)
.filter(segmentId -> !workPackages.containsKey(segmentId))
.toArray();
List<Segment> unClaimedSegments = segments.stream()
.filter(segment -> !workPackages.containsKey(segment.getSegmentId()))
.collect(Collectors.toList());

int maxSegmentsToClaim = maxClaimedSegments - workPackages.size();

for (int segmentId : unClaimedSegments) {
for (Segment segment : unClaimedSegments) {
int segmentId = segment.getSegmentId();
if (isSegmentBlockedFromClaim(segmentId)) {
logger.debug("Segment {} is still marked to not be claimed by Processor [{}].", segmentId, name);
processingStatusUpdater.accept(segmentId, u -> null);
Expand All @@ -713,12 +715,12 @@ private Map<Segment, TrackingToken> claimNewSegments() {
if (newClaims.size() < maxSegmentsToClaim) {
try {
TrackingToken token = transactionManager.fetchInTransaction(
() -> tokenStore.fetchToken(name, segmentId)
() -> tokenStore.fetchToken(name, segment)
);
newClaims.put(Segment.computeSegment(segmentId, segments), token);
newClaims.put(segment, token);
} catch (UnableToClaimTokenException e) {
processingStatusUpdater.accept(segmentId, u -> null);
logger.debug("Unable to claim the token for segment {}. It is owned by another process.",
logger.debug("Unable to claim the token for segment {}. It is owned by another process or has been split/merged concurrently.",
segmentId);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,9 +17,13 @@
package org.axonframework.eventhandling.tokenstore;

import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Describes a component capable of storing and retrieving event tracking tokens. An {@link EventProcessor} that is
Expand Down Expand Up @@ -116,6 +120,28 @@ default void initializeTokenSegments(String processorName, int segmentCount, Tra
*/
TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException;

/**
* Returns the last stored {@link TrackingToken token} for the given {@code processorName} and {@code segment}. Returns {@code null} if the stored token for
* the given process and segment is {@code null}.
* <p>
* This method should throw an {@code UnableToClaimTokenException} when the given {@code segment} has not been initialized with a Token (albeit {@code
* null}) yet. In that case, a segment must have been explicitly initialized. A TokenStore implementation's ability to do so is exposed by the {@link
* #requiresExplicitSegmentInitialization()} method. If that method returns false, this method may implicitly initialize a token and return that token upon
* invocation.
* <p>
* The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use {@link
* #releaseClaim(String, int)}
*
* @param processorName The process name for which to fetch the token
* @param segment The segment for which to fetch the token
* @return The last stored TrackingToken or {@code null} if the store holds no token for given process and segment
* @throws UnableToClaimTokenException if there is a token for given {@code processorName} and {@code segment}, but they are claimed by another process, or
* if the {@code segment has been split or merged concurrently}
*/
default TrackingToken fetchToken(String processorName, Segment segment) throws UnableToClaimTokenException {
return fetchToken(processorName, segment.getSegmentId());
}

/**
* Extends the claim on the current token held by the this node for the given {@code processorName} and
* {@code segment}.
Expand Down Expand Up @@ -207,6 +233,25 @@ default boolean requiresExplicitSegmentInitialization() {
*/
int[] fetchSegments(String processorName);

/**
* Returns a List of known available {@code segments} for a given {@code processorName}. A segment is considered available if it is not claimed by any
* other event processor.
* <p>
* The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the {@link
* TokenStore} is empty, an empty list is returned.
*
* By default, if this method is not implemented, we will return all segments instead, whether they are available or not.
*
* @param processorName the processor's name for which to fetch the segments
* @return a List of available segment identifiers for the specified {@code processorName}
*/
default List<Segment> fetchAvailableSegments(String processorName) {
int[] allSegments = fetchSegments(processorName);
return Arrays.stream(allSegments).boxed()
.map(segment -> Segment.computeSegment(segment, allSegments))
.collect(Collectors.toList());
}

/**
* Returns a unique identifier that uniquely identifies the storage location of the tokens in this store. Two token
* store implementations that share state, must return the same identifier. Two token store implementations that
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down