Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1967] Fetch available segements only from the TokenStore #1997

Merged
merged 18 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
aee99f1
Added a new fetchAvailableSegments method to the TokenStore, which on…
Vermorkentech Oct 19, 2021
5f81dd9
Modified Streaming event processors to now use TokenStore::fetchAvail…
Vermorkentech Nov 3, 2021
600cfa3
Update javadoc messaging/src/main/java/org/axonframework/eventhandlin…
Vermorkentech Nov 9, 2021
c9c6d0a
Added a test for TokenStore::fetchAvailableSegments default and fixed…
Vermorkentech Nov 9, 2021
06c0175
Merge branch 'master' into feature/1967
Vermorkentech Nov 29, 2021
717c64b
Refactored TokenStore::fetchAvailableSegments to return a List of Seg…
Vermorkentech Nov 30, 2021
bd1b039
Fixed some incorrect indentation
Vermorkentech Dec 1, 2021
ae5aa71
Moved fetching of segment into try-catch block
Vermorkentech Dec 21, 2021
b3989b6
Added a new fecthToken method to the TokenStore that takes an entire …
Vermorkentech Dec 28, 2021
f22f5c7
Merge branch 'master' into feature/1967
Vermorkentech Dec 28, 2021
8f18d74
Corrected indentation after merge conflict
Vermorkentech Dec 29, 2021
569fcbe
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
1ae8de8
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
205401a
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
de445fc
Calling the TokenStore in a transaction instead of directly
Vermorkentech Dec 30, 2021
8ab50d6
Merge branch 'master' into feature/1967
Vermorkentech Dec 30, 2021
38b9ac0
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 31, 2021
3e0c340
Update messaging/src/main/java/org/axonframework/eventhandling/Tracki…
Vermorkentech Dec 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -280,7 +281,7 @@ void testPublishedEventsGetPassedToHandler() throws Exception {
}

@Test
void testBlacklist() throws Exception {
void testBlacklist() {
when(mockHandler.canHandle(any())).thenReturn(false);
when(mockHandler.canHandleType(String.class)).thenReturn(false);
Set<Class<?>> skipped = new HashSet<>();
Expand Down Expand Up @@ -729,8 +730,7 @@ void testProcessorSetsAndUnsetsErrorState() throws Exception {
fail.set(false);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS), "Expected 5 invocations on Event Handler by now");
assertEquals(5, acknowledgedEvents.size());
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(s -> s
.isErrorState()).findFirst();
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(EventTrackerStatus::isErrorState).findFirst();
assertThat("no processor in error state when open stream succeeds again", !inErrorState.isPresent());
}

Expand Down Expand Up @@ -1906,6 +1906,30 @@ void testIsReplayingWhenNotCaughtUp() throws Exception {
);
}

@Test
void testFallbackToClaimingAllTokensIfAvailableTokensIsNotImplemented() {
when(tokenStore.fetchAvailableSegments(any())).thenReturn(Optional.empty());
testSubject.start();
assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.activeProcessorThreads()));
}

@Test
void testProcessorOnlyTriesToClaimAvailableSegments() {
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);
when(tokenStore.fetchAvailableSegments(testSubject.getName())).thenReturn(Optional.of(new int[]{2}));

testSubject.start();

eventBus.publish(createEvents(1));

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.processingStatus().size()));
assertWithin(1, TimeUnit.SECONDS, () -> assertTrue(testSubject.processingStatus().containsKey(2)));
verify(tokenStore, never()).fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
}

private void waitForStatus(String description,
long time,
TimeUnit unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,11 +1048,10 @@ public void run() {

// When in an initial stage, split segments to the requested number.
if (tokenStoreCurrentSegments.length == 0 && segmentsSize > 0) {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
smcvb marked this conversation as resolved.
Show resolved Hide resolved
transactionManager.executeInTransaction(
() -> {
TrackingToken initialToken = initialTrackingTokenBuilder.apply(messageSource);
tokenStore.initializeTokenSegments(processorName, segmentsSize, initialToken);
return tokenStore.fetchSegments(processorName);
}
);
}
Expand All @@ -1076,8 +1075,9 @@ public void run() {
// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
TrackingSegmentWorker workingInCurrentThread = null;
for (int i = 0; i < tokenStoreCurrentSegments.length && availableThreads.get() > 0; i++) {
int segmentId = tokenStoreCurrentSegments[i];
int[] availableSegments = tokenStore.fetchAvailableSegments(processorName).orElse(tokenStore.fetchSegments(processorName));
smcvb marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < availableSegments.length && availableThreads.get() > 0; i++) {
int segmentId = availableSegments[i];

if (!activeSegments.containsKey(segmentId) && canClaimSegment(segmentId)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ private CompletableFuture<Void> abortWorkPackages(Exception cause) {
*/
private Map<Segment, TrackingToken> claimNewSegments() {
Map<Segment, TrackingToken> newClaims = new HashMap<>();
int[] segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchSegments(name));
int[] segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchAvailableSegments(name).orElse(tokenStore.fetchSegments(name)));

// As segments are used for Segment#computeSegment, we cannot filter out the WorkPackages upfront.
int[] unClaimedSegments = Arrays.stream(segments)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -207,6 +207,19 @@ default boolean requiresExplicitSegmentInitialization() {
*/
int[] fetchSegments(String processorName);

/**
* Returns an array of known {@code segments} for a given {@code processorName}.
* <p>
* The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the {@link
* TokenStore} is empty, an empty array is returned.
*
* @param processorName The process name for which to fetch the segments
* @return an array of segment identifiers for the specified {@code processorName}, or an empty Optional if this method has not been implemented.
Vermorkentech marked this conversation as resolved.
Show resolved Hide resolved
*/
default Optional<int[]> fetchAvailableSegments(String processorName) {
return Optional.empty();
}

/**
* Returns a unique identifier that uniquely identifies the storage location of the tokens in this store. Two token
* store implementations that share state, must return the same identifier. Two token store implementations that
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -112,6 +112,12 @@ public int[] fetchSegments(String processorName) {
.sorted().toArray();
}

@Override
public Optional<int[]> fetchAvailableSegments(String processorName) {
//The in-memory implementation isn't accessible by multiple processes, so we just return all tokens.
return Optional.of(fetchSegments(processorName));
}

@Override
public Optional<String> retrieveStorageIdentifier() {
return Optional.of(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,23 @@ public int[] fetchSegments(String processorName) {
}
}

@Override
public Optional<int[]> fetchAvailableSegments(String processorName) {
Connection connection = getConnection();
try {
List<Integer> integers = executeQuery(connection,
c -> selectForSegments(c, processorName, true),
listResults(rs -> rs.getInt(schema.segmentColumn())),
e -> new JdbcException(format(
"Could not load segments for processor [%s]", processorName
), e)
);
return Optional.of(integers.stream().mapToInt(i -> i).toArray());
} finally {
closeQuietly(connection);
}
}

/**
* Returns a {@link PreparedStatement} to select all segments ids for a given processorName from the underlying
* storage.
Expand All @@ -323,9 +340,24 @@ public int[] fetchSegments(String processorName) {
* @throws SQLException when an exception occurs while creating the prepared statement
*/
protected PreparedStatement selectForSegments(Connection connection, String processorName) throws SQLException {
return selectForSegments(connection, processorName, false);
}

/**
* Returns a {@link PreparedStatement} to select all segments ids for a given processorName from the underlying storage.
*
* @param connection the connection to the underlying database
* @param processorName the name of the processor to fetch the segments for
* @param unclaimedOnly if true, filter the tokens on unclaimed tokens only
* @return a {@link PreparedStatement} that will fetch segments when executed
* @throws SQLException when an exception occurs while creating the prepared statement
*/
protected PreparedStatement selectForSegments(Connection connection, String processorName, boolean unclaimedOnly) throws SQLException {
final String sql = "SELECT " + schema.segmentColumn() +
" FROM " + schema.tokenTable() +
" WHERE " + schema.processorNameColumn() + " = ? ORDER BY " + schema.segmentColumn() + " ASC";
" WHERE " + schema.processorNameColumn() + " = ? " +
(unclaimedOnly ? "AND " + schema.ownerColumn() + " IS NULL " : "") +
" ORDER BY " + schema.segmentColumn() + " ASC";
PreparedStatement preparedStatement =
connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
preparedStatement.setString(1, processorName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -230,6 +230,19 @@ public int[] fetchSegments(String processorName) {
return resultList.stream().mapToInt(i -> i).toArray();
}

@Override
public Optional<int[]> fetchAvailableSegments(String processorName) {
EntityManager entityManager = entityManagerProvider.getEntityManager();

final List<Integer> resultList = entityManager.createQuery(
"SELECT te.segment FROM TokenEntry te "
+ "WHERE te.processorName = :processorName AND te.owner IS NULL ORDER BY te.segment ASC",
Integer.class
).setParameter("processorName", processorName).getResultList();

return Optional.of(resultList.stream().mapToInt(i -> i).toArray());
}

/**
* Loads an existing {@link TokenEntry} or creates a new one using the given {@code entityManager} for given {@code
* processorName} and {@code segment}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -83,7 +84,7 @@ class PooledStreamingEventProcessorTest {
void setUp() {
stubMessageSource = new InMemoryMessageSource();
stubEventHandler = mock(EventHandlerInvoker.class);
tokenStore = new InMemoryTokenStore();
tokenStore = spy(new InMemoryTokenStore());
coordinatorExecutor = Executors.newScheduledThreadPool(2);
workerExecutor = Executors.newScheduledThreadPool(8);

Expand Down Expand Up @@ -167,6 +168,16 @@ void testSecondStartInvocationIsIgnored() {

@Test
void testStartingProcessorClaimsAllAvailableTokens() {
startAndAssertProcessorClaimsAllTokens();
}

@Test
void testFallbackToClaimingAllTokensIfAvailableTokensIsNotImplemented() {
when(tokenStore.fetchAvailableSegments(any())).thenReturn(Optional.empty());
startAndAssertProcessorClaimsAllTokens();
}

private void startAndAssertProcessorClaimsAllTokens() {
List<EventMessage<Integer>> events = IntStream.range(0, 100)
.mapToObj(GenericEventMessage::new)
.collect(Collectors.toList());
Expand All @@ -186,6 +197,21 @@ void testStartingProcessorClaimsAllAvailableTokens() {
assertEquals(8, testSubject.processingStatus().size());
}

@Test
void testProcessorOnlyTriesToClaimAvailableSegments() {
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);
when(tokenStore.fetchAvailableSegments(testSubject.getName())).thenReturn(Optional.of(new int[]{2}));

testSubject.start();

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.processingStatus().size()));
assertWithin(1, TimeUnit.SECONDS, () -> assertTrue(testSubject.processingStatus().containsKey(2)));
verify(tokenStore, never()).fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
}

@Test
void testStartingAfterShutdownLetsProcessorProceed() {
when(stubEventHandler.supportsReset()).thenReturn(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -18,10 +18,10 @@

import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

import java.util.Arrays;
import java.util.Optional;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -102,12 +102,32 @@ void testQuerySegments() {
final int[] segments = testSubject.fetchSegments("proc2");
assertThat(segments.length, is(1));
}

{
final int[] segments = testSubject.fetchSegments("proc3");
assertThat(segments.length, is(0));
}
}

@Test
void testQueryAvailableSegments() {
testSubject.storeToken(new GlobalSequenceTrackingToken(1L), "proc1", 0);
testSubject.storeToken(new GlobalSequenceTrackingToken(2L), "proc1", 1);
testSubject.storeToken(new GlobalSequenceTrackingToken(2L), "proc2", 1);

{
final Optional<int[]> tokensOptional = testSubject.fetchAvailableSegments("proc1");
assertTrue(tokensOptional.isPresent());
assertThat(tokensOptional.get().length, is(2));
}
{
final Optional<int[]> tokensOptional = testSubject.fetchAvailableSegments("proc2");
assertTrue(tokensOptional.isPresent());
assertThat(tokensOptional.get().length, is(1));
}
{
final Optional<int[]> tokensOptional = testSubject.fetchAvailableSegments("proc3");
assertTrue(tokensOptional.isPresent());
assertThat(tokensOptional.get().length, is(0));
}
}
}