Support custom notifier for suspend action #3010

Closed
mogud opened this issue Nov 5, 2021 · 7 comments

mogud commented Nov 5, 2021

In Kotlin coroutines, we have resumeWith to intercept the resume action, but there is no hook to prepare the current context when a coroutine is about to suspend. This limits the power of coroutines.
Use case:

abstract class Service : CoroutineScope {
    companion object {
        val DefaultCoroutineDispatcher = Executors.newFixedThreadPool(16).asCoroutineDispatcher()
    }

    // Ensures that only one task of this service runs at any given moment.
    private val executionLock = Mutex()

    override val coroutineContext: CoroutineContext = DefaultCoroutineDispatcher

    fun addTask(block: suspend () -> Unit) {
        launch {
            executionLock.withLock {
                block()
            }
        }
    }

    // Workaround: release the lock around the delay so other tasks can run.
    suspend fun myDelay(timeMillis: Long) {
        executionLock.unlock()
        delay(timeMillis)
        executionLock.lock()
    }
}

class MyService : Service() {
    init {
        addTask { // task 1
            while (true) {
                delay(1)
                // do something
            }
        }
        addTask { // task 2
            // do something else
        }
    }
}

Here I have a multithreaded dispatcher, and I can run tasks by adding them to a MyService instance, which can only run on one thread at a time because of the executionLock. With the code above, a MyService instance's tasks can resume on any thread after a suspension, which is all good. But task 1 blocks all other tasks the whole time, not just for 1 ms: because every task runs inside executionLock, the other tasks can never acquire the lock since task 1 never finishes. One workaround is to implement myDelay, which wraps delay with a manual unlock and lock. But every suspend function called inside a task would have to do the same, as sketched below. So I think this kind of prepare/clean-up behavior may be needed in other cases as well.
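For illustration, a task rewritten against that workaround might look like this (a hypothetical sketch; myDelay is the wrapper defined above):

addTask { // task 1, using the workaround
    while (true) {
        myDelay(1) // releases executionLock around delay, so task 2 can run
        // do something
    }
}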

One general-purpose solution for the above problem:

public interface SuspendNotifier : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<SuspendNotifier>

    public fun <T> notify(continuation: Continuation<T>)
}

// only idea
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
    return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->

        c.context[SuspendNotifier]?.notify(c) // !

        block(c)
    }
}

Now we can intercept the continuation's resumeWith with executionLock.lock() and add a custom SuspendNotifier, which calls executionLock.unlock() in its notify function, to MyService's coroutineContext. All other suspend functions remain the same, with no manual wrappers. A sketch of how this could look is below.
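A rough sketch, assuming the proposed SuspendNotifier existed (the class name below is only illustrative):

class LockReleasingNotifier(private val lock: Mutex) : SuspendNotifier {
    override val key: CoroutineContext.Key<*> get() = SuspendNotifier

    // Called right before the coroutine suspends: give up the service lock
    // so that other tasks of the same service can run in the meantime.
    override fun <T> notify(continuation: Continuation<T>) {
        lock.unlock()
    }
}

// Installed next to the dispatcher (the matching lock() on resume would be
// done by a wrapping ContinuationInterceptor):
// override val coroutineContext: CoroutineContext =
//     DefaultCoroutineDispatcher + LockReleasingNotifier(executionLock)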

@dkhalanskyjb (Collaborator)

Hi! Sorry, I don't understand this use case at all, could you clarify it? What is it that you are trying to accomplish in the end? If you want the tasks to run in parallel, then simply remove the lock—what is it guarding against? If you don't want the tasks to run in parallel, then use a single-threaded dispatcher—why do you need the extra threads?


mogud commented Nov 8, 2021

In my situation, there are thousands of services running in a single VM with complex states, and there is no easy way to determine which one might become the CPU bottleneck. So I have to distribute the services' execution across threads dynamically. But it is hard for application programmers to deal with multiple threads in a stateful environment. As a result, I lock the services so that each one can only run on one thread at a time. Then I add RPC support to the services, so the whole thing looks like an actor model with no explicit locks.

@dkhalanskyjb (Collaborator)

Do I understand correctly that you want the following?

  • Different services can be run in parallel;
  • Different tasks for each service can only run sequentially;
  • The total number of threads in use is bounded.


mogud commented Nov 8, 2021

Exactly, sorry for my English.
One last point: a service cannot be bound to a single thread the way Netty does it.

@dkhalanskyjb (Collaborator)

Ok. Then, I think, the approach you're proposing will not work. The reason is, there's no way to distinguish tasks from different services: let's say some task has finished its delay—how would it know if it should re-acquire the lock? The only reliable solution is to always re-acquire the lock, but then only one thread will ever be used. In this case, just use a single-threaded dispatcher for exactly the same behavior.

What you probably want to do is to have multiple dispatchers, each of which is responsible for just a single service. Each dispatcher would then ensure that the tasks for each service run sequentially.

The only question then is how to limit the total number of threads. We do plan to solve this in the following release with #2919.

val sharedThreadPool = Executors.newFixedThreadPool(16).asCoroutineDispatcher()
val service1dispatcher = sharedThreadPool.limitedParallelism(1)
val service2dispatcher = sharedThreadPool.limitedParallelism(1)
launch(service1dispatcher) {
  // child coroutines here will not run in parallel
}
launch(service2dispatcher) {
  // child coroutines here will not run in parallel
}
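
Applied to the Service base class from the first comment, a minimal sketch (assuming kotlinx.coroutines 1.6.0+, where limitedParallelism is available) could look like this:

abstract class Service : CoroutineScope {
    companion object {
        // One shared pool for all services; its size bounds the total thread count.
        private val sharedThreadPool =
            Executors.newFixedThreadPool(16).asCoroutineDispatcher()
    }

    // A single-parallelism view of the pool: tasks of this service never overlap,
    // but may run on any pooled thread. No Mutex is needed.
    override val coroutineContext: CoroutineContext =
        sharedThreadPool.limitedParallelism(1)

    fun addTask(block: suspend () -> Unit) {
        launch { block() }
    }
}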

Does this help?


mogud commented Nov 8, 2021

So service1dispatcher can dispatch its runnable tasks across any of the threads in the thread pool while keeping the specified context? If so, that's all I need. Thanks very much.

@dkhalanskyjb (Collaborator)

Yes, it can use any thread in the pool, but tasks on the same dispatcher will never run in parallel. I'll close the issue then; feel free to reopen it if you have further questions.
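
A small sketch demonstrating that behaviour (the pool size, task count, and timing below are arbitrary; assumes kotlinx.coroutines 1.6.0+):

val pool = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
val serviceDispatcher = pool.limitedParallelism(1)

runBlocking {
    repeat(3) { i ->
        launch(serviceDispatcher) {
            println("task $i starts on ${Thread.currentThread().name}")
            delay(10) // suspension frees the dispatcher for the next task
            println("task $i resumes on ${Thread.currentThread().name}")
        }
    }
}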
