Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1967] Fetch available segements only from the TokenStore #1997

Merged
merged 18 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
aee99f1
Added a new fetchAvailableSegments method to the TokenStore, which on…
Vermorkentech Oct 19, 2021
5f81dd9
Modified Streaming event processors to now use TokenStore::fetchAvail…
Vermorkentech Nov 3, 2021
600cfa3
Update javadoc messaging/src/main/java/org/axonframework/eventhandlin…
Vermorkentech Nov 9, 2021
c9c6d0a
Added a test for TokenStore::fetchAvailableSegments default and fixed…
Vermorkentech Nov 9, 2021
06c0175
Merge branch 'master' into feature/1967
Vermorkentech Nov 29, 2021
717c64b
Refactored TokenStore::fetchAvailableSegments to return a List of Seg…
Vermorkentech Nov 30, 2021
bd1b039
Fixed some incorrect indentation
Vermorkentech Dec 1, 2021
ae5aa71
Moved fetching of segment into try-catch block
Vermorkentech Dec 21, 2021
b3989b6
Added a new fecthToken method to the TokenStore that takes an entire …
Vermorkentech Dec 28, 2021
f22f5c7
Merge branch 'master' into feature/1967
Vermorkentech Dec 28, 2021
8f18d74
Corrected indentation after merge conflict
Vermorkentech Dec 29, 2021
569fcbe
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
1ae8de8
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
205401a
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 30, 2021
de445fc
Calling the TokenStore in a transaction instead of directly
Vermorkentech Dec 30, 2021
8ab50d6
Merge branch 'master' into feature/1967
Vermorkentech Dec 30, 2021
38b9ac0
Update messaging/src/main/java/org/axonframework/eventhandling/tokens…
Vermorkentech Dec 31, 2021
3e0c340
Update messaging/src/main/java/org/axonframework/eventhandling/Tracki…
Vermorkentech Dec 31, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
Expand All @@ -54,6 +55,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -280,7 +282,7 @@ void testPublishedEventsGetPassedToHandler() throws Exception {
}

@Test
void testBlacklist() throws Exception {
void testBlacklist() {
when(mockHandler.canHandle(any())).thenReturn(false);
when(mockHandler.canHandleType(String.class)).thenReturn(false);
Set<Class<?>> skipped = new HashSet<>();
Expand Down Expand Up @@ -729,8 +731,7 @@ void testProcessorSetsAndUnsetsErrorState() throws Exception {
fail.set(false);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS), "Expected 5 invocations on Event Handler by now");
assertEquals(5, acknowledgedEvents.size());
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(s -> s
.isErrorState()).findFirst();
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(EventTrackerStatus::isErrorState).findFirst();
assertThat("no processor in error state when open stream succeeds again", !inErrorState.isPresent());
}

Expand Down Expand Up @@ -1559,7 +1560,7 @@ void testMergeInvertedSegmentOrder() throws InterruptedException {
* This test is a follow up from issue https://github.com/AxonFramework/AxonFramework/issues/1212
*/
@Test
public void testThrownErrorBubblesUp() {
void testThrownErrorBubblesUp() {
AtomicReference<Throwable> thrownException = new AtomicReference<>();

EventHandlerInvoker eventHandlerInvoker = mock(EventHandlerInvoker.class);
Expand Down Expand Up @@ -1906,6 +1907,23 @@ void testIsReplayingWhenNotCaughtUp() throws Exception {
);
}

@Test
void testProcessorOnlyTriesToClaimAvailableSegments() {
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);
when(tokenStore.fetchAvailableSegments(testSubject.getName())).thenReturn(Collections.singletonList(Segment.computeSegment(2, 0, 1, 2, 3)));

testSubject.start();

eventBus.publish(createEvents(1));

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.processingStatus().size()));
assertWithin(1, TimeUnit.SECONDS, () -> assertTrue(testSubject.processingStatus().containsKey(2)));
verify(tokenStore, never()).fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
}

private void waitForStatus(String description,
long time,
TimeUnit unit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1050,20 +1050,17 @@ public void run() {
String processorName = TrackingEventProcessor.this.getName();
while (getState().isRunning()) {
workLauncherRunning.set(true);
int[] tokenStoreCurrentSegments;

try {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
int[] tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
() -> tokenStore.fetchSegments(processorName)
);

// When in an initial stage, split segments to the requested number.
if (tokenStoreCurrentSegments.length == 0 && segmentsSize > 0) {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
smcvb marked this conversation as resolved.
Show resolved Hide resolved
transactionManager.executeInTransaction(
() -> {
TrackingToken initialToken = initialTrackingTokenBuilder.apply(messageSource);
tokenStore.initializeTokenSegments(processorName, segmentsSize, initialToken);
return tokenStore.fetchSegments(processorName);
}
);
}
Expand All @@ -1087,15 +1084,15 @@ public void run() {
// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
TrackingSegmentWorker workingInCurrentThread = null;
for (int i = 0; i < tokenStoreCurrentSegments.length && availableThreads.get() > 0; i++) {
int segmentId = tokenStoreCurrentSegments[i];
List<Segment> segmentsToClaim = tokenStore.fetchAvailableSegments(processorName);
smcvb marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < segmentsToClaim.size() && availableThreads.get() > 0; i++) {
Segment segment = segmentsToClaim.get(i);
int segmentId = segment.getSegmentId();

if (!activeSegments.containsKey(segmentId) && canClaimSegment(segmentId)) {
try {
transactionManager.executeInTransaction(() -> {
TrackingToken token = tokenStore.fetchToken(processorName, segmentId);
int[] segmentIds = tokenStore.fetchSegments(processorName);
Segment segment = Segment.computeSegment(segmentId, segmentIds);
logger.info("Worker assigned to segment {} for processing", segment);
TrackerStatus newStatus = new TrackerStatus(segment, token);
TrackerStatus previousStatus = activeSegments.putIfAbsent(segmentId, newStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand All @@ -50,6 +50,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.axonframework.common.io.IOUtils.closeQuietly;

Expand Down Expand Up @@ -695,16 +696,17 @@ private CompletableFuture<Void> abortWorkPackages(Exception cause) {
*/
private Map<Segment, TrackingToken> claimNewSegments() {
Map<Segment, TrackingToken> newClaims = new HashMap<>();
int[] segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchSegments(name));
List<Segment> segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchAvailableSegments(name));

// As segments are used for Segment#computeSegment, we cannot filter out the WorkPackages upfront.
int[] unClaimedSegments = Arrays.stream(segments)
.filter(segmentId -> !workPackages.containsKey(segmentId))
.toArray();
List<Segment> unClaimedSegments = segments.stream()
.filter(segment -> !workPackages.containsKey(segment.getSegmentId()))
.collect(Collectors.toList());

int maxSegmentsToClaim = maxClaimedSegments - workPackages.size();

for (int segmentId : unClaimedSegments) {
for (Segment segment : unClaimedSegments) {
int segmentId = segment.getSegmentId();
if (isSegmentBlockedFromClaim(segmentId)) {
logger.debug("Segment {} is still marked to not be claimed by Processor [{}].", segmentId, name);
processingStatusUpdater.accept(segmentId, u -> null);
Expand All @@ -715,7 +717,7 @@ private Map<Segment, TrackingToken> claimNewSegments() {
TrackingToken token = transactionManager.fetchInTransaction(
() -> tokenStore.fetchToken(name, segmentId)
);
newClaims.put(Segment.computeSegment(segmentId, segments), token);
newClaims.put(segment, token);
} catch (UnableToClaimTokenException e) {
processingStatusUpdater.accept(segmentId, u -> null);
logger.debug("Unable to claim the token for segment {}. It is owned by another process.",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,9 +17,13 @@
package org.axonframework.eventhandling.tokenstore;

import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Describes a component capable of storing and retrieving event tracking tokens. An {@link EventProcessor} that is
Expand Down Expand Up @@ -207,6 +211,25 @@ default boolean requiresExplicitSegmentInitialization() {
*/
int[] fetchSegments(String processorName);

/**
* Returns a List of known available {@code segments} for a given {@code processorName}. A segment is considered available if it is not claimed by any
* other event processor.
* <p>
* The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the {@link
* TokenStore} is empty, an empty list is returned.
*
* By default, if this method is not implemented, we will return all segments instead, whether they are available or not.
*
* @param processorName the processor's name for which to fetch the segments
* @return a List of available segment identifiers for the specified {@code processorName}
*/
default List<Segment> fetchAvailableSegments(String processorName) {
int[] allSegments = fetchSegments(processorName);
return Arrays.stream(allSegments).boxed()
.map(segment -> Segment.computeSegment(segment, allSegments))
smcvb marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
}

/**
* Returns a unique identifier that uniquely identifies the storage location of the tokens in this store. Two token
* store implementations that share state, must return the same identifier. Two token store implementations that
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.jdbc.ConnectionProvider;
import org.axonframework.common.jdbc.JdbcException;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
Expand All @@ -27,6 +28,7 @@
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToRetrieveIdentifierException;
import org.axonframework.eventhandling.tokenstore.jpa.TokenEntry;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.Serializer;
Expand All @@ -45,6 +47,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.axonframework.common.BuilderUtils.assertNonNull;
Expand Down Expand Up @@ -313,6 +316,29 @@ public int[] fetchSegments(String processorName) {
}
}

@Override
public List<Segment> fetchAvailableSegments(String processorName) {
Connection connection = getConnection();
try {
List<AbstractTokenEntry<?>> tokenEntries = executeQuery(connection,
c -> selectTokenEntries(c, processorName),
listResults(this::readTokenEntry),
e -> new JdbcException(format(
"Could not load segments for processor [%s]", processorName
), e)
);
int[] allSegments = tokenEntries.stream()
.mapToInt(AbstractTokenEntry::getSegment)
.toArray();
return tokenEntries.stream()
.filter(tokenEntry -> tokenEntry.mayClaim(nodeId, claimTimeout))
smcvb marked this conversation as resolved.
Show resolved Hide resolved
.map(tokenEntry -> Segment.computeSegment(tokenEntry.getSegment(), allSegments))
.collect(Collectors.toList());
} finally {
closeQuietly(connection);
}
}

/**
* Returns a {@link PreparedStatement} to select all segments ids for a given processorName from the underlying
* storage.
Expand All @@ -325,7 +351,27 @@ public int[] fetchSegments(String processorName) {
protected PreparedStatement selectForSegments(Connection connection, String processorName) throws SQLException {
final String sql = "SELECT " + schema.segmentColumn() +
" FROM " + schema.tokenTable() +
" WHERE " + schema.processorNameColumn() + " = ? ORDER BY " + schema.segmentColumn() + " ASC";
" WHERE " + schema.processorNameColumn() + " = ?" +
" ORDER BY " + schema.segmentColumn() + " ASC";
PreparedStatement preparedStatement =
connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
preparedStatement.setString(1, processorName);
return preparedStatement;
}

/**
* Returns a {@link PreparedStatement} to select all {@link TokenEntry} for a given processorName from the underlying storage.
Vermorkentech marked this conversation as resolved.
Show resolved Hide resolved
*
* @param connection the connection to the underlying database
* @param processorName the name of the processor to fetch the segments for
* @return a {@link PreparedStatement} that will fetch TokenEntries when executed
* @throws SQLException when an exception occurs while creating the prepared statement
*/
protected PreparedStatement selectTokenEntries(Connection connection, String processorName) throws SQLException {
final String sql = "SELECT *" +
" FROM " + schema.tokenTable() +
" WHERE " + schema.processorNameColumn() + " = ?" +
" ORDER BY " + schema.segmentColumn() + " ASC";
PreparedStatement preparedStatement =
connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
preparedStatement.setString(1, processorName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2020. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.jpa.EntityManagerProvider;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.ConfigToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
Expand All @@ -30,8 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
Expand All @@ -40,6 +39,9 @@
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.persistence.EntityManager;
import javax.persistence.LockModeType;

import static java.lang.String.format;
import static org.axonframework.common.BuilderUtils.assertNonNull;
Expand Down Expand Up @@ -230,6 +232,24 @@ public int[] fetchSegments(String processorName) {
return resultList.stream().mapToInt(i -> i).toArray();
}

@Override
public List<Segment> fetchAvailableSegments(String processorName) {
EntityManager entityManager = entityManagerProvider.getEntityManager();

final List<TokenEntry> resultList = entityManager.createQuery(
"SELECT te FROM TokenEntry te "
+ "WHERE te.processorName = :processorName ORDER BY te.segment ASC",
TokenEntry.class
).setParameter("processorName", processorName).getResultList();
int[] allSegments = resultList.stream()
.mapToInt(TokenEntry::getSegment)
smcvb marked this conversation as resolved.
Show resolved Hide resolved
.toArray();
return resultList.stream()
.filter(tokenEntry -> tokenEntry.mayClaim(nodeId, claimTimeout))
.map(tokenEntry -> Segment.computeSegment(tokenEntry.getSegment(), allSegments))
.collect(Collectors.toList());
}

/**
* Loads an existing {@link TokenEntry} or creates a new one using the given {@code entityManager} for given {@code
* processorName} and {@code segment}.
Expand Down