Add disposeGracefully method to Scheduler #3089

Merged
merged 35 commits into from Aug 16, 2022

chemicL
Member

@chemicL chemicL commented Jun 23, 2022

Currently, all Schedulers forcefully shut down the underlying ExecutorServices by calling the shutdownNow() method. In some scenarios that is undesirable, as it does not allow proper cleanup.
Schedulers should allow shutting down by not accepting new work, but giving the currently executing tasks a chance to finish without interruption. A Mono<Void> disposeGracefully() method has been added for that purpose. Upon subscription, it calls shutdown() instead of shutdownNow() and creates a background task that calls awaitTermination() to complete the returned Mono. In realistic scenarios, it can be combined with the timeout() and retry() operators.
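The mechanism described above can be sketched with plain `java.util.concurrent`, under a few stated assumptions: `GracefulShutdownSketch` is a hypothetical name, and `CompletableFuture<Void>` stands in for `Mono<Void>`. Unlike the Mono, which starts the procedure only upon subscription, this future is eager. It shows `shutdown()` being called first, then a background waiter calling `awaitTermination()` to complete the returned signal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not Reactor's code: graceful disposal of a single ExecutorService.
final class GracefulShutdownSketch {
    private GracefulShutdownSketch() {
    }

    static CompletableFuture<Void> disposeGracefully(ExecutorService executor) {
        // Reject new tasks, but let running and already-queued tasks finish uninterrupted.
        executor.shutdown();

        CompletableFuture<Void> done = new CompletableFuture<>();
        // Background task that waits for termination, then completes the returned future.
        Thread waiter = new Thread(() -> {
            try {
                // Unbounded wait; callers can bound it, e.g. with orTimeout(...).
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                done.complete(null);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                done.completeExceptionally(e);
            }
        }, "graceful-shutdown-waiter");
        waiter.setDaemon(true); // do not keep the JVM alive just for this wait
        waiter.start();
        return done;
    }
}
```

A caller could bound the wait with `disposeGracefully(executor).orTimeout(5, TimeUnit.SECONDS)` (Java 9+), loosely mirroring `timeout()` on the Mono. Timing out the future does not stop the executor, though.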

Some `Disposable`s should be disposed with a chance to clean up the
underlying resources. At the same time, it is desirable to coordinate logic
that depends on successful disposal.
Specifically, instances of `Scheduler` should allow shutting down by not
accepting new work, while giving the currently executing tasks a chance to
finish without interruption.
Therefore, a `Disposable.Graceful` interface has been added, which
provides the means to perform a timely cleanup and to observe the result via a
`Mono<Void> disposeGracefully(Duration)` method.
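The shape of that contract can be sketched without Reactor, as follows. `GracefulDisposable` is a hypothetical stand-in for `Disposable.Graceful`, and `CompletableFuture<Void>` replaces `Mono<Void>`. The executor-backed implementation merely reports a `TimeoutException` when the grace period elapses, which leaves any hard `dispose()` to the caller. This is only one of several possible contracts.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for Disposable.Graceful (CompletableFuture instead of Mono).
interface GracefulDisposable {
    void dispose();                                                  // hard, immediate disposal
    CompletableFuture<Void> disposeGracefully(Duration gracePeriod); // cooperative disposal
}

// Example implementation wrapping a single ExecutorService.
final class ExecutorResource implements GracefulDisposable {
    private final ExecutorService executor;

    ExecutorResource(ExecutorService executor) {
        this.executor = executor;
    }

    @Override
    public void dispose() {
        executor.shutdownNow(); // interrupts running tasks and drops queued ones
    }

    @Override
    public CompletableFuture<Void> disposeGracefully(Duration gracePeriod) {
        executor.shutdown(); // reject new work, let current work finish
        CompletableFuture<Void> result = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try {
                if (executor.awaitTermination(gracePeriod.toNanos(), TimeUnit.NANOSECONDS)) {
                    result.complete(null);
                }
                else {
                    // Nothing is forced here: the caller decides whether to call dispose().
                    result.completeExceptionally(
                            new TimeoutException("not terminated within " + gracePeriod));
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                result.completeExceptionally(e);
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        return result;
    }
}
```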
Member

@simonbasle simonbasle left a comment

this is shaping up well, but the review triggers a few thoughts on my part and surfaces additional corner cases 🤔

@@ -165,4 +167,11 @@ default boolean addAll(Collection<? extends Disposable> ds) {
*/
int size();
}

// TODO(dj): add javadoc
Member

the semantics of disposeGracefully may vary widely, so I would make the documented contract say that explicitly (e.g. "each class implementing this trait should define how subsequent calls behave during the grace period and after it")
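As one possible contract for repeated calls, here is a sketch. The first call initiates the graceful shutdown, and every later call, whether during or after the grace period, returns the same completion signal. The class name and the use of `AtomicReference` are assumptions made for illustration, and `CompletableFuture` stands in for `Mono`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical resource whose disposeGracefully() is idempotent.
final class IdempotentGracefulExecutor {
    private final ExecutorService executor;
    // Holds the completion signal once disposal has been initiated.
    private final AtomicReference<CompletableFuture<Void>> disposal = new AtomicReference<>();

    IdempotentGracefulExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> disposeGracefully() {
        CompletableFuture<Void> existing = disposal.get();
        if (existing != null) {
            return existing; // later calls observe the same outcome
        }
        CompletableFuture<Void> candidate = new CompletableFuture<>();
        if (!disposal.compareAndSet(null, candidate)) {
            return disposal.get(); // lost the race: another caller initiated disposal
        }
        executor.shutdown();
        Thread waiter = new Thread(() -> {
            try {
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                candidate.complete(null);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                candidate.completeExceptionally(e);
            }
        });
        waiter.setDaemon(true);
        waiter.start();
        return candidate;
    }
}
```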

Member

one limitation I'm thinking of with this API is that most of the time the underlying resource(s) being disposed gracefully will be atomically swapped out, which means that even if one re-subscribes to the Mono, including with onErrorResume() or retry(), the underlying resources won't be reachable anymore.

thus, there will be no way of composing operators to fall back to a "hard" shutdown once the graceful shutdown is initiated.

I'm thinking this is fine if documented. The recommendation for implementors should probably be to trigger a hard dispose at the end of the gracePeriod THEN propagate a TimeoutException, noting that it only serves as a warning (e.g. for logging) and cannot be recovered from.
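The recommendation above could be sketched as follows for a single `ExecutorService`. After the grace period, `shutdownNow()` is forced first, and only then is a `TimeoutException` signalled. That exception is purely informational, because the executor can no longer be recovered. The helper name is hypothetical, and `CompletableFuture` stands in for `Mono`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: graceful shutdown with a forced fallback at the end of the grace period.
final class ForcedFallbackSketch {
    private ForcedFallbackSketch() {
    }

    static CompletableFuture<Void> disposeGracefully(ExecutorService executor, Duration gracePeriod) {
        executor.shutdown(); // reject new work, let current work finish
        CompletableFuture<Void> result = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try {
                if (executor.awaitTermination(gracePeriod.toNanos(), TimeUnit.NANOSECONDS)) {
                    result.complete(null);
                    return;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Grace period exhausted (or waiter interrupted): force the shutdown first...
            executor.shutdownNow();
            // ...then report it. Retrying cannot help: the executor is already being torn down.
            result.completeExceptionally(new TimeoutException(
                    "not terminated within " + gracePeriod + ", used shutdownNow()"));
        });
        waiter.setDaemon(true);
        waiter.start();
        return result;
    }
}
```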

Member Author

@chemicL chemicL Jun 28, 2022

We discussed the possible contracts here with @OlegDokuka. The approach of forcing a shutdown before propagating the TimeoutException, which makes the error non-recoverable, might be confusing to users who don't read the specific Scheduler documentation, but it has some advantages implementation-wise.
Another approach could be to propagate a retryable error and allow re-initiating the shutdown() + awaitTermination(...) procedure, while also allowing an explicit final call to shutdownNow() when desired. I'll go back to the original issue and ask for opinions from the user's perspective to guide the design.

Contributor

My personal opinion is that once we fix a specific behavior (e.g. calling shutdownNow() on a timeout or InterruptedException), we will probably end up with everyone either doing scheduler.disposeGracefully(Duration.ofHours(9999999)).subscribe() or complaining that they did not want shutdownNow() to be called but would rather retry later.

Another thought on TimeoutException: an exception is useless if we cannot do anything useful after receiving it. I'm not sure that logging such an event makes any sense. This exception just states the fact that we forced the shutdown process, so a user simply has to accept it. Also, taking the implementation details into account, all other active subscribers are going to get the same notification, but late subscribers will not get it. That is confusing enough that even documenting it will not resolve the confusion.

My personal recommendation is to prefer flexibility over fixed behavior. One can always write something like the following to mimic what we could hardcode:

scheduler.disposeGracefully(Duration.ofMillis(100))
    .retryWhen(Retry.backoff(5, Duration.ofMillis(50)))
    .onErrorResume(e -> Mono.fromRunnable(scheduler::dispose));
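The pipeline above relies on Reactor operators. What follows is a rough, hypothetical translation of the same caller-side policy to plain `java.util.concurrent`. It repeats a non-destructive graceful attempt with exponentially growing pauses, then falls back to `shutdownNow()`. Unlike `Retry.backoff`, it applies no jitter, and `RetryThenForceSketch` is an illustrative name.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical caller-side policy: graceful attempts with backoff, then a forced shutdown.
final class RetryThenForceSketch {
    private RetryThenForceSketch() {
    }

    /** Returns true if the executor terminated gracefully, false if shutdownNow() was used. */
    static boolean shutdown(ExecutorService executor, Duration attemptTimeout,
                            int maxRetries, Duration firstBackoff) {
        long backoffNanos = firstBackoff.toNanos();
        try {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                // One graceful attempt: shutdown() is idempotent, and nothing is forced here.
                executor.shutdown();
                if (executor.awaitTermination(attemptTimeout.toNanos(), TimeUnit.NANOSECONDS)) {
                    return true;
                }
                if (attempt < maxRetries) {
                    TimeUnit.NANOSECONDS.sleep(backoffNanos); // pause before retrying
                    backoffNanos *= 2;                        // exponential growth, no jitter
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt, then force below
        }
        // Retries exhausted (or interrupted): fall back to the hard shutdown.
        executor.shutdownNow();
        return false;
    }
}
```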

@simonbasle
Member

@chemicL what do you think of this piece of code to shut down multiple executors at once, try to await as close to the grace period as possible while still needing only one thread, and finally call shutdownNow() in case the whole thing takes more than gracePeriod?

	import java.time.Duration;
	import java.util.concurrent.ExecutorService;
	import java.util.concurrent.Executors;
	import java.util.concurrent.TimeUnit;
	import java.util.concurrent.TimeoutException;
	import java.util.function.Function;

	import reactor.core.publisher.Sinks;

	//this supposes that we somehow can get all the executors and swap them with an empty array
	//with more advanced schedulers like BoundedElasticScheduler, we might not get an ExecutorService array
	//but an array of another resource (like BoundedState[]), hence the Function
	static <RES> void shutdownAndAwait(final RES[] resources, Function<RES, ExecutorService> serviceExtractor, Duration gracePeriod, Sinks.Empty<Void> disposeNotifier) {
		//stop accepting new work on every executor first, so that they all drain in parallel
		for (int i = 0; i < resources.length; i++) {
			ExecutorService service = serviceExtractor.apply(resources[i]);
			service.shutdown();
		}

		//TODO: use a configurable separate pool?
		final ExecutorService service = Executors.newSingleThreadExecutor();
		service.submit(() -> {
			long nanoStart = System.nanoTime();
			long nanoGraceRemaining = gracePeriod.toNanos();

			boolean allAwaited = true;
			//wait for one executor at a time
			int index = 0;
			while (index < resources.length) {
				ExecutorService toAwait = serviceExtractor.apply(resources[index]);
				//short case: the current executor has already terminated
				if (toAwait.isTerminated()) {
					index++;
					continue;
				}

				//we're inspecting the next executor and giving it nanoGraceRemaining ns to terminate gracefully
				try {
					if (!toAwait.awaitTermination(nanoGraceRemaining, TimeUnit.NANOSECONDS)) {
						//if it didn't terminate gracefully, the whole graceful operation can be considered a failure
						allAwaited = false;
						break;
					}
					else {
						//deduct the time elapsed since nanoStart so that the global operation stays within gracePeriod bounds
						long now = System.nanoTime();
						nanoGraceRemaining = Math.max(0, nanoGraceRemaining - (now - nanoStart));
						nanoStart = now;
						index++;
					}
				}
				catch (InterruptedException e) {
					//restore the interrupt flag and treat the interruption as a failed graceful shutdown
					Thread.currentThread().interrupt();
					allAwaited = false;
					break;
				}
			}
			if (allAwaited) {
				disposeNotifier.tryEmitEmpty();
			}
			else {
				for (int i = 0; i < resources.length; i++) {
					ExecutorService executorService = serviceExtractor.apply(resources[i]);
					executorService.shutdownNow();
				}
				disposeNotifier.tryEmitError(new TimeoutException("Scheduler didn't shutdown gracefully in time (" + gracePeriod + "), used shutdownNow"));
			}
		});
		//the awaiting task has been submitted: release the helper thread once it completes
		service.shutdown();
	}
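The remaining-time bookkeeping in the snippet above can also be expressed with a single absolute deadline, which avoids recomputing the remaining budget after each executor. The following is a hypothetical JDK-only variant. `DeadlineAwait.awaitAll` is an illustrative name, and a `boolean` result replaces the `Sinks.Empty` notification. The caller would still be responsible for calling `shutdownNow()` when it returns `false`.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: awaits several executors within one shared grace period.
final class DeadlineAwait {
    private DeadlineAwait() {
    }

    /** Shuts all executors down, then waits for all of them within one shared grace period. */
    static boolean awaitAll(List<ExecutorService> executors, Duration gracePeriod) {
        executors.forEach(ExecutorService::shutdown); // stop accepting work everywhere first
        final long deadline = System.nanoTime() + gracePeriod.toNanos();
        try {
            for (ExecutorService executor : executors) {
                // The remaining budget shrinks as time passes; clamp it at zero.
                long remaining = deadline - System.nanoTime();
                if (!executor.awaitTermination(Math.max(0L, remaining), TimeUnit.NANOSECONDS)) {
                    return false; // at least this executor missed the shared deadline
                }
            }
            return true;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```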

@chemicL
Member Author

chemicL commented Jun 27, 2022

Yep, that's a great optimization, thanks for the suggestion.

@chemicL chemicL force-pushed the 3068-schedulerGracefulClose branch from 22e8b1d to 9026186 July 13, 2022 10:09
@@ -734,34 +734,6 @@ public void immediateTaskIsExecuted() throws Exception {
assertThat((end - start) >= 1000).as("Timeout too long").isTrue();
}

@Test
public void immediateTaskIsSkippedIfDisposeRightAfter() throws Exception {
Member Author

This is no longer the case. It could still race, but now there are a few more instructions in dispose(), so the task gets aborted. IMO it's not a feature worth testing: the test was introduced in 6f3383d, but one should not rely on a race to dispose tasks.

Contributor

@OlegDokuka OlegDokuka left a comment

Nice progress overall. I left my comments. Also, here is a set of general polishing points:

  1. Let's use plain volatile reads (e.g. this.state instead of STATE.get(this)): plain access is the fastest approach and we use it consistently throughout the codebase. AtomicXXXFieldUpdater#get is an option for reading the value when plain access is impossible (e.g. in a shared utility function such as Operators#requested).
  2. Let's make sure imports are not collapsed (i.e. no wildcard imports).
  3. SchedulerState#terminated makes no use of the old state except in the bounded elastic scheduler, so let's avoid terminated(old) and use static final TERMINATED_STATE = new SchedulerState(dead_executor_service, Mono.empty()) where possible.
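Points 1 and 3 can be illustrated with JDK types only. The `StateHolderSketch` and `State` classes are hypothetical simplifications, not Reactor's `SchedulerState`. In the sketch, the field is read with a plain volatile read, the `AtomicReferenceFieldUpdater` is reserved for the atomic transition, and a single shared terminal instance replaces allocating `terminated(old)` on each disposal.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical simplification of a scheduler state holder.
final class StateHolderSketch {
    static final class State {
        final String name;

        State(String name) {
            this.name = name;
        }
    }

    static final State INITIAL = new State("initial");
    // Point 3: one shared terminal instance instead of terminated(old) allocations.
    static final State TERMINATED = new State("terminated");

    volatile State state = INITIAL;

    static final AtomicReferenceFieldUpdater<StateHolderSketch, State> STATE =
            AtomicReferenceFieldUpdater.newUpdater(StateHolderSketch.class, State.class, "state");

    boolean isDisposed() {
        // Point 1: plain volatile read (this.state), not STATE.get(this).
        return this.state == TERMINATED;
    }

    /** Returns true only for the call that performed the transition. */
    boolean dispose() {
        // The updater is only needed for the atomic read-modify-write.
        return STATE.getAndSet(this, TERMINATED) != TERMINATED;
    }
}
```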

@chemicL chemicL marked this pull request as ready for review July 28, 2022 16:15
@chemicL
Member Author

chemicL commented Aug 2, 2022

The latest changes include improvements that avoid looping in the dispose/disposeGracefully/start methods, as suggested by @OlegDokuka. This was only possible by revisiting BoundedElasticScheduler's inner workings and no longer atomically replacing the underlying BoundedStates with a tombstone. Instead, an encapsulating class was introduced for the state management of BoundedServices, which in turn simplified the state transitions in BoundedElasticScheduler itself with regard to the generic SchedulerState. As convoluted as it sounds, the latest set of changes also fixes a leak of BoundedState, which would not be shut down if dispose happened while an ExecutorService was being picked.

@chemicL chemicL changed the title [WIP] Add Disposable.Graceful interface Add Disposable.Graceful interface and make Scheduler extend it Aug 3, 2022
@chemicL chemicL changed the title Add Disposable.Graceful interface and make Scheduler extend it Add disposeGracefully method to Scheduler Aug 10, 2022
Contributor

@OlegDokuka OlegDokuka left a comment

looks good overall, with a few minor comments

@simonbasle
Member

simonbasle commented Aug 11, 2022

great job @chemicL ! finally approved and ready to merge 😄

now the only remaining step is to try to summarize the design of the change for the commit message 📖
feel free to ping me if you want me to also review that, or if you need help with merging / forward-merging.

@chemicL chemicL merged commit 4768c43 into 3.4.x Aug 16, 2022
@reactorbot

@chemicL this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to main 🙇

Labels
type/enhancement A general enhancement
Development

Successfully merging this pull request may close these issues.

Use shutdown() instead of shutdownNow() when BoundedElasticScheduler call dispose.
4 participants