
Multiple subscribeOn calls with a BoundedElasticScheduler can cause deadlock #1992

Closed
gpod opened this issue Dec 12, 2019 · 8 comments · Fixed by #2040
Labels
type/enhancement A general enhancement
Milestone
3.3.3.RELEASE

Comments


gpod commented Dec 12, 2019

The following code, containing two subscribeOn calls, never completes:

final Scheduler scheduler = Schedulers.newBoundedElastic(1, Integer.MAX_VALUE, "test");

final Mono<Integer> integerMono = Mono.fromSupplier(() -> 1)
        .subscribeOn(scheduler)
        .subscribeOn(scheduler);

integerMono.block();

The issue is unique to BoundedElasticScheduler. Other bounded schedulers do not show this behaviour. If one subscribeOn is removed, then it does run to completion.

Expected Behavior

As per https://projectreactor.io/docs/core/release/reference/#_the_subscribeon_method,

Only the earliest subscribeOn call in the chain is actually taken into account.

Therefore the code above should be equivalent to having a single subscribeOn.

Actual Behavior

The code never terminates. A stack dump shows it is waiting on a CountDownLatch:

"Test worker" #13 prio=5 os_prio=31 cpu=310.15ms elapsed=672.58s tid=0x00007faa55190800 nid=0x5b03 waiting on condition  [0x000070000a3dc000]
   java.lang.Thread.State: WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.4/Native Method)
        - parking to wait for  <0x00000007ff86c888> (a java.util.concurrent.CountDownLatch$Sync)
        at java.util.concurrent.locks.LockSupport.park(java.base@11.0.4/LockSupport.java:194)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.4/AbstractQueuedSynchronizer.java:885)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.4/AbstractQueuedSynchronizer.java:1039)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.4/AbstractQueuedSynchronizer.java:1345)
        at java.util.concurrent.CountDownLatch.await(java.base@11.0.4/CountDownLatch.java:232)
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:81)
        at reactor.core.publisher.Mono.block(Mono.java:1663)
        at test.SingleThreadedExecutorSubscribeOnTest.getOneFromMono(SingleThreadedExecutorSubscribeOnTest.java:39)
        at test.SingleThreadedExecutorSubscribeOnTest.subscribeOn_twice_for_single_threaded_boundedElastic_should_complete(SingleThreadedExecutorSubscribeOnTest.java:

Steps to Reproduce

The following unit test illustrates two bounded schedulers which do work as expected (newSingle, fromExecutor), and the one which doesn't (newBoundedElastic):

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class SingleThreadedExecutorSubscribeOnTest {
    @Test
    void subscribeOn_twice_for_single_executor_should_complete() {
        getOneFromMono(Schedulers.newSingle("test"));
    }

    @Test
    void subscribeOn_twice_for_single_threaded_boundedElastic_should_complete() {
        // This never completes
        getOneFromMono(Schedulers.newBoundedElastic(1, Integer.MAX_VALUE, "test"));
    }

    @Test
    void subscribeOn_twice_for_single_threaded_executor_should_complete() {
        final Scheduler executorScheduler = Schedulers.fromExecutor(
                new ThreadPoolExecutor(0, 1,
                        60L, TimeUnit.SECONDS,
                        new LinkedBlockingDeque<>()));

        getOneFromMono(executorScheduler);
    }

    private void getOneFromMono(final Scheduler scheduler) {
        final Mono<Integer> integerMono = Mono.fromSupplier(() -> 1)
                .subscribeOn(scheduler)
                .subscribeOn(scheduler);

        integerMono.block();
    }
}

Possible Solution

Not researched in detail; however, it appears that having n subscribeOn calls in a chain, subscribing on a scheduler with a threadCap of n-1, triggers the issue.

Your Environment

  • Reactor version(s) used: io.projectreactor:reactor-core:3.3.1.RELEASE

  • JVM version (java -version):
    openjdk version "11.0.2" 2019-01-15
    OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
    OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

  • OS and version (eg uname -a): macOS

@simonbasle simonbasle added the status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor label Dec 12, 2019
@simonbasle (Member)

Each subscribeOn (and by extension most uses of a Scheduler by an operator) requires a Worker from the Scheduler. Your code therefore requires 2 workers, and you've explicitly configured your boundedElastic scheduler to limit itself to one Worker. Architecturally speaking, this cannot work.

Having two separate Flux chains would work, because the first chain could run to completion, freeing the Worker for the second one.

Generally speaking, the goal of the boundedElastic Scheduler is to put a ceiling on the number of threads, but that ceiling can and should be higher than single digits. Maybe we should make that recommendation explicit in the javadoc?
For instance, the default Schedulers.boundedElastic() has a threadCap of 10 * Runtime.getRuntime().availableProcessors().
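The architectural constraint described above can be reproduced with a plain JDK executor, no Reactor required: a task that occupies a pool's only thread while blocking on a nested task submitted to the same pool can never make progress. This is a sketch of the mechanism for illustration only (the class name and structure are invented), not Reactor's actual internals:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class NestedBlockingDemo {
    public static void main(String[] args) throws Exception {
        // One worker thread, analogous to newBoundedElastic(1, ...):
        // the outer task holds the only thread while blocking on the inner task,
        // which is queued behind it and can never start.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Integer> outer = pool.submit(() -> pool.submit(() -> 1).get());
        try {
            outer.get(1, TimeUnit.SECONDS);
            System.out.println("completed");
        } catch (TimeoutException e) {
            System.out.println("deadlocked");
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With a pool of 2 threads the same code completes, which mirrors why raising threadCap above the number of chained subscribeOn calls avoids the hang.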


gpod commented Dec 12, 2019

Thanks for looking into this; I have a few more questions though:

How does this differ from newSingle, which per the documentation "hosts a single worker", yet does not suffer from this issue? Nor does a (one-thread) bounded Java Executor.

Although this may not be the typical intended use case, it is problematic that it behaves differently from other schedulers.

Let me provide a bit more context: this issue surfaced for us because we were passing a thread count as an argument to a class which in turn creates the bounded scheduler. In order to debug an issue, we created a unit test which restricted the thread count to 1 to simplify analysis. The composition of our code is such that it is possible and expected to have multiple subscribeOn calls in an assembly, due to the recursive nature of the code: a subscriber subscribes to some input, which in turn depends on other inputs produced through the same code path.

Even if the threadCap is double digits, a complex enough Flux assembly could still hit the issue, and if that assembly is determined at runtime this could result in very hard-to-diagnose bugs.

@simonbasle simonbasle added for/team-attention This issue needs team attention or action status/need-decision This needs a decision from the team status/need-design This needs more in depth design work labels Dec 13, 2019
@simonbasle (Member)

With Schedulers.single(), there is a single ExecutorService that backs the whole Scheduler, so all createWorker invocations are backed by that same active executor, giving the impression that they are all active.

I believe parallel() is prone to the same issue if you submit too many blocking tasks to it. The case of blocking tasks is what we try to cover with elastic() and boundedElastic().

The boundedElastic Scheduler was introduced and optimized to deal with the issue of too many blocking calls, wrapped in reactive types, being offloaded to the elastic Scheduler and causing too many threads to spawn. Its goal is first to limit the number of threads, then ultimately, if tasks continue to pile up, to fail faster (because having too many blocking tasks is likely a design problem in a reactive application).

Maybe we need to revisit it to not assume long-lived tasks, or more explicitly detail the risk of deadlock in the javadoc; I'm not sure yet.
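Why the shared-executor model of single() does not deadlock can also be sketched with plain JDK classes (an illustration under my own assumptions, not Reactor's actual code): when the outer hop merely enqueues the inner work and returns, instead of blocking on it, one thread can drain both tasks from the same queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedExecutorDemo {
    public static void main(String[] args) throws Exception {
        // One thread backs both "workers", as with Schedulers.single().
        ExecutorService shared = Executors.newSingleThreadExecutor();
        CompletableFuture<Integer> result = new CompletableFuture<>();
        // The outer task enqueues the inner task and returns immediately,
        // so the single thread becomes free to run the inner task next.
        shared.execute(() -> shared.execute(() -> result.complete(1)));
        System.out.println(result.get(1, TimeUnit.SECONDS)); // prints 1
        shared.shutdown();
    }
}
```

The contrast with the blocking case is that nothing here parks the thread waiting for nested work; subscription signals in a reactive chain behave like this non-blocking handoff.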

@simonbasle simonbasle reopened this Dec 13, 2019
@simonbasle simonbasle removed the status/invalid We don't feel this issue is valid, or the root cause was found outside of Reactor label Dec 13, 2019

vy commented Dec 17, 2019

Possibly related to the same issue: my colleague @jvwilge stumbled upon the following one:

public static void main(String[] args) {
    testScheduler("par-1", Schedulers.newParallel("par"));
    testScheduler("nbe-2-1", Schedulers.newBoundedElastic(2, 1, "nbe-2-1"));
    testScheduler("nbe-1-1", Schedulers.newBoundedElastic(1, 1, "nbe-1-1"));
}

private static void testScheduler(String name, Scheduler scheduler) {
    try {
        Flux
                .interval(Duration.ofSeconds(1), scheduler)
                .doOnNext(ignored -> System.out.println(name + " emitted"))
                .publishOn(scheduler)
                .doOnNext(ignored -> System.out.println(name + " published"))
                .blockFirst(Duration.ofSeconds(2));
        System.out.println(name + " completed");
    } catch (Exception error) {
        System.err.println(name + " failed: " + error);
    } finally {
        scheduler.dispose();
        System.out.println(name + " disposed");
    }
}

/*
par-1 emitted
par-1 published
par-1 completed
par-1 disposed
nbe-2-1 emitted
nbe-2-1 published
nbe-2-1 completed
nbe-2-1 disposed
nbe-1-1 failed: java.lang.IllegalStateException: Timeout on blocking read for 2000 MILLISECONDS
nbe-1-1 disposed
*/

@simonbasle (Member)

Each operator that takes a Scheduler will generally call its createWorker method. The difference is that since this particular Scheduler is expressly aimed at dealing with blocking tasks, it tries to isolate each request for a Worker in its own facade, whereas the parallel Scheduler will happily round-robin its workers.

The consequence is that submitting a blocking task to a parallel Scheduler can negatively impact other task submitters (e.g. another Flux), whereas with the current boundedElastic implementation it only negatively impacts the same submitter.

The downside is that in the case of non-blocking tasks, we don't have re-entrancy: providing a bounded Scheduler with a threadCap of N to N operators in the same chain will deadlock.

One avenue of mitigation would be to impose a minimum on threadCap (available CPU cores?).
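The mitigation suggested above could look something like this hypothetical helper (not part of Reactor's API), which clamps a user-supplied threadCap to a floor derived from the CPU count:

```java
public class CapFloor {
    // Hypothetical helper, not Reactor code: never let the effective threadCap
    // drop below the number of available CPU cores.
    static int effectiveCap(int requestedCap) {
        int floor = Runtime.getRuntime().availableProcessors();
        return Math.max(requestedCap, floor);
    }

    public static void main(String[] args) {
        // A requested cap of 1 is raised to at least the core count.
        System.out.println(effectiveCap(1) >= 1);
    }
}
```

Such a floor would not eliminate the deadlock (a chain with more operators than cores could still pin every thread), but it would rule out the degenerate single-thread configurations shown in this issue.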

vy added a commit to vy/reactor-pubsub that referenced this issue Jan 15, 2020
My personal experience with Reactor BoundedElasticScheduler has been
nothing but suffering[1]. I am not happy with packaging something that
should have been provided by Reactor, but this single file is still way
simpler, maintainable, *and* without any catches compared to the
BoundedElasticScheduler.

[1] reactor/reactor-core#1992
@simonbasle (Member)

@vy I saw your disappointed rollback to your BoundedScheduledThreadPoolExecutor. I'll try to improve the situation with the BoundedElasticScheduler, or see if I can come up with a different implementation. In the meantime, keep in mind that your solution is likely to fall apart with undefined behavior if the executor is configured with more than one backing thread, as onNext serialization would no longer be guaranteed.


vy commented Jan 23, 2020

Hey @simonbasle! It is very kind of you to pay such close attention to the feature's community implications. I deeply appreciate it, thank you.

As I tried to briefly explain in the reactor-pubsub README: after years of experience using RxJava and Reactor to implement reactive solutions, particularly ones that (unfortunately) need to deal with blocking calls (e.g., JDBC), backpressure surfaces as a major issue for systems under load. The available unbounded scheduler solutions are the perfect disguise for this issue while your system is collapsing in production. Just introducing a simple bound on the task queue (as in BoundedScheduledThreadPoolExecutor) exposed many hidden problems to us and sparked new ideas on system design too. It has since become a must-have component of every reactive service we implement.

Once boundedElastic was introduced -- even though I have communicated my objections to elastic schedulers and their public exposure in the documentation many times in the past -- we eagerly started replacing all our custom schedulers with this brand-new Reactor tool. And on the very first day in production, with a slight peak in traffic... millions of queue messages started piling up, applications became unresponsive, etc. We rolled back and reverted all of our changes. I hope you understand my stance on boundedElastic in this context. We just needed a bounded scheduler, but ended up with a chainsaw in a blood bath.

To conclude the briefing on my frustration with boundedElastic, let me share a simple example: in a particular service, integration tests that work very well with single() fail with boundedElastic(). This is a no-go for us on its own.

keep in mind that your solution is likely to fall apart with undefined behavior in case the executor is configured with more than one backing thread, as onNext serialization wouldn't be guaranteed anymore

I had guessed that, but could not find any indication of it in the (java)docs. Am I missing something?

simonbasle added a commit that referenced this issue Feb 17, 2020
The general idea is to abandon the facade Worker and instead always
submit tasks to an executor-backed worker. In order of preference, when
an operator requests a Worker:

 - if the thread cap is not reached, create and pick a new worker
 - else if there are idle workers, pick an idle worker
 - else pick a busy worker

This implies a behavior under contention that is closer to parallel(),
but with a pool that is expected to be quite a bit larger than the
typical parallel pool.

The drawback is that once we get to picking a busy worker, there's no
telling when its tasks (typically blocking tasks for a
BoundedElasticScheduler) will finish. So even though another executor
might become idle in the meantime, the operator's tasks will be pinned
to the (potentially still busy) executor initially picked.

To counter that effect a bit, we use a priority queue for the busy
executors, favoring executors that are tied to fewer Workers (and
thus fewer operators). We don't yet go as far as factoring in the task
queue of each executor.

Finally, one noticeable change is that the second int parameter in
the API, maxPendingTask, now influences EACH executor's queue
instead of being a shared counter. This should be safe in the sense
that a number set with the previous version in mind is bound to be
over-dimensioned for the new version, but users would be advised to
reconsider that number.
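The three-step selection policy described in the commit message can be sketched in plain Java. This is an invented illustration of the policy, not Reactor's actual implementation; class and field names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class WorkerPicker {
    static final int THREAD_CAP = 4; // illustrative cap, like threadCap

    static class Exec { int workerCount; } // stand-in for a backing executor

    final List<Exec> all = new ArrayList<>();
    // Busy executors ordered by how many Workers share them, fewest first.
    final PriorityQueue<Exec> busy =
            new PriorityQueue<>(Comparator.comparingInt((Exec e) -> e.workerCount));

    Exec pick() {
        if (all.size() < THREAD_CAP) {          // 1. cap not reached: create a new one
            Exec e = new Exec();
            all.add(e);
            return checkout(e);
        }
        for (Exec e : all) {                    // 2. reuse an idle executor
            if (e.workerCount == 0) return checkout(e);
        }
        return checkout(busy.poll());           // 3. share the least-loaded busy one
    }

    private Exec checkout(Exec e) {
        busy.remove(e);                         // re-insert so ordering stays correct
        e.workerCount++;
        busy.add(e);
        return e;
    }

    public static void main(String[] args) {
        WorkerPicker p = new WorkerPicker();
        for (int i = 0; i < 6; i++) p.pick();   // 6 Worker requests, cap of 4
        System.out.println("executors=" + p.all.size()); // executors=4
    }
}
```

With 6 Worker requests and a cap of 4, the first four requests each get a fresh executor and the last two are spread across the least-loaded busy ones, which is exactly the contention behavior the commit message describes as "closer to parallel()".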
@simonbasle simonbasle added type/enhancement A general enhancement and removed for/team-attention This issue needs team attention or action status/need-decision This needs a decision from the team status/need-design This needs more in depth design work labels Feb 17, 2020
@simonbasle simonbasle added this to the 3.3.3.RELEASE milestone Feb 17, 2020
simonbasle added a commit that referenced this issue Feb 18, 2020
Reviewed-in: #2040
@simonbasle (Member)

@vy I've modified the implementation of boundedElastic() in 3.3.3, just released today. Please try it and tell us whether that improves usability for you.
