
Use shutdown() instead of shutdownNow() when BoundedElasticScheduler calls dispose() #3068

Closed
ZejiaJiang opened this issue Jun 8, 2022 · 17 comments · Fixed by #3089
Labels
status/need-decision This needs a decision from the team type/enhancement A general enhancement

@ZejiaJiang

ZejiaJiang commented Jun 8, 2022

I submit hundreds of HTTP request tasks that call block() to a scheduler. Sometimes I need to close my application and dispose the scheduler; each task then throws an InterruptedException when it gets its HTTP response and notices that the scheduler has been disposed.

Why does dispose() use shutdownNow() instead of shutdown()?

@Override
public void dispose() {
    BoundedServices services = BOUNDED_SERVICES.get(this);
    if (services != SHUTDOWN && BOUNDED_SERVICES.compareAndSet(this, services, SHUTDOWN)) {
        ScheduledExecutorService e = EVICTOR.getAndSet(this, null);
        if (e != null) {
            e.shutdownNow();
        }
        services.dispose();
    }
}

Motivation

Currently, when BoundedElasticScheduler.dispose() is called, it halts all waiting tasks and interrupts running ones. Waiting tasks should be allowed to finish instead of being halted abruptly.

Desired solution

Use shutdown() instead of shutdownNow() in dispose().
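The difference being asked about can be observed with a plain java.util.concurrent executor. The sketch below is not Reactor code: a single-thread pool stands in for the scheduler's backing executor, and a Thread.sleep stands in for a blocking HTTP call. shutdownNow() interrupts the running task and discards queued ones, while shutdown() lets both finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownVsShutdownNow {

    /**
     * Starts one task that blocks for 200 ms (standing in for a block()-ed HTTP call),
     * then stops the pool. Returns true if the task was interrupted.
     */
    static boolean runningTaskInterrupted(boolean forceful) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                interrupted.set(true); // the symptom reported in this issue
            }
        });
        started.await(); // the task is now running
        if (forceful) {
            pool.shutdownNow(); // interrupts the worker thread
        } else {
            pool.shutdown();    // lets the running task complete
        }
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return interrupted.get();
    }

    /** Returns how many queued-but-not-started tasks shutdownNow() discards. */
    static int tasksDroppedByShutdownNow() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException ignored) {
                // interrupted by shutdownNow()
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> { }); // these wait in the queue behind the sleeping task
        }
        started.await();
        int dropped = pool.shutdownNow().size(); // never-started tasks are returned, not run
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return dropped;
    }
}
```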

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Jun 8, 2022
@simonbasle simonbasle added type/enhancement A general enhancement status/declined We feel we shouldn't currently apply this change/suggestion status/need-decision This needs a decision from the team and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet status/declined We feel we shouldn't currently apply this change/suggestion labels Jun 8, 2022
@simonbasle
Member

I'm not sure there is a strong rationale behind it, but the core implementations have historically used shutdownNow. Do you have a particular example where that hard shutdown is actually causing an issue, or is it more of a rhetorical question @ZejiaJiang ?

Pretty much the only change we could make would be to add a disposeGracefully() method to the Scheduler interface, with a default implementation that calls dispose() (and, as a result, a caveat in the javadoc that its "graceful" aspect is not guaranteed).

@simonbasle simonbasle added this to the 3.5.0 planning milestone Jun 8, 2022
@simonbasle
Member

simonbasle commented Jun 8, 2022

One alternative to the above would be to throw UnsupportedOperationException in the disposeGracefully() default implementation. This raises the question of which aspect is more important:

  1. if the graceful part is the most important (aka preserve ongoing tasks), then it's better to default to doing nothing: fail fast and throw if no specific implementation is provided
  2. if the dispose part is the most important (aka dispose resources like threads and avoid leaks), then it's better to default to dispose() if no specific implementation is provided
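The two defaults can be sketched with Java default methods. FallbackDisposable, FailFastDisposable, and the Legacy* classes below are hypothetical, simplified stand-ins for a Scheduler-like interface and for an implementation written before disposeGracefully() existed; they are not Reactor types.

```java
/** Option 2: always release resources, even if that means a forceful dispose(). */
interface FallbackDisposable {
    void dispose();
    boolean isDisposed();

    default void disposeGracefully() {
        dispose(); // "graceful" is not guaranteed for implementations that don't override this
    }
}

/** Option 1: refuse loudly rather than shut down forcefully behind the caller's back. */
interface FailFastDisposable {
    void dispose();
    boolean isDisposed();

    default void disposeGracefully() {
        throw new UnsupportedOperationException(
                getClass().getSimpleName() + " does not implement disposeGracefully()");
    }
}

/** Hypothetical "old" implementation that only knows how to dispose forcefully. */
class LegacyBase {
    private volatile boolean disposed;

    public void dispose() {
        disposed = true; // imagine executor.shutdownNow() here
    }

    public boolean isDisposed() {
        return disposed;
    }
}

class LegacyWithFallback extends LegacyBase implements FallbackDisposable { }

class LegacyFailFast extends LegacyBase implements FailFastDisposable { }
```

With option 2, a legacy implementation still releases its threads; with option 1, the caller gets an exception and the resource stays alive until someone calls dispose().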

@ZejiaJiang
Author

Hi @simonbasle , thank you for your reply.

I've been trying to put together a simple code snippet. In my case, I submit hundreds of HTTP request tasks that call block() to the scheduler. Sometimes I need to close my application, so I dispose the scheduler; each task then throws an InterruptedException when it gets its HTTP response and notices that the scheduler has been disposed.

I think graceful is more important: in my case there are too many tasks for me to handle each of them individually. I understand your concern about leaks. If users choose disposeGracefully(), they should guarantee that their tasks can complete; otherwise they should use dispose() directly. In the worst case, users can call dispose() some time after calling disposeGracefully(), once they see that the scheduler is still not disposed. disposeGracefully() simply provides a new option.

@simonbasle
Copy link
Member

@ZejiaJiang I agree that the two separate methods would provide choice.

I'm just wondering what's best to happen if you do call disposeGracefully with an old Scheduler that didn't specifically implement that method.

  1. the Scheduler is NOT shut down, and instead you get an UnsupportedOperationException (which might not be that obvious if shutting down is done inside the reactive chain)
  2. the Scheduler does get shut down, albeit forcefully

Granted, this is a corner case because none of the core schedulers would rely on the default method body.

@ZejiaJiang
Copy link
Author

@simonbasle In this case, I prefer 2.

@simonbasle simonbasle linked a pull request Jun 23, 2022 that will close this issue
@chemicL
Member

chemicL commented Jun 28, 2022

@ZejiaJiang we are discussing the API that is in draft state in #3089. We are considering two possible behaviours. First the API:

Mono<Void> disposeGracefully(Duration gracefulPeriod);

which can either complete successfully or propagate a timeout exception. The exception is signalled when the graceful shutdown() combined with awaitTermination(...) has not succeeded.
The subject of the debate is what that exception signifies:

  1. Before that signal, the Scheduler was also forcefully terminated via shutdownNow() and the exception serves for informational purposes. Retrying does not have any effect.

E.g.

// Here the error is not recoverable - shutdownNow() has been called after timeout
return scheduler.disposeGracefully(Duration.ofMillis(100))
    .doOnError(e -> logger.warn("Scheduler {} shutdown forcefully", scheduler.name(), e));
  2. The Mono can be retried and will still attempt to shutdown() and awaitTermination(...), and it's up to the user to follow that with a forceful dispose() if they need to.

E.g.

// Retry 5 times and if still no success, forcefully shutdown the Scheduler
return scheduler.disposeGracefully(Duration.ofMillis(100))
    .retryWhen(Retry.backoff(5, Duration.ofMillis(50)))
    .onErrorResume(e -> Mono.fromRunnable(scheduler::dispose));

Which do you prefer @ZejiaJiang?

@ZejiaJiang
Author

Hi @chemicL ,

Thanks for asking. I discussed these two behaviors with my colleagues. They prefer the second one because they think it's more flexible. I prefer the first one: when we provide a gracefulPeriod, it means we have an expectation about how long shutting down the scheduler should take. If the scheduler hasn't shut down after the gracefulPeriod has elapsed, I don't think retrying will succeed.

Hope this can help you decide.

@cavallium

cavallium commented Jul 1, 2022

I also need a graceful alternative to .dispose(). I had to make a custom "uninterruptible scheduler" because the shutdownNow() inside .dispose() was killing my Lucene instances.

From the Lucene documentation:

NOTE: Accessing this class either directly or indirectly from a thread while it's interrupted can close the underlying file descriptor immediately if at the same time the thread is blocked on IO. The file descriptor will remain closed and subsequent access to NIOFSDirectory will throw a ClosedChannelException. If your application uses either Thread.interrupt() or Future.cancel(boolean) you should use the legacy RAFDirectory from the Lucene misc module in favor of NIOFSDirectory.
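The mechanism behind that note is a property of java.nio interruptible channels: if a thread is interrupted while (or before) doing I/O on a FileChannel, the channel is closed and the thread gets a ClosedByInterruptException. The minimal sketch below uses plain java.nio, not Lucene; setting the current thread's interrupt flag stands in for the interrupt that shutdownNow() delivers to a worker thread.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class InterruptClosesChannelDemo {

    /** Returns true if an interrupt during a read closed the channel. */
    static boolean readWhileInterrupted() throws IOException {
        Path tmp = Files.createTempFile("interrupt-demo", ".bin");
        Files.write(tmp, new byte[] {1, 2, 3});
        FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ);
        try {
            Thread.currentThread().interrupt(); // simulate shutdownNow() interrupting this worker
            ch.read(ByteBuffer.allocate(3));
            return false; // not expected: the interrupted read should throw
        } catch (ClosedByInterruptException e) {
            return !ch.isOpen(); // the channel (file descriptor) is now closed for good
        } finally {
            Thread.interrupted(); // clear the interrupt flag so the caller is unaffected
            ch.close();           // no-op if already closed
            Files.deleteIfExists(tmp);
        }
    }
}
```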

@chemicL
Member

chemicL commented Jul 4, 2022

@cavallium thank you for sharing. Do you think the proposed solutions can help simplify your code? If you have any comments regarding the disposeGracefully(Duration) mechanics described in this discussion, feel free to share them so we can include them in the design.

@cavallium

cavallium commented Jul 4, 2022

In my case, I absolutely need to be sure that no task is interrupted via shutdownNow(), so I prefer the 2nd solution, in which disposeGracefully(Duration) can fail without calling dispose() internally.

If disposeGracefully(Duration) could automatically interrupt threads via shutdownNow(), it would create the same problems that dispose() causes for me now.

Currently I use a wrapped bounded-elastic scheduler that creates empty disposables, but it's very hacky and has a huge overhead (it reschedules everything two more times), and every time I need to do this:

// fromRunnable: the lambda returns no value, so it is not a valid Callable
Mono.fromRunnable(() -> {
  // Interrupting this task kills the file descriptor, making the Lucene instance unusable forever.
  lucene.thingThatShouldNotBeInterrupted();
})
// Wrap the scheduler with a custom wrapper (the commenter's own class) that creates empty disposables
.subscribeOn(new FakeDisposablesScheduler(Schedulers.boundedElastic()))
// Publish the result on another scheduler to re-enable task cancellation
.publishOn(Schedulers.parallel());

@cavallium

cavallium commented Jul 4, 2022

What about adding a boolean parameter, disposeGracefully(Duration timeout, boolean disposeOnTimeout), that specifies whether to automatically kill the tasks after the timeout? It could greatly simplify the code a user needs to write in both cases.
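The suggested flag can be sketched against a plain ExecutorService. The helper name and signature below are hypothetical and only mirror the proposal; they are not a Reactor API. disposeOnTimeout = true corresponds to option 1 (force after the grace period), false to option 2 (report the timeout and leave the decision to the caller).

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class FlaggedGracefulDispose {

    /**
     * Hypothetical helper modelled on disposeGracefully(Duration, boolean).
     * Returns true if the pool terminated within the timeout.
     */
    static boolean disposeGracefully(ExecutorService pool, Duration timeout, boolean disposeOnTimeout)
            throws InterruptedException {
        pool.shutdown(); // no new tasks; queued and running tasks keep going
        if (pool.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
            return true;
        }
        if (disposeOnTimeout) {
            pool.shutdownNow(); // option 1: interrupt stragglers after the grace period
        }
        // option 2 (disposeOnTimeout == false): nothing is interrupted; the caller may
        // wait again, or force termination later with shutdownNow()
        return false;
    }
}
```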

chemicL added a commit that referenced this issue Aug 16, 2022
`Scheduler`s now can be disposed lazily using a new method,
`disposeGracefully()`. It returns a `Mono<Void>`, which allows awaiting
the signal that all underlying resources have been properly shut
down. In contrast to the existing `dispose()`, which calls
`shutdownNow()` on underlying `ExecutorService`s, the lazy variant calls
`shutdown()` and sends the termination signal once a monitoring `Thread`
successfully observes a positive result from `awaitTermination()`.

Fixes #3068.
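A rough analogue of the mechanism described in the commit message, with CompletableFuture standing in for Mono<Void>: shutdown() is called, and a monitoring thread completes the future once awaitTermination() returns true. This is a hypothetical sketch (the class and method names are made up, and it starts one monitor thread per call), not Reactor's implementation. disposeOrForce shows the caller-side combination of a timeout with a forceful fallback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class MonitoredGracefulDispose {

    /** Calls shutdown() and completes the future once the pool has terminated; never forces. */
    static CompletableFuture<Void> disposeGracefully(ExecutorService pool) {
        CompletableFuture<Void> terminated = new CompletableFuture<>();
        pool.shutdown();
        Thread monitor = new Thread(() -> {
            try {
                // Keep waiting until the pool has terminated; giving up is the caller's decision.
                while (!pool.awaitTermination(50, TimeUnit.MILLISECONDS)) {
                    // still draining
                }
                terminated.complete(null); // no-op if the caller already timed out
            } catch (InterruptedException e) {
                terminated.completeExceptionally(e);
            }
        }, "dispose-monitor");
        monitor.setDaemon(true); // don't keep the JVM alive just to watch the pool
        monitor.start();
        return terminated;
    }

    /** Caller-side fallback, analogous to timeout + onErrorResume(dispose) in the thread. */
    static CompletableFuture<Void> disposeOrForce(ExecutorService pool, long timeoutMillis) {
        return disposeGracefully(pool)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .exceptionally(e -> {
                    pool.shutdownNow(); // forceful fallback, chosen explicitly by the caller
                    return null;
                });
    }
}
```

Because the graceful variant never calls shutdownNow() itself, the decision to force termination stays with the caller.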
@chemicL
Member

chemicL commented Aug 16, 2022

@ZejiaJiang @cavallium first of all, thank you for the feedback and ideas. After some back-and-forth we landed on a design that should be the least surprising to users: a disposeGracefully() method with no arguments, which can be combined with a timeout, retries, and an onErrorResume fallback to dispose(). It does not cause a forceful shutdown on its own; users are free to combine it with dispose() when necessary. Please let us know if anything needs addressing.

@chemicL chemicL closed this as completed Aug 16, 2022
@ZejiaJiang
Author

Thank you for your effort! I will try it once the new version is released.

@chemicL
Member

chemicL commented Aug 16, 2022

Consider experimenting with the snapshot version; we can integrate your feedback before the release.

@chemicL chemicL modified the milestones: 3.4.x Backlog, 3.4.23 Aug 16, 2022
@cavallium

It seems good!

By the way, if a Flux is cancelled, dispose is still called. Is there a way to prevent that?

@chemicL
Copy link
Member

chemicL commented Aug 25, 2022

@cavallium can you be more specific about "if a Flux is cancelled it will still call dispose, is there a way to prevent it?"

Do you mean the publishOn and subscribeOn operators? In their case, to my understanding, dispose() is called on a particular worker (obtained via Scheduler#createWorker()). Scheduler.Worker#dispose() does not behave like Scheduler#dispose(): it mostly releases pending tasks and frees the resources dedicated to that worker (e.g. BoundedElasticScheduler.BoundedState), and it does not imply calling shutdown() on an ExecutorService.

@cavallium

I've tested it, and you are right: it doesn't interrupt anything and doesn't dispose the Scheduler. Thanks!

5 participants