Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use coroutines #277

Merged
merged 7 commits into from
Mar 7, 2023
Merged

Conversation

JcMinarro
Copy link
Collaborator

@JcMinarro JcMinarro commented Feb 28, 2023

Convert Handler.handleUpdate() method into a suspend method. It allows calling coroutines method from dispatcher block code.
The ExecutorLooper use coroutines as well instead of a ThreadPoolExecutor that only runs a new thread.
Fix #251

@JcMinarro
Copy link
Collaborator Author

@seik @vjgarciag96 are you keeping maintaining this repo?
Could I have a review of this PR?

@vjgarciag96
Copy link
Member

vjgarciag96 commented Mar 4, 2023

@seik @vjgarciag96 are you keeping maintaining this repo?
Could I have a review of this PR?

@JcMinarro It's still maintained, but as you can see it has not been very active lately. I'll try and have a look at the code through the weekend. Thank you so much for the contribution, excited to finally see coroutines support in the library!

@vjgarciag96 vjgarciag96 self-requested a review March 4, 2023 11:49
@vjgarciag96 vjgarciag96 self-assigned this Mar 4, 2023
@JcMinarro
Copy link
Collaborator Author

Cool!!!
Thank you for the project, it is handy.
If you feel something needs to be changed, let me know 🙃

Copy link
Member

@vjgarciag96 vjgarciag96 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking great! Thank you so much for the work. I added a few small comments, but nothing major. I think this version is very close to be ready to merge and ship 🚀

) {

internal lateinit var bot: Bot

private val commandHandlers = linkedSetOf<Handler>()
private val errorHandlers = arrayListOf<ErrorHandler>()

@Volatile private var stopped = false
private val scope: CoroutineScope = CoroutineScope(ioDispatcher)
private var job: Job? = null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this one may need to be volatile to guarantee memory consistency. There's nothing preventing startCheckingUpdates and stopCheckingUpdates from being called from different threads, and both are accessing job.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me update it


internal fun startCheckingUpdates() {
stopped = false
updatesExecutor.execute { checkQueueUpdates() }
job = scope.launch { checkQueueUpdates() }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should protect here against multiple calls to startCheckingUpdates (without a call to stopCheckingUpdates) to avoid clients from using the library APIs incorrectly without noticing. But we weren't protecting against that with the previous implementation anyways, so that's out of scope for this PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the case of running twice the startCheckingUpdates wasn't handled previously, but now with the current implementation should be as easy as canceling the previous job if it isn't null.
Let me update it

private val logLevel: LogLevel,
ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should be using Dispatchers.IO here. The dispatcher here is used to launch a single coroutine that will loop until stopped, checking for updates on a queue, and running functions provided by consumers of the library for each of those (in those functions consumers can decide whether they need to run in a different dispatcher or not). But we don't know whether those functions will be blocking IO operations so it doesn't feel right to use this Dispatcher here. The main downside I could see of this is reducing the potential for parallelism of Dispatchers.IO, which is global to the app, because we have one of its threads always busy. In practice, I'm not sure whether this will actually make a difference but it may for some use cases? Any thoughts on this? One alternative could be to just have a dispatcher backed by a singleThreadExecutor that we create only for this.

A bit unrelated. This also makes me wonder about next steps for better coroutine support, and feels like replacing BlockingQueue for a Channel could be a nice impovement, so we suspend while waiting for new updates rather than blocking. Also out of scope of this PR, but just bringing it up to get your thoughts about it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dispatcher is a bit different than Thread, A Dispatcher is more like a ThreadPool where coroutines are executed (Except the Dispatcher.Main on Android implementation that use the UI Thread), so it shouldnt be blocking IO Operations on the rest of the app. Even the coroutine we launch on this Dispatcher will be running indefinitely until the loop is stopped, on every iteration of our infinite loop we are "suspending" our logic and delegating to the dispatcher we can continue (our job has not been canceled) or if there are others coroutines waiting to be executed and need to be run before us. We achieve it by calling yield() method.

Apart from that, I had a doubt about using Dispatchers.Default or Dispatchers.IO. Reading the docs First one is used to run "hard jobs" that consume machine recurses and the second one to operations that will be interacting with IO Resources. In our case, I think the IO definition is closer to our implementation (We run IO calls on every iteration). Maybe, something we could do is run updates on Dispatchers.Default but switch to Dispatchers.IO when API calls are executed. For that would be better to improve the "Retrofit integration" to use coroutines internally, but I think it can be achieved in a separate PR if we decide it is needed.

Lastly, regarding the BlockingQueue, of course, we could use Channel instead, I didn't think about that, but it is a really good point

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to do separate PR using Channel instead of BlockingQueue, because If I change it here, lot of code will need to be updated (Tests are using fake blockingQueue that will need to be updated as well..)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, right. I see the point of this code being "cooperative" by doing yield every time an item is received from the queue, so it's not going to be taking a thread forever. For now, it will only be blocking in queue.take but that will become suspending when we migrate to use Channel so I think this class is doing good.

But I don't think we run any IO operation in Dispatcher (we run them on Updater) so that's why I was thinking that maybe it would be nicer to use our own dispatcher backed by a single thread executor so that we don't take any of the resources from Dispatchers.IO here. There's this nice extension function to make our work easy https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/as-coroutine-dispatcher.html.

I agree it could make sense to use Dispatchers.IO on the Updater class, which is where we actually run the getUpdates API operation.

Re. Retrofit it might be nice to use the suspend version of the functions but as far as I understand OkHttp doesn't support non-blocking IO operations yet (square/okio#531). One alternative would be to use Ktor instead https://github.com/ktorio/ktor, but that would be a big task, so I guess we can do with executing blocking IO operations on Dispatchers.IO for now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I see. Let me create our own Dispatcher and inject it, instead of injecting the Dispatcher.IO

Re Network Layer, I think it is out of the scope of this PR, it can be covered later

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, completely agree.

internal class ExecutorLooper(
private val loopExecutor: Executor,
) : Looper {
internal class ExecutorLooper(ioDispatcher: CoroutineDispatcher) : Looper {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not important. The name of this class and the comment seems a bit outdated now that we are using a CoroutineDispatcher rather than an Executor.

Comment on lines 22 to 23
private val scope: CoroutineScope = CoroutineScope(ioDispatcher)
private var job: Job? = null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comments to the ones in the Dispatcher class:

  • We may want to make job volatile for memory consistency because we can't guarantee loop and quit are going to be called from the same thead, and both are accessing job.
  • I also wonder about the Dispatcher we should be using here. In this case, this is actually used to run the API operations so it might make sense for it to be Dispatchers.IO.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking and, as far as we cancel the job just before a new job is started, we shouldn't have memory problems. A job can be called to cancel() multiple times.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The memory consistency point was about memory visibility, sorry I may have made it confusing.

As we don't know or control what thread a consumer is calling loop and quit from, we could have thread A calling loop and thread B calling quit. loop writes job and quit reads it, and without additional synchronization, it's not guaranteed that thread B will be able to see the value that thread A wrote to job. That would be solved by making job volatile. Does this make sense?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, make sense. Let me update it.

repeat(loopIterations) {
loopBody()
override fun loop(loopBody: suspend () -> Unit) {
scope.launch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason why we need to launch a coroutine here for some of the tests or could we keep this being executed in the caller context?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't because loopBody is a suspend function, but loop isn't, and suspend function can only be executed inside of a coroutine or from another suspend function.
For now, as far as it wasn't needed, I wanted to keep the Looper method signature but change its internal implementation. If it is needed, we can change it later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah true, that makes sense. Thanks for the explanation.

@Test
fun `loops until thread is interrupted`() {
var count = 0
println("JcLog: Start test $expectedCount")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine this test was giving some problems with the new changes

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching from Thread to Coroutines always gives problems in tests.... 🙃
BTW, this log had to be removed before I committed it, but I forget, let me remove it now.

}

@Test
fun `loops until an exception is thrown`() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this one not needed anymore? What happens when a Exception is thrown now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When an exception is thrown, the job is stopped, so maybe we can keep this test as well, but I don't think it gives us a lot of value.
Let me see if I can keep it as well

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I see, but the exception is still propagated because we are using launch?

Copy link
Collaborator Author

@JcMinarro JcMinarro left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.
Let me update the PR

@Test
fun `loops until thread is interrupted`() {
var count = 0
println("JcLog: Start test $expectedCount")
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching from Thread to Coroutines always gives problems in tests.... 🙃
BTW, this log had to be removed before I committed it, but I forget, let me remove it now.

repeat(loopIterations) {
loopBody()
override fun loop(loopBody: suspend () -> Unit) {
scope.launch {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't because loopBody is a suspend function, but loop isn't, and suspend function can only be executed inside of a coroutine or from another suspend function.
For now, as far as it wasn't needed, I wanted to keep the Looper method signature but change its internal implementation. If it is needed, we can change it later.


internal fun startCheckingUpdates() {
stopped = false
updatesExecutor.execute { checkQueueUpdates() }
job = scope.launch { checkQueueUpdates() }
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the case of running twice the startCheckingUpdates wasn't handled previously, but now with the current implementation should be as easy as canceling the previous job if it isn't null.
Let me update it

) {

internal lateinit var bot: Bot

private val commandHandlers = linkedSetOf<Handler>()
private val errorHandlers = arrayListOf<ErrorHandler>()

@Volatile private var stopped = false
private val scope: CoroutineScope = CoroutineScope(ioDispatcher)
private var job: Job? = null
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let me update it

private val logLevel: LogLevel,
ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dispatcher is a bit different than Thread, A Dispatcher is more like a ThreadPool where coroutines are executed (Except the Dispatcher.Main on Android implementation that use the UI Thread), so it shouldnt be blocking IO Operations on the rest of the app. Even the coroutine we launch on this Dispatcher will be running indefinitely until the loop is stopped, on every iteration of our infinite loop we are "suspending" our logic and delegating to the dispatcher we can continue (our job has not been canceled) or if there are others coroutines waiting to be executed and need to be run before us. We achieve it by calling yield() method.

Apart from that, I had a doubt about using Dispatchers.Default or Dispatchers.IO. Reading the docs First one is used to run "hard jobs" that consume machine recurses and the second one to operations that will be interacting with IO Resources. In our case, I think the IO definition is closer to our implementation (We run IO calls on every iteration). Maybe, something we could do is run updates on Dispatchers.Default but switch to Dispatchers.IO when API calls are executed. For that would be better to improve the "Retrofit integration" to use coroutines internally, but I think it can be achieved in a separate PR if we decide it is needed.

Lastly, regarding the BlockingQueue, of course, we could use Channel instead, I didn't think about that, but it is a really good point

Comment on lines 22 to 23
private val scope: CoroutineScope = CoroutineScope(ioDispatcher)
private var job: Job? = null
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking and, as far as we cancel the job just before a new job is started, we shouldn't have memory problems. A job can be called to cancel() multiple times.

}

@Test
fun `loops until an exception is thrown`() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When an exception is thrown, the job is stopped, so maybe we can keep this test as well, but I don't think it gives us a lot of value.
Let me see if I can keep it as well

private val logLevel: LogLevel,
ioDispatcher: CoroutineDispatcher = Dispatchers.IO,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try to do separate PR using Channel instead of BlockingQueue, because If I change it here, lot of code will need to be updated (Tests are using fake blockingQueue that will need to be updated as well..)

Copy link
Member

@vjgarciag96 vjgarciag96 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff, thanks for actioning the feedback! Once we resolve the discussion around the Dispatcher and the volatile we are good to go.

@JcMinarro
Copy link
Collaborator Author

Great stuff, thanks for actioning the feedback! Once we resolve the discussion around the Dispatcher and the volatile we are good to go.

Ok, I think it should be fine now ;)

Copy link
Member

@vjgarciag96 vjgarciag96 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! This is wonderful, thank you so much for the contribution again!

@vjgarciag96 vjgarciag96 merged commit c41efdf into kotlin-telegram-bot:main Mar 7, 2023
@JcMinarro JcMinarro deleted the use-coroutines branch March 7, 2023 09:30
@JinKoro
Copy link

JinKoro commented May 15, 2023

@vjgarciag96 When is the release of this task planned?

@JcMinarro
Copy link
Collaborator Author

@vjgarciag96 When is the release of this task planned?

I am not sure if there is any new release planned yet.
As far as this is on Jitpack, you can use a "commit SHA1" as a dependency version in your project.

dependencies {
  implementation 'com.github.kotlin-telegram-bot.kotlin-telegram-bot:dispatcher:f2c594d16c'
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unable to launch coroutines inside of the dispatch block
3 participants