
Make runBlocking release worker permits before park and reacquire after unpark to avoid starvation #4084

Open · wants to merge 17 commits into develop
Conversation

vsalavatov

This is a draft of a solution proposal for #3983. At this point the code may not be ready for merging, but it is ready to be a subject of discussion.

Solution idea

The problem of dispatcher starvation arises when runBlocking is called on a dispatcher thread, i.e., somewhere inside a coroutine stack but in a blocking execution context. Imagine that the park() in runBlocking were wrapped in something like withContext(UnlimitedIODispatcher) { park() }. This would automatically resolve the starvation problem, because parking and unparking would happen on a different, unlimited dispatcher; the original dispatcher could carry on with other tasks until runBlocking wakes up and redispatches itself back.
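For context, the starvation pattern itself can be reproduced without kotlinx.coroutines at all. A minimal sketch with a plain single-thread executor (all names here are illustrative, not from the PR): a task running on the pool's only thread blocks waiting for a second task submitted to the same pool, which therefore can never start.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical minimal repro of the starvation pattern using plain executors:
// the outer task occupies the pool's only thread while blocking on a task
// queued behind it on the same pool.
fun innerTaskStarves(): Boolean {
    val pool = Executors.newFixedThreadPool(1)
    try {
        val outcome = pool.submit<Boolean> {
            val inner = pool.submit { } // queued behind us; we hold the only thread
            try {
                // Blocking wait from inside the pool's only thread, analogous to
                // runBlocking's park() on a dispatcher thread.
                inner.get(200, TimeUnit.MILLISECONDS)
                false // inner somehow ran: no starvation
            } catch (e: TimeoutException) {
                true // inner never ran while we blocked: starvation
            }
        }
        return outcome.get()
    } finally {
        pool.shutdownNow()
    }
}
```

Releasing the "permit" (here, the thread) before blocking is exactly what lets the queued work proceed.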

The only problem is that withContext is suspendable. Let's build a variant of this method for a blocking execution context, namely fun withUnlimitedIOScheduler(blocking: () -> Unit). It should emulate a dispatch to the UnlimitedIOScheduler and, after the blocking part completes, return the execution context to the state it was called in.
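A hedged sketch of the proposed shape: the function name is from the PR, while releaseOwnedPermits/reacquireOwnedPermits are hypothetical stand-ins for the real permit bookkeeping in CoroutineScheduler and LimitedDispatcher.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for the real permit bookkeeping; they only count calls here.
val released = AtomicInteger()
val reacquired = AtomicInteger()

fun releaseOwnedPermits() { released.incrementAndGet() }     // give up CPU / LimitedDispatcher permits
fun reacquireOwnedPermits() { reacquired.incrementAndGet() } // restore the previous execution context

// Sketch of the PR's proposed blocking analogue of withContext.
fun withUnlimitedIOScheduler(blocking: () -> Unit) {
    releaseOwnedPermits()       // emulate a dispatch to the unlimited scheduler
    try {
        blocking()              // the park() in runBlocking happens in here
    } finally {
        reacquireOwnedPermits() // permits are restored even if blocking() throws
    }
}
```

The try/finally mirrors the requirement that the execution context is restored no matter how the blocking part exits.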

To perform such a dispatch, it's enough to release a CPU permit, if the current worker has claimed one, and to release a permit of a LimitedDispatcher, if the current task is being executed inside one. After the blocking part completes, we must reacquire all the permits we gave up. This is the trickiest part; let's focus on reacquiring the CPU permit first.

One of the first ideas one may come up with is to say: "let's just forcefully acquire a CPU permit even if none are available at the moment; perhaps a little later some other worker will release its permit and balance the whole thing out". But in that case we may actually end up with more than CORE_POOL_SIZE CPU workers for an indefinite amount of time, and thus experience a performance impact.

The proposed solution is the following. First, try to grab a permit; perhaps one is available. If no permits are available, then there are already CORE_POOL_SIZE CPU workers running and doing work. Instead of interfering with them, dispatch a special task into the CPU task queue that takes away the CPU permit from whichever worker runs it. While that task is in the queue, we may park again. Once the task is grabbed and processed by a CPU worker, that worker gives up its permit and notifies the blocked thread. The blocked thread then wakes up, acquires the permit, and carries on.
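A minimal sketch of this transfer protocol, with a plain java.util.concurrent.Semaphore standing in for the CPU permits and a queue that the caller drains by hand in place of a real CPU worker (class and member names are illustrative, not the PR's actual PermitTransfer):

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Semaphore

// Illustrative sketch of the fast path / slow path described above.
class PermitTransferSketch(private val cpuPermits: Semaphore) {
    // Stand-in for the CPU task queue; a real CPU worker would poll and run these.
    val cpuQueue = ConcurrentLinkedQueue<() -> Unit>()

    fun reacquireCpuPermit() {
        if (cpuPermits.tryAcquire()) return          // fast path: a permit was free
        val transferred = CountDownLatch(1)
        // Slow path: ask a running CPU worker to hand over its permit instead of
        // oversubscribing past CORE_POOL_SIZE.
        cpuQueue.add {
            cpuPermits.release()                     // the worker gives up its permit...
            transferred.countDown()                  // ...and notifies the blocked thread
        }
        transferred.await()                          // "park" until the transfer happens
        cpuPermits.acquire()                         // wake up, take the permit, carry on
    }
}
```

Note that the total number of permits in circulation never exceeds the original pool size: the permit is moved, not duplicated.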

The same protocol applies to LimitedDispatcher. But in order to run tasks, LimitedDispatcher dispatches its Worker to the underlying dispatcher, and it should somehow be notified when a blocking dispatch happens. I created a special BlockingDispatchAware interface for that, and withUnlimitedIOScheduler also checks whether the current scheduler worker's task implements it in order to perform the notification. To obtain the current task, I added a currentTask field to CoroutineScheduler.Worker. I didn't come up with a better approach, and this one is causing some problems; see the Tests section.
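A sketch of the shape of that hook. The interface name is from the PR; the method names, the wrapper, and the recording worker are illustrative guesses at the shape, not the real API.

```kotlin
// Interface name is from the PR; method names here are illustrative guesses.
interface BlockingDispatchAware {
    fun beforeBlockingDispatch() // e.g. give up the LimitedDispatcher permit
    fun afterBlockingDispatch()  // e.g. reacquire it once the blocking part ends
}

// A withUnlimitedIOScheduler-style wrapper would consult the worker's current task
// (the PR's currentTask field) and notify it around the blocking section.
fun notifyAround(currentTask: Any?, blocking: () -> Unit) {
    (currentTask as? BlockingDispatchAware)?.beforeBlockingDispatch()
    try {
        blocking()
    } finally {
        (currentTask as? BlockingDispatchAware)?.afterBlockingDispatch()
    }
}

// Illustrative task that records the notification order.
class RecordingWorker : BlockingDispatchAware {
    val events = mutableListOf<String>()
    override fun beforeBlockingDispatch() { events += "release" }
    override fun afterBlockingDispatch() { events += "reacquire" }
}
```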

Tests

  • All tests pass except for flaky failures of kotlinx.coroutines.ReusableCancellableContinuationLeakStressTest#testReusableContinuationLeak. Not quite sure what to do with it at the moment. Example failure:
DefaultDispatcher-worker-2 @coroutine#84

Unexpected number objects. Expected 0, found 1
	|StandaloneCoroutine::context|CombinedContext::element|DefaultScheduler::coroutineScheduler|CoroutineScheduler::workers|ResizableAtomicArray::array[2]|Worker::currentTask|CancellableContinuationImpl::_state$volatile
  • After I wrote kotlinx.coroutines.scheduling.RunBlockingCoroutineSchedulerLivenessStressTest, I debugged several scheduler stall scenarios:
    • One of them, IIRC, was that some tasks remained in the local queue after the permit release, and the next CPU worker didn't look at them. I fixed it with the giveAwayLocalTasks method; I'm not sure it's the correct way to fix it. I suppose giveAwayLocalTasks is necessary in the releaseFun part, but it may not be necessary in the withUnlimitedIOScheduler part; I added both for safety.
    • The other one was due to a race condition in the scheduler itself, see the corresponding commit.
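The giveAwayLocalTasks idea can be sketched as follows. Plain collections stand in for the scheduler's work-stealing queues; the method name is from the PR description, the class and bodies are illustrative.

```kotlin
import java.util.ArrayDeque
import java.util.concurrent.ConcurrentLinkedQueue

// Illustrative worker: tasks in its local queue are invisible to other workers,
// which is exactly the stall scenario described above.
class WorkerSketch(private val globalQueue: ConcurrentLinkedQueue<Runnable>) {
    private val localQueue = ArrayDeque<Runnable>()

    fun schedule(task: Runnable) { localQueue.add(task) }

    // Before releasing the permit and blocking, drain the local queue into the
    // global one so the remaining CPU workers can pick the tasks up.
    fun giveAwayLocalTasks() {
        while (true) {
            val task = localQueue.poll() ?: return
            globalQueue.add(task)
        }
    }
}
```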

Performance considerations of withUnlimitedIOScheduler

If runBlocking never tries to park on scheduler threads, the only performance impact (apart from the increase in bytecode size) is the setting/resetting of the currentTask field on each task in CoroutineScheduler.Worker (which is thread-local).

Otherwise, there is always a giveAwayLocalTasks() call, which is probably bad because of unpredictable latencies. Then there is a permit release, which is fast. Permit reacquisition has a chance to be resolved via the fast path; otherwise we mostly pay for an additional park/unpark in PermitTransfer.

Subtle points and possible problems

  • There is an additional park/unpark to acquire a permit in the worst case. In the case of a LimitedDispatcher on top of the Default dispatcher, this may even be two additional park/unpark cycles. I think at this point it's better to leave this as an area for further improvement :)
  • I didn't pay enough attention to the scheduler shutdown scenario; some related logic is probably missing.

…h a CPU permit

And reacquire CPU permit after runBlocking finishes. This should resolve Dispatchers.Default starvation in cases where runBlocking is used to run suspend functions from non-suspend execution context.

Kotlin#3983 / IJPL-721
PermitTransfer is extracted to be used both in CoroutineScheduler and in LimitedDispatcher.
BlockingDispatchAware interface is introduced for LimitedDispatcher.Worker to be accounted by CoroutineScheduler.

Kotlin#3983 / IJPL-721
…ite because preceding unpark() guarantees it to be non-blocking
…, otherwise they may be missed by other workers
…inner tasks

also fix a probable worker deallocation race issue
also fix wrong package names
@qwwdfsad qwwdfsad self-assigned this Apr 3, 2024