Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2290] TrackingEventProcessor does not wait for his worker threads to shut down #2292

Merged
merged 5 commits into from
Jul 20, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,9 @@ private void releaseToken(Segment segment) {
try {
transactionManager.executeInTransaction(() -> tokenStore.releaseClaim(getName(), segment.getSegmentId()));
logger.info("Released claim");
} catch (Exception e) {
logger.info("Release claim failed", e);
} catch (Exception e) {
logger.info("Release claim failed");
nils-christian marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("Release claim failed", e);
// Ignore exception
}
}
Expand Down Expand Up @@ -696,6 +697,12 @@ private CompletableFuture<Void> awaitTermination() {
if (workerThread.isAlive()) {
workerThread.interrupt();
workerThread.join(workerTerminationTimeout);
if (workerThread.isAlive( )) {
smcvb marked this conversation as resolved.
Show resolved Hide resolved
logger.warn(
"Forced shutdown of TrackingProcessor Worker '{}' was not successful. Consider increasing workerTerminationTimeout.",
nils-christian marked this conversation as resolved.
Show resolved Hide resolved
worker.getKey()
);
}
}
} catch (InterruptedException e) {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,32 @@ public TrackingEventProcessorConfiguration andEventTrackerStatusChangeListener(
}

/**
* Sets the shutdown timeout to terminate active workers. Defaults to 5000ms.
* Sets the shutdown timeout to terminate active workers. This is used for both the graceful termination and the
* potential forced termination of active workers. It is thus possible that it is used twice during the shutdown
* phase. Defaults to 5000ms.
*
* @param workerTerminationTimeout the timeout for workers to terminate on a shutdown
* @param workerTerminationTimeout the timeout for workers to terminate on a shutdown in milliseconds
* @return {@code this} for method chaining
*
* @deprecated Use {@link #andWorkerTerminationTimeout(long, TimeUnit)} instead.
*/
public TrackingEventProcessorConfiguration andWorkerTerminationTimeout(long workerTerminationTimeout) {
@Deprecated
public TrackingEventProcessorConfiguration andWorkerTerminationTimeout(long workerTerminationTimeoutInMilliseconds) {
return andWorkerTerminationTimeout(workerTerminationTimeoutInMilliseconds, TimeUnit.MILLISECONDS);
}

/**
* Sets the shutdown timeout to terminate active workers. This is used for both the graceful termination and the
* potential forced termination of active workers. It is thus possible that it is used twice during the shutdown
* phase. Defaults to 5000ms.
*
* @param workerTerminationTimeout the timeout for workers to terminate on a shutdown.
* @param timeUnit The unit of time
* @return {@code this} for method chaining
*/
public TrackingEventProcessorConfiguration andWorkerTerminationTimeout(long workerTerminationTimeout, TimeUnit timeUnit) {
assertStrictPositive(workerTerminationTimeout, "The worker termination timeout should be strictly positive");
this.workerTerminationTimeout = workerTerminationTimeout;
this.workerTerminationTimeout = timeUnit.toMillis(workerTerminationTimeout);
return this;
}

Expand Down