Coroutines launch on default dispatcher when they should not #2003

Closed
maxpert opened this issue May 8, 2020 · 19 comments

maxpert commented May 8, 2020

In one of our production services we had a fixed thread pool for handling blocking DB calls, and we are using withContext and asCoroutineDispatcher to launch such calls in the context of DB threads. These calls have to stick to the thread pool we created, otherwise they fail:

val executor = ThreadPoolExecutor(
    affinity,
    MAX_IO_THREADS,
    DEFAULT_TTL_SECONDS_THREAD,
    TimeUnit.SECONDS,
    SynchronousQueue()
) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

val dispatcher = executor.asCoroutineDispatcher()

Notice the SynchronousQueue, which prevents spinning off any further threads/calls when the pool is full. Looking at the code of ExecutorCoroutineDispatcherBase:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    try {
        executor.execute(wrapTask(block))
    } catch (e: RejectedExecutionException) {
        unTrackTask()
        DefaultExecutor.enqueue(block)
    }
}
it turns out we launch/enqueue the coroutine on DefaultExecutor instead. This has multiple problems:

  • It breaks the foundational assumption that the supplied dispatcher is respected; worse, this caveat is undocumented.
  • It can result in a flurry of undesired threads, ultimately leading to an OOM with the existing behavior. An unbounded queue or an unbounded set of threads is exactly what should be avoided at all costs.

Here is a simple example to reproduce this issue locally:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..5) {
            launch {
                println(Thread.currentThread().name)
                Thread.sleep(1000)
            }
        }
    }
}

I believe the subsequent coroutines should be rejected; silently launching them on DefaultExecutor is undesired behavior and breaks a fundamental assumption.

radityagumay commented May 11, 2020

Hi @maxpert

I believe the subsequent coroutines should be rejected; silently launching them on DefaultExecutor is undesired behavior and breaks a fundamental assumption.

IMO, the subsequent task should wait until there is a thread that can pick it up.

Take a look at my plain-thread example here.

launch such calls in the context of DB threads. These calls have to stick to the thread pool we created, otherwise they fail:

Are your DB transactions thread-confined?

What I mean by that is: when a query is executed within an ongoing transaction on the current thread, it is considered part of that transaction and can proceed. If the query is instead executed on a different thread, it is not considered part of that transaction and will block until the transaction on the other thread ends.
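
For illustration, a minimal sketch of such thread confinement, assuming a hypothetical transaction API: all DB work goes through withContext on a single-threaded dispatcher, so it always runs on the thread that owns the transaction.

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// A single thread owns the transaction; every DB call is confined to it.
val txDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

fun main() = runBlocking {
    withContext(txDispatcher) {
        // beginTransaction(); query(...); commit() -- hypothetical DB calls,
        // all guaranteed to run on the same transaction-owning thread
        println("transaction work on ${Thread.currentThread().name}")
    }
}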

Given your example above, a quick resolution would be to increase the thread pool size, to something like:

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val executor = ThreadPoolExecutor(
    10, 10, 60L, TimeUnit.SECONDS, SynchronousQueue<Runnable>()
) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..5) {
            launch {
                println(Thread.currentThread().name)
                delay(1000)
            }
        }
    }
}

I'm not sure about applying the same approach with Mutex, since it is non-reentrant.

fvasco commented May 11, 2020

Hi @maxpert,
regarding this issue, I think there is a documentation problem.
The CoroutineDispatcher.dispatch function must invoke the given Runnable at some time in the future, so the Executor must not block the dispatcher's thread and must not reject the task.
Given that, I suspect your executor is not a good choice as a CoroutineDispatcher.

It breaks the foundational assumption that the supplied dispatcher is respected; worse, this caveat is undocumented.
Good point. Can you explain how to execute the block if the provided Executor rejects tasks?

It can result in a flurry of undesired threads, ultimately leading to an OOM with the existing behavior.
Your reproducer does not expose this problem; is this statement related to this issue?

An unbounded queue or an unbounded set of threads is exactly what should be avoided at all costs.
I don't think that dropping random tasks is the way to go.

I replied to your concerns, but the answer isn't there.

Does fun ExecutorService.asCoroutineDispatcher(rejectTaskHandler: (Runnable) -> Unit): ExecutorCoroutineDispatcher fix this issue? How?
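
For concreteness, a sketch of what such an extension might look like (a hypothetical API and name, not part of kotlinx.coroutines):

import kotlinx.coroutines.CoroutineDispatcher
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import kotlin.coroutines.CoroutineContext

fun Executor.asCoroutineDispatcherWithHandler(
    rejectedTaskHandler: (Runnable) -> Unit
): CoroutineDispatcher = object : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            this@asCoroutineDispatcherWithHandler.execute(block)
        } catch (e: RejectedExecutionException) {
            // The caller decides what rejection means: drop, log, re-dispatch...
            rejectedTaskHandler(block)
        }
    }
}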

These calls have to stick to these thread-pool we have created otherwise they fail

What is your use case? What behavior do you need?

maxpert commented May 11, 2020

Hi @fvasco ,

What is your use case?

So I've slept on this problem. Here are a couple of observations I have, and the ideas I am working with, since for us this is a production issue right now. In the traditional Java world, the way you build back-pressure and avoid running out of memory is by using rejection techniques.
Looking at the problem I am trying to address: I am creating a fixed thread pool for blocking DB calls so that I can control the explosion of DB threads if the underlying DB slows down or chokes up while the rate of incoming requests is high. Transactions can be bound to threads etc., so that traditional DB machinery works as expected. Whenever the caller invokes DB calls, it does so within the DB context. I can't let my thread queue grow unbounded or, worse, let coroutines spin up threads to compensate. I've figured out a workaround for now, by throwing a custom exception from a RejectedExecutionHandler, but this is definitely something that people using coroutines in production will eventually get bitten by.

What behavior do you need?
Does fun ExecutorService.asCoroutineDispatcher(rejectTaskHandler: (Runnable) -> Unit): ExecutorCoroutineDispatcher fix this issue? How?

If you look at the problem statement, we are trying to build a back-pressure/rejection mechanism. This is typical for any production service/app. I guess the desired behavior is to have some construct in coroutines itself that limits the maximum number of parallel calls that can be active at a time in a context. I was shown a scenario by Roman Elizarov:

launch {
    val file = openFile()
    try { .... delay(1000L) ... } 
    finally { file.close() }
}

Here, simply rejecting might end up in a situation where the finally block is never invoked. The dispatcher should be able to tell whether the coroutine being dispatched is a brand-new coroutine or a resuming one. For new coroutines we can define one behavior (wait until a thread is available to execute), while for resuming ones we can define a different behavior (maybe use the rejectTaskHandler you proposed).

But it's pretty obvious that the solution we are looking for will restrict the maximum number of parallel coroutines in a context. I am already reading the code to see if I can make a dispatcher that does so.
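
For reference, a construct along these lines later shipped as CoroutineDispatcher.limitedParallelism in kotlinx.coroutines 1.6; a minimal usage sketch (not available at the time of this discussion):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // A view of Dispatchers.IO that executes at most 10 coroutines in
    // parallel, no matter how many are launched on it.
    val dbView = Dispatchers.IO.limitedParallelism(10)
    repeat(100) {
        launch(dbView) { /* blocking DB call */ }
    }
}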

fvasco commented May 11, 2020

TL;DR Use a Semaphore

Your considerations are applicable to multi-threaded programming, but here they can lead to wrong conclusions.

Please consider this example:

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    runBlocking {
        repeat(1_000) {
            launch {
                println("Hello")
                delay(1_000)
                println("world")
            }
        }
    }
}

This code executes a thousand coroutines on a single thread, so it is not possible to use threads to limit the parallelism.
Moreover, dropping some random tasks does not help to reduce concurrency: who wants to print "Hello" without "world"?

So you have to serialize the coroutine execution, using a Mutex or a Semaphore.

This next example takes a lot more time, and it still uses a single thread.

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

val semaphore = Semaphore(10)

fun main() {
    runBlocking {
        repeat(1_000) {
            launch {
                semaphore.withPermit {
                    println("Hello")
                    delay(1_000)
                    println("world")
                }
            }
        }
    }
}

For your use case, the chosen executor should have at least one thread per Semaphore permit; for this example, you can consider Executors.newFixedThreadPool(10).
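
Putting the two together, a minimal sketch (the dbCall helper name is hypothetical): a 10-permit Semaphore paired with a 10-thread pool, so at most ten blocking calls are in flight and each has a dedicated thread.

import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

val dbDispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
val dbSemaphore = Semaphore(10)

// At most ten callers hold a permit at a time, and the pool has exactly ten
// threads, so the executor never has a reason to reject a task.
suspend fun <T> dbCall(block: () -> T): T =
    dbSemaphore.withPermit {
        withContext(dbDispatcher) { block() }
    }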

maxpert commented May 12, 2020

What you are implicitly implying is that the developer must ensure launch/async etc. are never invoked beyond the limit. Imagine the same in ExecutorService: if the Java folks had denied you any mechanism to know that the thread pool is full, it would have been a disaster. At a bare minimum, can we have something like a MaxParallelContext wrapping exactly this semaphore solution?

IDK, but IMHO this can be a potentially fatal trip-wire!

elizarov added a commit that referenced this issue May 12, 2020
When the Executor that was used with the Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it has reached its capacity, so the currently executing Job should be cancelled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter, just as it serves this purpose in executor-based Java code.

Fixes #2003
@elizarov

See a proposed solution in #2012. No new APIs. Just cancel the Job on rejection. I don't think it is worth over-engineering a solution and providing any further customization capabilities.

@fvasco
Copy link
Contributor

fvasco commented May 12, 2020

Hi @elizarov,
I don't think that #2012 really fixes this issue for two reasons:

  1. A Job isn't required in a CoroutineContext
  2. Cancellation should not rely on DefaultExecutor (the current issue)

maxpert commented May 12, 2020

I don't think it is worth over-engineering a solution and providing any further customization capabilities.

Agreed, I wanted a simple solution too, and I think this gets the job done really well 👍. Right now I am doing something similar by throwing a custom exception.

elizarov commented May 12, 2020

@fvasco The proposed solution is definitely not a generic one, but I believe it does solve this particular issue.

  1. Job isn't required in a CoroutineContext

Indeed, but you cannot really launch a coroutine with kotlinx.coroutines without a Job, nor can you move it to your own dispatcher that might throw RejectedExecutionException. Even withContext(...) { ... } creates a Job.

  2. Cancellation should not rely on DefaultExecutor (the current issue)

We don't have much choice when the task is rejected by the executor. We cannot abandon a coroutine and not execute it, because it may leak resources, so in all such cases we use DefaultExecutor as a "last resort" measure.

Note that RejectedExecutionException also happens when the executor is being shut down. Previously, this caused all the remaining coroutines on the executor to move to DefaultExecutor, which was generally fine for that purpose, since by that time all the application jobs are usually cancelled by the application itself anyway. This issue highlights a new case in which RejectedExecutionException is thrown. Here, simply moving the coroutines to DefaultExecutor leads to an unexpected effect, since the application is still active. Cancelling the affected Job leads to the same behavior you'd expect when you use a limited-size queue in your Java code: you stop doing the work on rejection; you cancel the work.
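
Conceptually, the fix changes the rejection branch of dispatch to something like the following simplified sketch (not the actual implementation; the merged change cancels the Job and falls back to another dispatcher):

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import java.util.concurrent.Executor
import java.util.concurrent.RejectedExecutionException
import kotlin.coroutines.CoroutineContext

// Simplified model of the fixed behavior, not the actual implementation.
class RejectionAwareDispatcher(private val executor: Executor) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(block)
        } catch (e: RejectedExecutionException) {
            // Cancel the Job so the rejected coroutine winds down promptly...
            context.cancel(CancellationException("executor rejected task", e))
            // ...and run the continuation on a fallback dispatcher so that
            // finally blocks still execute (the merged PR uses Dispatchers.IO).
            Dispatchers.IO.dispatch(context, block)
        }
    }
}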

fvasco commented May 12, 2020

Hi, @elizarov,
I just note that #2012 can shift some work onto the default dispatcher; this behaviour can be unexpected, so it should be documented, at least.

If I am not wrong, please consider this example:

launch(myDispatcher) {
    val fos = FileOutputStream("bigdata")
    try {
        // myExecutor.close() -> Job cancelled
        dataFlow.collect { data ->
            fos.write(data) // myDispatcher
        }
    } finally {
        fos.close() // Default dispatcher
    }
}

DefaultExecutor is the default fallback, but Dispatchers.IO would be a safer choice for @maxpert's use case.

@elizarov

@fvasco In fact, #2012 is a fix that makes sure less work is shifted to the default executor than is now. Currently, if RejectedExecutionException happens, the continuation is dispatched to the fall-back default executor and stays there. With the fix, the Job also becomes cancelled, which means less work for the default executor.

We could also start using some other fall-back dispatcher (like Dispatchers.IO). The problem is that it is a bigger change, since we'll have to figure out where to send the continuation if Dispatchers.IO itself is being shut down (which happens a lot in our tests). Maybe a staged policy would work, though: first try Dispatchers.IO and, if it rejects, send to the default executor. I'll check it out.

P.S. I've also updated the docs.

fvasco commented May 13, 2020

Hi @elizarov,
have you considered providing a fallback in case of a rejected task?

fun Executor.asCoroutineDispatcher(rejectedTaskDispatcher: CoroutineDispatcher = Dispatchers.Default)

The cancelled jobs would then run on that dispatcher.

This should not require a test rework and would allow a user to customize the CoroutineDispatcher's behaviour; it is hard to predict the purpose of a given Executor.
This should fix the issue. What do you think, @maxpert?

@elizarov

@fvasco This kind of customization is hard to pull off. It expands the API surface for this very rare use-case and makes it hard to freely convert between dispatchers and executors, because every conversion would have to preserve the rejectedTaskDispatcher configuration. Tricky, and it requires a bunch of additional tests.

I've changed the default fallback to Dispatchers.IO in #2012 and updated the docs. If anyone really needs further customization in the future, the right way to do it is via an additional context element (this way, it will get preserved properly).
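
For illustration, a sketch of what such a context element might look like (hypothetical, not an actual kotlinx.coroutines API):

import kotlinx.coroutines.CoroutineDispatcher
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical: a fallback dispatcher carried in the coroutine context, so it
// is preserved across conversions between executors and dispatchers.
class RejectedTaskFallback(
    val dispatcher: CoroutineDispatcher
) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<RejectedTaskFallback>
}

// A dispatcher could then consult it in its rejection branch:
//   context[RejectedTaskFallback]?.dispatcher ?: Dispatchers.IO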

fvasco commented May 14, 2020

Hi @elizarov,
I added a couple of trivial comments to your PR.

My main concern is that #2012 is not useful for @maxpert's use case: it will still be possible to create an unlimited number of active connections, and some of them will be cancelled randomly except under specific conditions; so I proposed improving the documentation a little.

To limit the connection count using a thread pool, the connection handling has to be fully synchronous, but synchronous programming does not require a CoroutineDispatcher.

elizarov commented May 14, 2020

Actually, cancelling random connections might be good as a load-limiting mechanism. I, for one, would not recommend doing it (or at least not relying on it as the sole mechanism to limit the number of connections), but it might be a helpful strategy. For example, routers used to use (and maybe still use) RED-derived algorithms (random early detection), where they drop random packets when their queues grow, but before the queues overflow; this throttles down some of the connections going through them and avoids total exhaustion of buffer space (which would have forced them to drop all packets).

fvasco commented May 14, 2020

Hi @elizarov,
you should consider that DB connections are involved in this use case; cancelling these requires rollbacks.

I don't want to debate this, but a router handles IP packets, not TCP connections: it drops IP packets, and those are rather easy to recover, hopefully via another route.
The true analogy with this use case would be the router dropping the TCP connection (an RST packet), which looks more expensive to me.

However I agree with the proposed changes.

maxpert commented May 25, 2020

Will there be a patch release of coroutines that fixes this issue? Any proposed back-ports? I tried throwing a custom exception from the rejection policy in my ThreadPool, and at random I ended up in a state where the dispatcher would always throw the exception even when threads were available. Is that expected behavior? Here is what the ThreadPool looked like:

ThreadPoolExecutor(
    16,
    32,
    60,
    TimeUnit.SECONDS,
    SynchronousQueue(),
    ThreadFactory { runnable ->
        Thread(runnable).also {
            it.name = "io-pool-${it.id}"
            it.isDaemon = true
            it.priority = Thread.NORM_PRIORITY
        }
    },
    RejectedExecutionHandler { _, _ ->
        throw OverloadException("Too many IO requests")
    }
)

elizarov commented May 25, 2020

Will there be a patch release of coroutines that fixes this issue? Any proposed back-ports? I tried throwing a custom exception from the rejection policy in my ThreadPool, and at random I ended up in a state where the dispatcher would always throw the exception even when threads were available. Is that expected behavior?

@maxpert We don't support custom exceptions thrown from Executor.execute, since they violate the contract of the execute method. Such an exception is considered a fatal error by the coroutine machinery and will cause all sorts of problems (coroutines hanging, resource leaks, etc.).

elizarov added a commit that referenced this issue May 25, 2020
@elizarov

@maxpert The proper fix will appear in the next major release, since it affects some of the internal APIs. Meanwhile, you can build your own patched version from the executor-reject branch if you badly need it.

elizarov added a commit that referenced this issue Sep 14, 2020
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020