Skip to content

Commit

Permalink
Merge pull request #1997 from AxonFramework/feature/1967
Browse files Browse the repository at this point in the history
[#1967] Fetch available segements only from the TokenStore
  • Loading branch information
Vermorkentech committed Jan 3, 2022
2 parents 3ecd81d + 3e0c340 commit 80e2033
Show file tree
Hide file tree
Showing 12 changed files with 652 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
Expand All @@ -54,6 +55,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -280,7 +282,7 @@ void testPublishedEventsGetPassedToHandler() throws Exception {
}

@Test
void testBlacklist() throws Exception {
void testBlacklist() {
when(mockHandler.canHandle(any())).thenReturn(false);
when(mockHandler.canHandleType(String.class)).thenReturn(false);
Set<Class<?>> skipped = new HashSet<>();
Expand Down Expand Up @@ -729,8 +731,7 @@ void testProcessorSetsAndUnsetsErrorState() throws Exception {
fail.set(false);
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS), "Expected 5 invocations on Event Handler by now");
assertEquals(5, acknowledgedEvents.size());
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(s -> s
.isErrorState()).findFirst();
Optional<EventTrackerStatus> inErrorState = testSubject.processingStatus().values().stream().filter(EventTrackerStatus::isErrorState).findFirst();
assertThat("no processor in error state when open stream succeeds again", !inErrorState.isPresent());
}

Expand Down Expand Up @@ -1559,7 +1560,7 @@ void testMergeInvertedSegmentOrder() throws InterruptedException {
* This test is a follow up from issue https://github.com/AxonFramework/AxonFramework/issues/1212
*/
@Test
public void testThrownErrorBubblesUp() {
void testThrownErrorBubblesUp() {
AtomicReference<Throwable> thrownException = new AtomicReference<>();

EventHandlerInvoker eventHandlerInvoker = mock(EventHandlerInvoker.class);
Expand Down Expand Up @@ -1906,6 +1907,23 @@ void testIsReplayingWhenNotCaughtUp() throws Exception {
);
}

@Test
void testProcessorOnlyTriesToClaimAvailableSegments() {
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);
when(tokenStore.fetchAvailableSegments(testSubject.getName())).thenReturn(Collections.singletonList(Segment.computeSegment(2, 0, 1, 2, 3)));

testSubject.start();

eventBus.publish(createEvents(1));

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.processingStatus().size()));
assertWithin(1, TimeUnit.SECONDS, () -> assertTrue(testSubject.processingStatus().containsKey(2)));
verify(tokenStore, never()).fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
}

@Test
void testShutdownTerminatesWorkersAfterConfiguredWorkerTerminationTimeout() throws Exception {
int numberOfEvents = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,24 +1149,23 @@ public void run() {
int waitTime = 1;
String processorName = TrackingEventProcessor.this.getName();
while (getState().isRunning()) {
List<Segment> segmentsToClaim;
workLauncherRunning.set(true);
int[] tokenStoreCurrentSegments;

try {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
int[] tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
() -> tokenStore.fetchSegments(processorName)
);

// When in an initial stage, split segments to the requested number.
if (tokenStoreCurrentSegments.length == 0 && segmentsSize > 0) {
tokenStoreCurrentSegments = transactionManager.fetchInTransaction(
transactionManager.executeInTransaction(
() -> {
TrackingToken initialToken = initialTrackingTokenBuilder.apply(messageSource);
tokenStore.initializeTokenSegments(processorName, segmentsSize, initialToken);
return tokenStore.fetchSegments(processorName);
}
);
}
segmentsToClaim = transactionManager.fetchInTransaction(() -> tokenStore.fetchAvailableSegments(processorName));
waitTime = 1;
} catch (Exception e) {
if (waitTime == 1) {
Expand All @@ -1187,15 +1186,14 @@ public void run() {
// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
TrackingSegmentWorker workingInCurrentThread = null;
for (int i = 0; i < tokenStoreCurrentSegments.length && availableThreads.get() > 0; i++) {
int segmentId = tokenStoreCurrentSegments[i];
for (int i = 0; i < segmentsToClaim.size() && availableThreads.get() > 0; i++) {
Segment segment = segmentsToClaim.get(i);
int segmentId = segment.getSegmentId();

if (!activeSegments.containsKey(segmentId) && canClaimSegment(segmentId)) {
try {
transactionManager.executeInTransaction(() -> {
TrackingToken token = tokenStore.fetchToken(processorName, segmentId);
int[] segmentIds = tokenStore.fetchSegments(processorName);
Segment segment = Segment.computeSegment(segmentId, segmentIds);
TrackingToken token = tokenStore.fetchToken(processorName, segment);
logger.info("Worker assigned to segment {} for processing", segment);
TrackerStatus newStatus = new TrackerStatus(segment, token);
TrackerStatus previousStatus = activeSegments.putIfAbsent(segmentId, newStatus);
Expand All @@ -1208,7 +1206,7 @@ public void run() {
});
} catch (UnableToClaimTokenException ucte) {
// When not able to claim a token for a given segment, we skip the
logger.debug("Unable to claim the token for segment: {}. It is owned by another process",
logger.debug("Unable to claim the token for segment: {}. It is owned by another process or has been split/merged concurrently",
segmentId);

TrackerStatus removedStatus = activeSegments.remove(segmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
Expand All @@ -50,6 +50,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import static org.axonframework.common.io.IOUtils.closeQuietly;

Expand Down Expand Up @@ -695,16 +696,17 @@ private CompletableFuture<Void> abortWorkPackages(Exception cause) {
*/
private Map<Segment, TrackingToken> claimNewSegments() {
Map<Segment, TrackingToken> newClaims = new HashMap<>();
int[] segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchSegments(name));
List<Segment> segments = transactionManager.fetchInTransaction(() -> tokenStore.fetchAvailableSegments(name));

// As segments are used for Segment#computeSegment, we cannot filter out the WorkPackages upfront.
int[] unClaimedSegments = Arrays.stream(segments)
.filter(segmentId -> !workPackages.containsKey(segmentId))
.toArray();
List<Segment> unClaimedSegments = segments.stream()
.filter(segment -> !workPackages.containsKey(segment.getSegmentId()))
.collect(Collectors.toList());

int maxSegmentsToClaim = maxClaimedSegments - workPackages.size();

for (int segmentId : unClaimedSegments) {
for (Segment segment : unClaimedSegments) {
int segmentId = segment.getSegmentId();
if (isSegmentBlockedFromClaim(segmentId)) {
logger.debug("Segment {} is still marked to not be claimed by Processor [{}].", segmentId, name);
processingStatusUpdater.accept(segmentId, u -> null);
Expand All @@ -713,12 +715,12 @@ private Map<Segment, TrackingToken> claimNewSegments() {
if (newClaims.size() < maxSegmentsToClaim) {
try {
TrackingToken token = transactionManager.fetchInTransaction(
() -> tokenStore.fetchToken(name, segmentId)
() -> tokenStore.fetchToken(name, segment)
);
newClaims.put(Segment.computeSegment(segmentId, segments), token);
newClaims.put(segment, token);
} catch (UnableToClaimTokenException e) {
processingStatusUpdater.accept(segmentId, u -> null);
logger.debug("Unable to claim the token for segment {}. It is owned by another process.",
logger.debug("Unable to claim the token for segment {}. It is owned by another process or has been split/merged concurrently.",
segmentId);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,9 +17,13 @@
package org.axonframework.eventhandling.tokenstore;

import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* Describes a component capable of storing and retrieving event tracking tokens. An {@link EventProcessor} that is
Expand Down Expand Up @@ -116,6 +120,28 @@ default void initializeTokenSegments(String processorName, int segmentCount, Tra
*/
TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException;

/**
* Returns the last stored {@link TrackingToken token} for the given {@code processorName} and {@code segment}. Returns {@code null} if the stored token for
* the given process and segment is {@code null}.
* <p>
* This method should throw an {@code UnableToClaimTokenException} when the given {@code segment} has not been initialized with a Token (albeit {@code
* null}) yet. In that case, a segment must have been explicitly initialized. A TokenStore implementation's ability to do so is exposed by the {@link
* #requiresExplicitSegmentInitialization()} method. If that method returns false, this method may implicitly initialize a token and return that token upon
* invocation.
* <p>
* The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use {@link
* #releaseClaim(String, int)}
*
* @param processorName The process name for which to fetch the token
* @param segment The segment for which to fetch the token
* @return The last stored TrackingToken or {@code null} if the store holds no token for given process and segment
* @throws UnableToClaimTokenException if there is a token for given {@code processorName} and {@code segment}, but they are claimed by another process, or
* if the {@code segment has been split or merged concurrently}
*/
default TrackingToken fetchToken(String processorName, Segment segment) throws UnableToClaimTokenException {
return fetchToken(processorName, segment.getSegmentId());
}

/**
* Extends the claim on the current token held by the this node for the given {@code processorName} and
* {@code segment}.
Expand Down Expand Up @@ -207,6 +233,25 @@ default boolean requiresExplicitSegmentInitialization() {
*/
int[] fetchSegments(String processorName);

/**
* Returns a List of known available {@code segments} for a given {@code processorName}. A segment is considered available if it is not claimed by any
* other event processor.
* <p>
* The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the {@link
* TokenStore} is empty, an empty list is returned.
*
* By default, if this method is not implemented, we will return all segments instead, whether they are available or not.
*
* @param processorName the processor's name for which to fetch the segments
* @return a List of available segment identifiers for the specified {@code processorName}
*/
default List<Segment> fetchAvailableSegments(String processorName) {
int[] allSegments = fetchSegments(processorName);
return Arrays.stream(allSegments).boxed()
.map(segment -> Segment.computeSegment(segment, allSegments))
.collect(Collectors.toList());
}

/**
* Returns a unique identifier that uniquely identifies the storage location of the tokens in this store. Two token
* store implementations that share state, must return the same identifier. Two token store implementations that
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/*
* Copyright (c) 2010-2019. Axon Framework
* Copyright (c) 2010-2021. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down

0 comments on commit 80e2033

Please sign in to comment.