Skip to content

Commit

Permalink
Merge pull request #2311 from AxonFramework/bug/2293
Browse files Browse the repository at this point in the history
Improve the concurrent behaviour of the tracking event processor.
  • Loading branch information
smcvb committed Jul 29, 2022
2 parents c7fb25a + 74a89ec commit c783ce6
Showing 1 changed file with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -216,6 +217,7 @@ public void start() {
}
State previousState = state.getAndSet(State.STARTED);
if (!previousState.isRunning()) {
workLauncherRunning.set(true);
startSegmentWorkers();
}
}
Expand Down Expand Up @@ -496,6 +498,21 @@ private void ignoreEvent(BlockingStream<TrackedEventMessage<?>> eventStream,
reportIgnored(trackedEventMessage);
}

/**
* Will remove a thread and log at warn in case the named thread wasn't actually removed.
*
* @param name the expected name of the thread
*/
private void removeThread(String name) {
Thread removed = workerThreads.remove(name);
if (isNull(removed)) {
logger.warn(
"Expected to remove thread with name: '{}' from workerThreads, but it was not part of the map.",
name
);
}
}

/**
* Indicates whether any of the components handling events for this Processor are able to handle the given {@code
* eventMessage} for any of the given {@code segments}.
Expand Down Expand Up @@ -1075,37 +1092,45 @@ public String name() {
return TrackingSegmentWorker.class.getSimpleName() + segment.getSegmentId();
}

@Override
public void cleanUp() {
private void freeSegment() {
TrackerStatus removedStatus = activeSegments.remove(segment.getSegmentId());
if (removedStatus != null) {
trackerStatusChangeListener.onEventTrackerStatusChange(
singletonMap(segment.getSegmentId(), new RemovedTrackerStatus(removedStatus))
);
}
workerThreads.remove(name());
}

@Override
public void cleanUp() {
freeSegment();
removeThread(name());
logger.info("Worker for segment {} stopped.", segment);

final int currentAvailableThreads = availableThreads.getAndIncrement();

if (!workLauncherRunning.get() && currentAvailableThreads == 0 && getState().isRunning()) {
logger.info("No Worker Launcher active. Using current thread to assign segments.");
new WorkerLauncher().run();
boolean launchedSinceGetCalled = workLauncherRunning.getAndSet(true);
if (!launchedSinceGetCalled) {
logger.info("No Worker Launcher active. Using current thread to assign segments.");
Worker workerLauncher = new WorkerLauncher();
workerThreads.put(workerLauncher.name(), Thread.currentThread());
workerLauncher.run();
}
}
}
}

private class WorkerLauncher implements Worker {

private boolean asSegmentWorker = false;
private TrackingSegmentWorker workingInCurrentThread = null;

@Override
public void run() {
try {
int waitTime = 1;
String processorName = TrackingEventProcessor.this.getName();
while (getState().isRunning()) {
workLauncherRunning.set(true);
int[] tokenStoreCurrentSegments;

try {
Expand Down Expand Up @@ -1142,7 +1167,6 @@ public void run() {

// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
TrackingSegmentWorker workingInCurrentThread = null;
for (int i = 0; i < tokenStoreCurrentSegments.length && availableThreads.get() > 0; i++) {
int segmentId = tokenStoreCurrentSegments[i];

Expand Down Expand Up @@ -1206,24 +1230,12 @@ segmentId, getName(), e
spawnWorkerThread(trackingSegmentWorker).start();
} else {
workingInCurrentThread = trackingSegmentWorker;
break;
return;
}
}
}

// We're not able to spawn new threads, so this thread should also start processing.
if (nonNull(workingInCurrentThread)) {
logger.info("Using current Thread for last segment worker: {}", workingInCurrentThread);
workerThreads.remove(name());
workerThreads.put(workingInCurrentThread.name(), Thread.currentThread());
asSegmentWorker = true;
workLauncherRunning.set(false);
workingInCurrentThread.run();
return;
}
doSleepFor(tokenClaimInterval);
}
workLauncherRunning.set(false);
} finally {
cleanUp();
}
Expand All @@ -1234,10 +1246,29 @@ public String name() {
return WorkerLauncher.class.getSimpleName();
}

/**
* Cleans up once the worker is done. To make sure a shutdown run at almost the same time the cleanup is
* triggered this has some complexity. We need to make sure when it's still running, and becoming a worker, the
* thread is added before switching the {@code workLauncherRunning} as a shutdown might be called in between,
* and might leave the thread running.
*/
@Override
public void cleanUp() {
if (! asSegmentWorker){
workerThreads.remove(name());
removeThread(name());
if (nonNull(workingInCurrentThread)) {
if (getState().isRunning()) {
logger.info("Using current Thread for last segment worker: {}", workingInCurrentThread);
workerThreads.put(workingInCurrentThread.name(), Thread.currentThread());
workLauncherRunning.set(false);
workingInCurrentThread.run();
} else {
logger.info("freeing segment since segment worker will not be started for worker: {}",
workingInCurrentThread);
workingInCurrentThread.freeSegment();
workLauncherRunning.set(false);
}
} else {
workLauncherRunning.set(false);
}
}
}
Expand Down

0 comments on commit c783ce6

Please sign in to comment.