
Introduce CoroutineDispatcher.limitedParallelism as an alternative to standalone executors and slices #2919

Closed
qwwdfsad opened this issue Sep 6, 2021 · 7 comments

Comments

@qwwdfsad
Member

qwwdfsad commented Sep 6, 2021

See the original idea behind slices here: #261
Since then, a lot of API has been introduced and refined, so the originally proposed API no longer fits kotlinx.coroutines.

Use-cases

newFixedThreadPoolContext and newSingleThreadContext, as well as regular Java executors, are often used as the only way to control concurrency. Various factors contribute to that -- limited external resources (e.g. a pool of database connections), functional restrictions (e.g. CPU consumption or rate-limiting), or the need for an isolated, serial execution flow (an extreme example is the IntelliJ IDEA codebase, which contains 62 single-threaded dispatchers).

As a result, an average application creates unnecessarily many threads, most of which sit idle while still consuming memory, CPU, and device battery.

Design overview

To address such use-cases, we aim to provide an API for taking a view of a dispatcher. A dispatcher's view does not create any additional threads and does not allocate any resources. Instead, it piggybacks on the original dispatcher but limits the parallelism of the tasks and coroutines submitted to it. Views are independent of each other, and their total declared parallelism may exceed that of the original dispatcher. This saves users from meticulously counting resources, especially when views are used along with the global Dispatchers.Default and Dispatchers.IO.

With this API, it is possible to have fine-grained control over parallelism while reusing already existing dispatchers and executors, saving resources and reducing the number of threads.

The API changes are minimal, i.e. instead of

private val myDatabaseDispatcher = newFixedThreadPoolContext(10, "DB")  // At most 10 connections to the database
private val myCpuHeavyImageProcessingDispatcher = newFixedThreadPoolContext(2, "Image rescaling") // Prevent more than 200% of CPU consumption on images

it is possible to write this instead:

private val myDatabaseDispatcher = Dispatchers.IO.limitedParallelism(10)  // At most 10 connections to the database
private val myCpuHeavyImageProcessingDispatcher = Dispatchers.Default.limitedParallelism(2) // Prevent more than 200% of CPU consumption on images
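
A minimal, hedged usage sketch (not part of the original proposal) of how such a view behaves: with a limit of 10, no more than 10 of the 100 submitted blocking tasks ever run at the same time. The names dbDispatcher and active are illustrative; the @OptIn annotation may be required on versions where limitedParallelism is still experimental.

import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val dbDispatcher = Dispatchers.IO.limitedParallelism(10) // at most 10 tasks at once
    val active = AtomicInteger(0)
    List(100) {
        launch(dbDispatcher) {
            val running = active.incrementAndGet()
            check(running <= 10) { "parallelism limit exceeded: $running" }
            Thread.sleep(10) // blocking call standing in for a database query
            active.decrementAndGet()
        }
    }.joinAll()
    println("100 tasks done, never more than 10 in parallel")
}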

API changes

Conceptually, any coroutine dispatcher can limit its own parallelism. Since we allow the total parallelism of the views to be greater than that of the original dispatcher, nothing prevents users from taking a view of a single-threaded dispatcher (e.g. Dispatchers.Main), as long as its contract allows the implementation to short-circuit and return this.

As a result, a single public method will be added to the CoroutineDispatcher class:

public abstract class CoroutineDispatcher {

    public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher
   
    // ... the rest of the methods ...
}

with the default implementation provided.
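
A hedged sketch (not from the proposal itself) of what these semantics mean in practice: views are cheap wrappers over the original dispatcher, they are independent of each other, and their combined limits may exceed the parallelism of the underlying pool. The view names below are illustrative; @OptIn may be needed while the API is experimental.

import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    // Two independent views over the same dispatcher; no extra threads are created,
    // and 2 + 64 may well exceed the actual size of the Default pool.
    val narrowView = Dispatchers.Default.limitedParallelism(2)
    val wideView = Dispatchers.Default.limitedParallelism(64)

    val a = launch(narrowView) { println("narrow view runs on ${Thread.currentThread().name}") }
    val b = launch(wideView) { println("wide view runs on ${Thread.currentThread().name}") }
    joinAll(a, b)
}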

Rejected alternative namings

Dispatchers.Default.slice(parallelism = 2)
Dispatchers.Default.view(2)
Dispatchers.Default.limited(parallelism = 2)
Dispatchers.Default.subset(2)
Dispatchers.Default.dedicated(2)

While these names are short and visually attractive, their semantics are easy to misinterpret.

  • For slice, subset, and view it can be reasonable to assume that the sum of all the slices/subsets/views cannot be greater than the original
  • For dedicated it can be reasonable to assume that dedicated threads are used for such dispatcher
  • The biggest concern for limited is how it reads at the call site: we cannot enforce the usage of named parameters at the language level, and seeing myDispatcher.limited(2) or myDispatcher.limited(myLimit) in the wild may be quite misleading.
@qwwdfsad qwwdfsad self-assigned this Sep 6, 2021
@LouisCAD
Contributor

LouisCAD commented Sep 6, 2021

Very nice API, I like the interesting capabilities it can give with such simplicity.

qwwdfsad added a commit that referenced this issue Sep 7, 2021
…ate API

    * Mention CoroutineDispatcher.limitedParallelism as an intended replacement
    * Prepare the API for sharing with the new K/N memory model, where we _have_ to have this API

Addresses #2919
@qwwdfsad
Member Author

The complementary proposal: #2943

@fvasco
Contributor

fvasco commented Sep 20, 2021

This proposal is a great improvement over newFixedThreadPoolContext. Have you considered simply using Dispatchers.limitedParallelism(10) or something similar?

qwwdfsad added a commit that referenced this issue Sep 23, 2021
…ate API (#2922)

* Mention CoroutineDispatcher.limitedParallelism as an intended replacement
* Prepare the API for sharing with the new K/N memory model, where we _have_ to have this API

Addresses #2919
yorickhenning pushed a commit to yorickhenning/kotlinx.coroutines that referenced this issue Oct 14, 2021
….IO unbounded for limited parallelism (Kotlin#2918)

* Introduce CoroutineDispatcher.limitedParallelism for granular concurrency control

* Elastic Dispatchers.IO:

    * Extract Ktor-obsolete API to a separate file for backwards compatibility
    * Make Dispatchers.IO a slice of the unlimited blocking scheduler
    * Make Dispatchers.IO.limitedParallelism take slices from the same internal scheduler

Fixes Kotlin#2943
Fixes Kotlin#2919
@ultraon

ultraon commented Dec 23, 2021

@qwwdfsad Hello, I've found an inconsistency when using Dispatchers.Default.limitedParallelism(1) instead of newSingleThreadContext().

Example:

import kotlinx.coroutines.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

suspend fun main() {
    runBlocking {
        println("Start")
        testDispatchers()
        println("finish")
    }
}

suspend fun testDispatchers() = coroutineScope {
    val dispatcher = Dispatchers.Default

    println("Test with limitedParallelism:")
    // This one works in a not-expected way!!!
    runTestWithDispatcher(dispatcher, dispatcher.limitedParallelism(1))

    println("\nTest with newSingleThreadContext:")
    // This one works OK.
    runTestWithDispatcher(dispatcher, newSingleThreadContext("SingleThread"))

    println("\nTest with newSingleThreadExecutor:")
    // This one works OK, but need to shutdown explicitly.
    val singleExecutorDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    runTestWithDispatcher(dispatcher, singleExecutorDispatcher)
    // We need explicitly shutdown ExecutorService to finish a root coroutine.
    (singleExecutorDispatcher.executor as? ExecutorService)?.shutdown()
}

suspend fun runTestWithDispatcher(dispatcher: CoroutineDispatcher, limitedDispatcher: CoroutineDispatcher) = coroutineScope {
    buildList {
        this += launch(dispatcher) {
            println("1 final job with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }

        this += launch(limitedDispatcher) {
            println("2 final job (limited) with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }
    }.joinAll()

    buildList {
        this += launch(dispatcher) {
            println("3 final job with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }

        this += launch(limitedDispatcher) {
            println("4 final job (limited) with thread '${Thread.currentThread().name}'")
            Thread.sleep(100)
        }
    }.joinAll()
}

Output:

Start
Test with limitedParallelism:
1 final job with thread 'DefaultDispatcher-worker-1'
2 final job (limited) with thread 'DefaultDispatcher-worker-2'   // <------------- See DefaultDispatcher-worker-2
3 final job with thread 'DefaultDispatcher-worker-2'
4 final job (limited) with thread 'DefaultDispatcher-worker-1'  // <------------- But here it is DefaultDispatcher-worker-1'

Test with newSingleThreadContext:
1 final job with thread 'DefaultDispatcher-worker-1'
2 final job (limited) with thread 'SingleThread'
3 final job with thread 'DefaultDispatcher-worker-1'
4 final job (limited) with thread 'SingleThread'

Test with newSingleThreadExecutor:
1 final job with thread 'DefaultDispatcher-worker-1'
2 final job (limited) with thread 'pool-1-thread-1'
3 final job with thread 'DefaultDispatcher-worker-1'
4 final job (limited) with thread 'pool-1-thread-1'
finish

Process finished with exit code 0

This issue doesn't exist if I remove runBlocking {} from suspend fun main() {}.

Is it a bug?
Can we still consider newSingleThreadContext() a delicate API?

@qwwdfsad
Member Author

@ultraon this is expected behaviour. limitedParallelism(1) does not guarantee that all coroutines launched in it will run on the very same dedicated thread. It only guarantees the parallelism restriction -- at most one coroutine can execute in this dispatcher at any given time.
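
A minimal sketch (not from the original comment) of what that guarantee means in practice: execution is serialized, so unsynchronized state can be mutated safely, even though the worker thread may change from task to task. @OptIn may be needed while the API is experimental.

import kotlinx.coroutines.*

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking {
    val serial = Dispatchers.Default.limitedParallelism(1)
    var counter = 0 // intentionally not thread-safe
    List(1_000) {
        launch(serial) {
            counter++ // never races: at most one task runs at a time, but the thread may differ
        }
    }.joinAll()
    println("counter = $counter") // always 1000
}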

@ultraon

ultraon commented Dec 23, 2021

@qwwdfsad That is very interesting, thanks for the clarification.

And if I need to run coroutines on a dedicated thread, what is the preferred approach (taking into account that newSingleThreadContext() is a delicate API and I need it in KMM)?
