Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the concurrent behaviour of the tracking event processor. #2311

Merged
merged 1 commit into from
Jul 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@

import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -216,6 +217,7 @@ public void start() {
}
State previousState = state.getAndSet(State.STARTED);
if (!previousState.isRunning()) {
workLauncherRunning.set(true);
startSegmentWorkers();
}
}
Expand Down Expand Up @@ -496,6 +498,21 @@ private void ignoreEvent(BlockingStream<TrackedEventMessage<?>> eventStream,
reportIgnored(trackedEventMessage);
}

/**
* Will remove a thread and log at warn in case the named thread wasn't actually removed.
*
* @param name the expected name of the thread
*/
private void removeThread(String name) {
Thread removed = workerThreads.remove(name);
if (isNull(removed)) {
logger.warn(
"Expected to remove thread with name: '{}' from workerThreads, but it was not part of the map.",
name
);
}
}

/**
* Indicates whether any of the components handling events for this Processor are able to handle the given {@code
* eventMessage} for any of the given {@code segments}.
Expand Down Expand Up @@ -1075,37 +1092,45 @@ public String name() {
return TrackingSegmentWorker.class.getSimpleName() + segment.getSegmentId();
}

@Override
public void cleanUp() {
private void freeSegment() {
TrackerStatus removedStatus = activeSegments.remove(segment.getSegmentId());
if (removedStatus != null) {
trackerStatusChangeListener.onEventTrackerStatusChange(
singletonMap(segment.getSegmentId(), new RemovedTrackerStatus(removedStatus))
);
}
workerThreads.remove(name());
}

@Override
public void cleanUp() {
freeSegment();
removeThread(name());
logger.info("Worker for segment {} stopped.", segment);

final int currentAvailableThreads = availableThreads.getAndIncrement();

if (!workLauncherRunning.get() && currentAvailableThreads == 0 && getState().isRunning()) {
logger.info("No Worker Launcher active. Using current thread to assign segments.");
new WorkerLauncher().run();
boolean launchedSinceGetCalled = workLauncherRunning.getAndSet(true);
if (!launchedSinceGetCalled) {
logger.info("No Worker Launcher active. Using current thread to assign segments.");
Worker workerLauncher = new WorkerLauncher();
workerThreads.put(workerLauncher.name(), Thread.currentThread());
workerLauncher.run();
}
}
}
}

private class WorkerLauncher implements Worker {

private boolean asSegmentWorker = false;
private TrackingSegmentWorker workingInCurrentThread = null;

@Override
public void run() {
try {
int waitTime = 1;
String processorName = TrackingEventProcessor.this.getName();
while (getState().isRunning()) {
workLauncherRunning.set(true);
int[] tokenStoreCurrentSegments;

try {
Expand Down Expand Up @@ -1142,7 +1167,6 @@ public void run() {

// Submit segmentation workers matching the size of our thread pool (-1 for the current dispatcher).
// Keep track of the last processed segments...
TrackingSegmentWorker workingInCurrentThread = null;
for (int i = 0; i < tokenStoreCurrentSegments.length && availableThreads.get() > 0; i++) {
int segmentId = tokenStoreCurrentSegments[i];

Expand Down Expand Up @@ -1206,24 +1230,12 @@ segmentId, getName(), e
spawnWorkerThread(trackingSegmentWorker).start();
} else {
workingInCurrentThread = trackingSegmentWorker;
break;
return;
}
}
}

// We're not able to spawn new threads, so this thread should also start processing.
if (nonNull(workingInCurrentThread)) {
logger.info("Using current Thread for last segment worker: {}", workingInCurrentThread);
workerThreads.remove(name());
workerThreads.put(workingInCurrentThread.name(), Thread.currentThread());
asSegmentWorker = true;
workLauncherRunning.set(false);
workingInCurrentThread.run();
return;
}
doSleepFor(tokenClaimInterval);
}
workLauncherRunning.set(false);
} finally {
cleanUp();
}
Expand All @@ -1234,10 +1246,29 @@ public String name() {
return WorkerLauncher.class.getSimpleName();
}

/**
* Cleans up once the worker is done. To make sure a shutdown run at almost the same time the cleanup is
* triggered this has some complexity. We need to make sure when it's still running, and becoming a worker, the
* thread is added before switching the {@code workLauncherRunning} as a shutdown might be called in between,
* and might leave the thread running.
*/
@Override
public void cleanUp() {
if (! asSegmentWorker){
workerThreads.remove(name());
removeThread(name());
if (nonNull(workingInCurrentThread)) {
if (getState().isRunning()) {
logger.info("Using current Thread for last segment worker: {}", workingInCurrentThread);
workerThreads.put(workingInCurrentThread.name(), Thread.currentThread());
workLauncherRunning.set(false);
workingInCurrentThread.run();
} else {
logger.info("freeing segment since segment worker will not be started for worker: {}",
workingInCurrentThread);
workingInCurrentThread.freeSegment();
workLauncherRunning.set(false);
}
} else {
workLauncherRunning.set(false);
}
}
}
Expand Down