
Coroutines in Okio. #531
Closed
swankjesse wants to merge 1 commit

Conversation

@swankjesse (Member)

This is a work in progress.

@swankjesse (Member Author)

This makes it possible for suspending functions to call Okio, and Okio might not block when called. Initially almost everything will block anyway because we haven’t written nonblocking backends (Sockets, Files, Pipes), and we haven’t written non-blocking transforms (compression, hashing). But it shouldn’t be too difficult to make those things non-blocking.

Callers must opt in to the non-blocking operations with the Async suffix. We also have an internal Suspendable suffix for when the caller is a coroutine but we aren’t sure whether the implementation is.

This code is bad because it copy-pastes a bunch of behavior for the async variant. We could delegate from non-coroutine to coroutine, but then I think we have to allocate. Is there a better way?

This doesn’t yet offer the full complement of Async methods because I don’t think callers should use them. In particular, doing a suspending call to read each byte is likely to be extremely inefficient. It’s better to do coarse async calls like require or request and then use sync calls for the pieces.
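For illustration, a sketch of that usage pattern; requestAsync is a hypothetical name standing in for the proposed suspending variant of BufferedSource.request, not an API that exists today:

import okio.BufferedSource

data class Header(val version: Int, val length: Int)

suspend fun readHeader(source: BufferedSource): Header {
  check(source.requestAsync(8)) { "truncated header" }  // coarse call: may suspend
  val version = source.readInt()                        // served from the buffer, no suspension
  val length = source.readInt()                         // served from the buffer, no suspension
  return Header(version, length)
}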

@Tolriq commented Nov 2, 2018

Sorry to ask here, but since this is probably the follow-up to that PR :)

Is there any plan to extend this to OkHttp in the future, to make synchronous calls like execute suspendable? The other solution is to enqueue and use suspendCancellableCoroutine, but that requires a lot more internal changes in our code.

Since a proper migration to coroutines will be a lot of work for everyone, it would be nice to know what the plans are for OkHttp/Moshi/Okio, so we can plan our new internal APIs for the consumers of those libraries.

@swankjesse (Member Author)

I’m hoping we can reach the point where you can have more in-flight HTTP calls than threads. I don’t yet know how this will work in practice.

@zach-klippenstein left a comment

This is exciting! A few general comments:

Okio might not block when called

This contract concerns me a bit. If I’m trying to write some non-blocking coroutine code that calls out to Okio, I’ll have to wrap all my *Async() calls in a withContext(IO) { … } anyway, since it’s perfectly valid for the function to block. That defeats the point of providing a suspend API in the first place, doesn’t it? Additionally, at KotlinConf this year, Roman Elizarov recommended following the convention that functions marked as suspend do not block (video at time). Not following that convention here is going to be surprising.

Instead, for the default implementations of the async methods that delegate to the blocking ones, we could just wrap those in withContext(IO) { … }, which is likely what consuming code would be doing manually anyway without the suspend APIs. Then, when underlying streams natively support non-blocking IO, it's basically just an optimization.
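A minimal sketch of such a default, assuming the writeAsync signature proposed in this PR (writeAsyncDefault is an invented name for illustration):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import okio.Buffer
import okio.Sink

suspend fun Sink.writeAsyncDefault(source: Buffer, byteCount: Long) =
  withContext(Dispatchers.IO) {
    write(source, byteCount)  // the blocking call runs on an IO thread, not the caller's
  }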

doing a suspending call to read each byte is likely to be extremely inefficient

For Sink/Source, where a read/write of each single byte will actually context switch, definitely. However, for the buffered versions of the interfaces, individual reads/writes will only block to read an entire chunk anyway. In the fast path, where buffered data is already available, none of the functions will suspend and the coroutines runtime is optimized for that case.

try {
  val toRead = takeAsync(byteCount)
  return super.readAsync(sink, toRead)
} catch (e: InterruptedException) {

@zach-klippenstein

Why is this exception wrapped? Should this handle CancellationExceptions too?

@swankjesse (Member Author)

Just a thing to be consistent with our blocking APIs, which would otherwise need to throw this checked exception. And yes, we should figure out cancelation.

@zach-klippenstein

Ah right, it's a checked exception. I think coroutine cancellation should just work.


while (true) {
  val now = System.nanoTime()
  val byteCountOrWaitNanos: Long = synchronized(this) {

@zach-klippenstein

Another blocking call in a coroutine. The coroutine-friendly Mutex doesn't provide a non-suspend API so unless we want to write our own Mutex that supports both, I think we either have to do this or convert all synchronization to use coroutines.

@swankjesse (Member Author)

I see two kinds of synchronized: blocks where you would wait for blocking code, and blocks where you would wait for non-blocking code. This is the latter, and it could be replaced with compareAndSet() in a loop. In practice that means it'll rarely stall here.

@swankjesse (Member Author)

... and we have a project planned where we replace lots of synchronized blocks with compareAndSet() loops. The big drawback is that the latter requires an allocation on every use so we replace a risk of blocking with a certainty of allocating.
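A minimal sketch of such a compareAndSet() loop, using a hypothetical buffer-size state rather than Okio's actual fields:

import java.util.concurrent.atomic.AtomicReference

private data class State(val size: Long, val closed: Boolean)

private val state = AtomicReference(State(size = 0L, closed = false))

fun commitWrite(byteCount: Long) {
  while (true) {
    val current = state.get()
    check(!current.closed) { "closed" }
    val updated = current.copy(size = current.size + byteCount)  // the per-update allocation
    if (state.compareAndSet(current, updated)) return  // retry if another thread won the race
  }
}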

@zach-klippenstein

Yea, I'm not too worried. Even if this does block it will be for such a short period of time it's probably fine.

@zach-klippenstein commented Nov 2, 2018

The only potential issue with automatically wrapping calls in withContext(IO) is hard-coding that dispatcher. That sort of thing is usually bad for testability; however, I think it might be fine here because:

  • Dispatchers.IO is an extremely reasonable choice for blocking work on the JVM. It's built into the standard library, and optimized to work efficiently with the Default dispatcher.
  • By opting in to the async APIs, consuming code is effectively saying "I don't care how you make the operation asynchronous, just don't block me".
  • If you need to mock out a SuspendableSink/Source for testing, you'll be mocking out the suspend methods themselves anyway.
  • On JS, IO is all asynchronous already so there are no blocking calls to wrap.

I'm not sure what the story looks like for Kotlin/Native, but I'm sure we can solve that problem when we get there.

@swankjesse (Member Author)

Really good advice. I'm gonna update this PR to do withContext(IO).

@Tolriq commented Nov 3, 2018

Instead, for the default implementations of the async methods that delegate to the blocking ones, we could just wrap those in withContext(IO) { … }, which is likely what consuming code would be doing manually anyway without the suspend APIs.

IO is not always good: 64 threads is a lot, and for network/DB access some kind of limit is often wanted.

I hope that Kotlin/kotlinx.coroutines#261 lands to allow such control, but until then I'll use my own dispatcher (and thread pool) for IO with the DB and the remote hosts I talk to, allowing control of the load.

If you force switching to IO then you force costly context switches without bringing anything to the consumer.
So I'm really not sure it's the library's job to choose the dispatcher at all.

@swankjesse force-pushed the jwilson.1101.coroutines branch 4 times, most recently from ef9d762 to afae35b on November 3, 2018 11:12
@@ -51,10 +53,24 @@ interface Sink : Closeable, Flushable {
  @Throws(IOException::class)
  fun write(source: Buffer, byteCount: Long)

  /** Non-blocking variant of [write]. Uses the IO dispatcher if blocking is necessary. */
  @Throws(IOException::class)
  @JvmDefault
@swankjesse (Member Author)

@Egorand I got this working so I don’t need separate interfaces. Yay!

@swankjesse (Member Author)

And hoorah for D8 for making default methods available on all Androids!

@Egorand (Collaborator)

Niice!


@zach-klippenstein

If you force switching to IO then you force costly context switches without bringing anything to the consumer.

@Tolriq The current IO dispatcher shares threads with Default, and so dispatching to it only forces a context switch if you're not already on Default or IO. From the kdoc:

This dispatcher shares threads with a Default dispatcher, so using withContext(Dispatchers.IO) { ... } does not lead to an actual switching to another thread — typically execution continues in the same thread.

As for limiting concurrency, how are you doing that without coroutines? Why won't that strategy work with them?

@zach-klippenstein

We could also provide an API like Sink.withAsyncContext(context: CoroutineContext) that would wrap a stream with one that uses the given context for all the async methods.

E.g.:

fun Sink.withAsyncContext(context: CoroutineContext): Sink = object : Sink by this {
  override suspend fun writeAsync(source: Buffer, byteCount: Long) = withContext(context) {
    write(source, byteCount)
  }

  override suspend fun flushAsync() = withContext(context) {
    flush()
  }

  override suspend fun closeAsync() = withContext(context) {
    close()
  }
}

And similar for Source, BufferedSink, BufferedSource, etc.

@swankjesse (Member Author)

I’m not thinking in coroutines yet, but...

I expect our async strategy won’t be to configure contexts on streams. Instead we attempt to get as much suspend coverage as possible and only context switch when we would otherwise block.

@Tolriq commented Nov 4, 2018

Well, a real-life example is that I need to talk to a buggy server that only supports 4 blocking threads.

Some of the blocking commands, like sync, can take up to 1 minute, and I also always need to have at least one thread free for commands that should run instantly.

The normal solution is to use a dedicated thread pool limited to 3 threads, turn it into a dispatcher with asCoroutineDispatcher(), and use that as the coroutine context for all the potentially blocking commands, as sketched below.
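A sketch of that setup, with invented names (blockingServerCommand stands in for one of the slow commands):

import java.util.concurrent.Executors
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

val serverDispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()

fun blockingServerCommand() = Thread.sleep(60_000)  // stand-in for a slow, blocking command

fun main() {
  runBlocking {
    launch(serverDispatcher) {
      blockingServerCommand()  // at most 3 of these run against the server at once
    }
  }
}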

Now, in a coroutine world, suspend functions are non-blocking but run in the caller's coroutine context.

So with that coroutine context I could directly do things like:

launch {
  suspendingFunction()
}

Since the function runs in the coroutine context, it's limited to the 3 threads and in theory there's no issue.

Now, if the function internally switches to IO, then despite all my configuration being OK I can end up with up to 64 active connections to the server, and that's not good.

  • If I don't know that it switches, I have an issue that's hard to debug.
  • If I am aware of it, what solution do I have? Either call the suspend function in runBlocking or use async/await.
    Not only does this reduce much of the benefit of coroutines, there's still a thread switch from my thread pool to IO and then back to my thread pool, so resources are wasted for no benefit.
    In that case I'm better off not using the suspend version and calling the blocking version directly.

So IMO this is an issue, especially for all the people who would not expect the switch and will have a hard time understanding it.

@zach-klippenstein I gave the explanation about threads in the original post ;)
About Default and IO sharing a pool: as I said, until Kotlin/kotlinx.coroutines#261 is a reality, that does not help for all the other thread pools. And I can imagine a lot of different scenarios like mine that will require dedicated thread pools.

@zach-klippenstein

@Tolriq Limiting the number of threads your dispatcher uses is only one way to limit concurrency.

Another way to achieve that is to use a pool of coroutines, instead of threads, e.g. the worker pool pattern. This has the added advantage of sharing threads with the existing standard thread pools, as I mentioned above. It might be a bit more verbose, but given how important this behavior is to your system, I would think erring on the explicit side could be a good thing.

I think, more generally, in the ideal future all libraries, apps, etc. would only use the standard dispatcher thread pools for almost all of their work, because it reduces the total number of distinct threads in the system and reduces thread context switching, even when two bits of code that don't know anything about each other's internals are communicating. The Kotlin team has already said they plan to ship an implementation of the worker pool pattern eventually, but it's simple enough that it's already feasible to implement a one-off when you need it.

@zach-klippenstein

@swankjesse Yea, that withAsyncContext was a horrible idea. For streams that are backed by non-blocking primitives (instead of just wrapping the blocking ones), wrapping with a different context wouldn't just make no sense; it would force falling back to synchronous APIs.

I'm also wondering if there's some utility in keeping the SuspendableSource/Sink interfaces around in some form. If I'm wrapping IO primitives that only provide an asynchronous API, then in order to implement the blocking calls I would need to use runBlocking in each call. On one hand, this means no consumer of any Okio types needs to know or care whether the underlying primitives are synchronous or asynchronous.
On the other hand, maybe the consumer should care about that: if spinning up an event loop just to make a small blocking call has enough performance overhead (not sure it would, but it seems possible), it would be better for the type system to express that the suspend calls will be more efficient. That's a lot of speculation though, so I might just be over-thinking it at this point.

@Tolriq commented Nov 4, 2018

I've since talked to Roman Elizarov about this and he also suggests a worker pool to handle both blocking and non-blocking work, but the talk he directed me to is really unclear about how to do that at a higher level. Channels are still experimental and an Actor should be very self-contained.

I currently have no idea how to apply this pattern globally to limit any of X functions to at most 3 at the same time.

Do you have any more details about how it could currently be applied in that way?

Typically my app can talk to many different systems.
The limits are specific to each system, so the main app should not worry about that. Everything is behind interfaces and callbacks currently, with each system having its own thread pool. The goal would be to migrate that part to coroutines too, but still be able to enforce limits inside each system for a pool of commands. (Commands can be anything in to anything out, so there's not really any common pattern between the commands.)

@zach-klippenstein commented Nov 4, 2018

@Tolriq This is getting off-topic for this PR, but I think one solution could be fairly straightforward. Since thread pools are pervasive in your architecture, I'm assuming you're using Executors in front of them. One simple thing you could do is replace every Executor/thread pool pair with a worker pool and a Channel<suspend ()->Unit>. Calls like executor.execute { foo() } then become channel.send { foo() }. Are you in the Kotlin slack (kotlinlang.slack.com)? The #coroutines channel there would be a great place to continue this discussion!
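A minimal sketch of that idea, with invented names (not an existing API):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

class WorkerPool(scope: CoroutineScope, workers: Int) {
  // Tasks wait here; at most `workers` of them run at once.
  private val tasks = Channel<suspend () -> Unit>(Channel.UNLIMITED)

  init {
    repeat(workers) {
      // Each worker is a coroutine, not a thread: it shares the standard dispatcher threads.
      scope.launch { for (task in tasks) task() }
    }
  }

  suspend fun submit(task: suspend () -> Unit) = tasks.send(task)
}

// Roughly, executor.execute { foo() } becomes pool.submit { foo() }.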

@zach-klippenstein commented Nov 4, 2018

One more argument for keeping the dispatcher private is that the use of any dispatcher at all is an implementation detail of the default implementation of the asynchronous methods. A particular Sink/Source implementation may wrap an IO API that is "natively" asynchronous (e.g. NIO's AsynchronousChannel, all IO in JavaScript) in which case there is no separate dispatcher involved anyway.

Providing any API to set the dispatcher used for the default implementation leaks the implementation and gives consumers a false sense of control when there's no guarantee that dispatcher will be honored.

@Tolriq commented Nov 4, 2018

@zach-klippenstein Yes, I'm there and I'll open a thread there to continue when I'm further along. But to wrap up: thread pools were the only way I found to handle that, and coroutines seemed like a way out to much better internals.

I just want to be sure that some libraries moving to withContext(IO) doesn't break whatever pattern I decide to go with. This is a larger discussion about coroutines and blocking/non-blocking networking, and I'm pretty sure the decisions you make for Okio/OkHttp will serve as the root for many others, given how respected you are. (That's why I'm asking here too, raising some special needs that were maybe not considered during the design process.)

Edit: And your solution just removes the thread pools but not the callbacks, so it keeps the same messy internals :(

@zach-klippenstein

This code is bad because it copy-pastes a bunch of behavior for the async variant. We could delegate from non-coroutine to coroutine, but then I think we have to allocate. Is there a better way?

@swankjesse Here's one idea: #532

@zach-klippenstein

@Tolriq The more edge cases the better! 🙂 That solution is definitely not ideal, but I don't know your architecture. Again, happy to dig into this more on slack.

I think the contract of okio's async methods should be simply that they won't block the current thread. It shouldn't make any guarantees about dispatchers or threads, because there are likely to be implementations that don't use dispatchers (or even threads, in the case of JavaScript) behind the scenes anyway.

@Tolriq commented Nov 4, 2018

But isn't using a dispatcher in Okio already a hack to try to make a blocking function non-blocking?

It's just a convenience for consumers, but if a method can't be made truly non-blocking, why not let the consumer handle that, as we already do?

To me this is the real question.

@zach-klippenstein

It's only a hack inasmuch as any code offloading blocking work to a dedicated thread pool is a hack.

I think the whole premise of this PR is that it is useful for the standard Okio interfaces to have suspend support, so they can be composed without blocking, wrap callback-based APIs, etc. In order for that to work though, every implementation needs to handle both cases. Nothing's stopping you from still calling the blocking methods if you really want to.

@Egorand (Collaborator) commented Nov 5, 2018

How about making dispatcher a function parameter and defaulting it to Dispatchers.IO?

@JvmOverloads suspend fun writeAsync(
  source: Buffer,
  byteCount: Long,
  dispatcher: CoroutineDispatcher = Dispatchers.IO
) = withContext(dispatcher) {
  write(source, byteCount)
}

@Tolriq commented Nov 5, 2018

@zach-klippenstein

It's only a hack inasmuch as any code offloading blocking work to a dedicated thread pool is a hack.

The difference is that every library that uses a dedicated pool, like OkHttp or Glide, gives control over that thread pool.

Here we are talking about a library using a default pool of 64 threads (64 MB of memory if wrongly used, ...).

Since English is not my native language, I just want to add that I'm not arguing for my own cause; in all cases users will adapt to the decisions. I just want to be sure that such important decisions are fully thought through. (And this discussion also helps with thinking about how to architect apps in general, since we are at the beginning of coroutines.)

You are shaping the future here ;)

The question extends to cancellation too: how can a blocking function called on an IO thread be cancelled? Cancellation is cooperative, so for socket-based IO you usually use suspendCancellableCoroutine and close the socket at the moment of cancellation.
A user would assume that a suspend function can be properly cancelled and will be cooperative.
Are the calls moved to withContext cancellable in this proposal?

Edit: Link to slack about the worker pool https://kotlinlang.slack.com/archives/C1CFAFJSK/p1541415543349000
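For reference, the suspendCancellableCoroutine pattern described above, sketched against OkHttp's enqueue API (a well-known wrapper, not part of this PR):

import java.io.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Response

suspend fun Call.await(): Response = suspendCancellableCoroutine { continuation ->
  // Cooperative cancellation: cancelling the coroutine cancels the call,
  // which closes the underlying socket.
  continuation.invokeOnCancellation { cancel() }
  enqueue(object : Callback {
    override fun onResponse(call: Call, response: Response) = continuation.resume(response)
    override fun onFailure(call: Call, e: IOException) = continuation.resumeWithException(e)
  })
}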

@zach-klippenstein

I understand the gravity of getting the API right here, and I definitely appreciate this discussion!

The question extends to cancellation too: how can a blocking function called on an IO thread be cancelled?

That's a great question, definitely needs some more exploration I think. Also, the Timeout mechanism needs to support coroutines too and that hasn't been addressed in this PR yet.

The difference is that every library that uses a dedicated pool, like OkHttp or Glide, gives control over that thread pool.

Again, I think it really comes down to the fact that the API can't, and shouldn't, make any guarantees about how threading is (or isn't) handled for the async functions, if for no other reason than what I've already stated above: streams might be backed by IO primitives that are callback-based and don't give you any control over threads themselves (asynchronous NIO sockets, everything in JavaScript). Also, neither Kotlin/Native nor Kotlin/JS has support for threads at all.

I think as coroutines become more pervasive, we'll stop thinking in terms of "thread pools", because it doesn't make sense to keep reaching all the way down to such a low-level concept. Thread pools make sense when you're working with threads. Other patterns, such as worker pools, make sense for coroutines. It makes sense for higher-level libraries like OkHttp to provide configurable thread pools for blocking calls, because that's the only way to limit concurrency when you're working with blocking calls and threads. However, for coroutine-based async calls, thread pools are the wrong tool. I'm not sure exactly what it would look like yet, but OkHttp could, for example, give you control over some worker pool implementation.

@Tolriq commented Nov 5, 2018

However, for coroutine-based async calls, thread pools are the wrong tool.

The thing is that, from my understanding, by using dispatchers to simulate async calls from blocking ones you are introducing that tool and could create issues.

That's where I'm still stuck in this discussion and want to be sure: I have trouble understanding why using a non-configurable dispatcher to simulate async calls is a good solution to the problem.
What is async is async; what can't be async should maybe be left to the consumer, or exposed as blocking but with a convenience helper to make it async, so that it's clear what is blocking, what is async, and what is blocking inside an async wrapper.

I see that the same public API will have different implementations for JS or Android or others, but if an implementation can have issues on some OS, like no control or problems with cancellation, then the implementation starts to be important enough to impact the API definition.

@zach-klippenstein

Just chatted with @swankjesse offline about timeouts and cancellation, here's what we came up with:

  • When Okio’s Timeout/AsyncTimeout cancels an asynchronous operation, the operation should be cancelled but the coroutine in which the operation is running should not be cancelled.
  • Cancelling a coroutine that's running an asynchronous operation should not leak anything. To that end, when such a coroutine is cancelled, the currently-running operation should be cancelled just as if the operation had timed out. One way to do this would be to register a CompletionHandler on the job in AsyncTimeout.enter() that delegates to AsyncTimeout.timedOut() (which is the method that cancels socket operations by closing the socket); a rough sketch follows this list.
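A rough, hypothetical sketch of that second point; the names below are invented for illustration, and AsyncTimeout is assumed to be subclassable with its protected timedOut():

import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import okio.AsyncTimeout

class CoroutineAwareTimeout : AsyncTimeout() {
  private var onCancel: DisposableHandle? = null

  suspend fun enterWithJob() {
    enter()  // start the normal AsyncTimeout watchdog
    onCancel = coroutineContext[Job]?.invokeOnCompletion { cause ->
      if (cause != null) timedOut()  // coroutine cancelled or failed: behave as if timed out
    }
  }

  fun exitWithJob(): Boolean {
    onCancel?.dispose()  // stop listening once the operation completes normally
    return exit()
  }
}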

In particular, that second point ensures that Kotlin's withTimeout works as expected. If you want to specify a timeout on total request/response, you should ultimately be able to write something like:

try {
  val sendMoneyResponse = withTimeout(1000) { paymentService.sendMoney(sendMoneyRequest) }
} catch (e: TimeoutCancellationException) {
  // network call timed out, socket will have been closed.
}

Timeouts will already clean up after themselves. Operations are wrapped in something that looks like this, which means that even if doWork is suspending and the coroutine gets cancelled, timeout.exit() will still get called.

timeout.enter()
try {
  doWork()
} finally {
  timeout.exit()
}

@Tolriq commented Nov 6, 2018

Sounds nice.

The only question would be whether there are things in Okio that might need to be cancelled without closing the socket, or whether the socket being closed in that case could trigger an issue.

A simple example that I suppose is handled already: using any of the closeable APIs with .use {} would trigger a close after a close when the call is cancelled. I guess double closes are ignored and all is OK everywhere; I'm just thinking out loud in case there are other cases. My Okio usage is relatively basic.

@zach-klippenstein

How about making dispatcher a function parameter and defaulting it to Dispatchers.IO?

@Egorand The dispatcher is an implementation detail of the default implementation that shoves blocking calls onto a different dispatcher, so exposing it is leaking implementation specifics. Async calls implemented by callback APIs (e.g. NIO's async sockets, JavaScript) would ignore that parameter anyway, in which case it would not only be irrelevant but would probably be confusing and misleading.

@Egorand (Collaborator) commented Nov 6, 2018

Good to know @zach-klippenstein. I was thinking that dispatchers are similar to RxJava's Schedulers and subscribeOn(), is that the wrong mental model?

@zach-klippenstein commented Nov 6, 2018

@Egorand They're very close, so it's a useful mental model, in some cases. However, remember that coroutines are essentially just callbacks. So when you pass a callback to a method, it might get posted to an Executor or Handler or something, but it might be invoked a different way. In the same way, when a coroutine needs to do asynchronous work, it has options: it might jump onto a different dispatcher/thread to do some blocking work, or it might just pass the continuation to some other API (as in the NIO case).

The nice thing about coroutine dispatchers and contexts is that, if I care about what context my coroutine runs in, I specify that explicitly, and then if code I'm calling out to jumps onto other threads, sends continuations through 3rd-party libraries, or whatever, when my code gets around to being executed again, I am guaranteed it will be running in the context I initially asked for. So unless you're doing something very strange, it shouldn't matter how a suspend function does asynchronous work.

@Tolriq commented Nov 7, 2018

So unless you're doing something very strange, it shouldn't matter how a suspend function does asynchronous work.

I still don't agree ;) On resource-limited devices those implementation details can have an impact. Say you decide to go to a special thread pool with real-time priority (OK, I'm pushing the concept a little, but since you switch context you could switch to literally anything; it's just an implementation detail, by your reasoning): this would be an issue for any consumer.

Similarly, on the Slack #coroutines channel, someone did not understand why moving from AsyncTask to coroutines impacted their UI thread when everything was on a background thread, as before (due to the difference in the number of threads and thread priorities).
In the end both are ways to do async work, only the implementation changes, and yet the difference in impact is huge and hard to understand for a newcomer.

I know I repeat myself a lot and I'm sorry about that, but a lot of people do not dig into implementation details and have a hard time understanding what happens when problems arise that are often hard to diagnose and test.

To me there are blocking calls and there are non-blocking calls. Those should not switch context without the user knowing.

Then there are convenience functions that can make blocking calls non-blocking via the use of contexts and dispatchers; those should clearly identify that fact and which dispatcher will be used.

I still do not buy that having no context switch in some cases, because the function is non-blocking, magically makes the implementation details a non-problem in the cases where it is blocking.

I'm probably wrong, as you are all more experienced, but so far I still haven't seen an explanation that would make someone like me, with threading experience, believe that threading is a non-issue in this special case.
Okio is not really a newcomer library, so most people will have the necessary knowledge about blocking/non-blocking, threading, and limits, but if the concept spreads to all libraries then it will be a big mess for anyone to keep control.
Coroutines are a fantastic tool in a controlled environment, but if just one library decides to switch a blocking call to Default, you have a catastrophe incoming.
Just that simple case makes the implementation detail important: in a coroutine world blocking things are an issue, and hiding blocking things without control is error-prone.

I really wish my English were better so I could be sure the message is clear, but I hope it can be summed up as:
implementation details for blocking calls wrapped in coroutines should be clearly identified and under the control of the consumer.

channel.write(source, byteCount)
}
}
}
@swankjesse (Member Author)

NIO!

val instance: SelectRunner by lazy {
  val result = SelectRunner()
  result.start()
  result

@zach-klippenstein

Nit, suggested change:
- result
+ SelectRunner().apply { start() }

@swankjesse (Member Author)

I'd written that and then used undo ’cause I’d line wrapped it and it seemed fancier than necessary.

@swankjesse (Member Author)

Is this the best way to have a worker thread standing by? I was wondering if there's a more coroutines-idiomatic way to kick this off.

@zach-klippenstein

You could turn the thread into a coroutine on the IO dispatcher. Saves you from spinning up another thread, especially when there aren't any pending operations – if the queue is empty, you could suspend the coroutine instead of blocking the thread.
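A minimal sketch of that suggestion, with invented names (not the PR's SelectRunner):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch

class IoWorker(scope: CoroutineScope) {
  private val tasks = Channel<() -> Unit>(Channel.UNLIMITED)

  init {
    scope.launch(Dispatchers.IO) {
      // receive() suspends while the queue is empty, so no thread sits parked.
      for (task in tasks) task()
    }
  }

  fun enqueue(task: () -> Unit) {
    tasks.trySend(task)
  }
}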


// If the channel has been closed, fail the task.
if (!key.isValid) {
  continuation.resumeWithException(IOException("canceled"))
@zach-klippenstein

key.isValid will also be false if the key has been canceled, right? I know you haven't implemented it yet but you have a todo above – if the key was canceled due to coroutine cancelation, the coroutine will already be in the canceled state and this resumeWithException will just report the IOException to the uncaught exception handler. That's probably not what we want in this case; instead, we should just not create the IOException at all and return true immediately.

If the key is invalid for some other reason (e.g. channel was closed, as you've commented), then we should still resume with exception.

There are at least two approaches to this:

  1. Rely on the continuation itself as the source-of-truth for whether the key is invalid due to coroutine cancelation or not, or
  2. Keep track explicitly ourselves.

The naïve way to implement 1 would be just checking the active flag:

if (!key.isValid) {
  if (continuation.isActive) continuation.resumeWithException(IOException("canceled"))
  return true
}

But that creates a race where the IOException might get reported if the coroutine was canceled after the channel was closed. That might be ok since the key wasn't canceled due to coroutine cancelation, so technically we should still throw the exception. Another solution would be to use the atomic resume API:

if (!key.isValid) {
  continuation.tryResumeWithException(IOException("canceled"))?.let(continuation::completeResume)
  return true
}

But that API is marked internal so that's just asking for trouble.

The safest solution is to do something like add an atomic boolean to Task that we can set to false in invokeOnCancellation before we cancel the key, and then check here instead of the isActive flag.
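A rough sketch of that flag; Task here is a hypothetical stand-in, not the PR's actual class:

import java.nio.channels.SelectionKey
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CancellableContinuation

class Task<T>(
  private val key: SelectionKey,
  private val continuation: CancellableContinuation<T>
) {
  private val wantsResult = AtomicBoolean(true)

  init {
    continuation.invokeOnCancellation {
      wantsResult.set(false)  // flip the flag before the key goes away
      key.cancel()            // or enqueue this so the selector thread cancels it
    }
  }

  /** Called from the select loop when the key is no longer valid. */
  fun failIfStillWaiting(e: Throwable): Boolean {
    // Only resume if the coroutine wasn't already cancelled; avoids the race on isActive.
    if (wantsResult.compareAndSet(true, false)) continuation.resumeWithException(e)
    return true
  }
}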

@swankjesse (Member Author)

Yep. We'll need to handle both coroutine cancellation and socket close.


suspend fun <T> SelectableChannel.selectAsync(ops: Int, block: () -> T): T {
  val selectRunner = SelectRunner.instance
  // TODO: cancel the key when the coroutine is canceled?
@zach-klippenstein

Did you have any ideas for how to implement this? Task could probably vend a CompletionHandler that we could directly pass to invokeOnCancellation.

I don't know the NIO select APIs at all, but according to the docs canceling a key can potentially block, so we probably shouldn't cancel the key directly from that handler (CompletionHandlers are invoked synchronously). Maybe we could enqueue a special "cancelation task" that would actually cancel the key in the future? I think it's fine if the key/channel doesn't get canceled immediately, as long as it doesn't try to do any wasted IO in the future (which we could ensure using our own flag, as discussed above).

@zach-klippenstein

No cancelation task required: if we just set the flag on the existing task and then wake up the selector, the task can cancel itself.

// The channel is ready! Call the block.
if (key.readyOps() != 0) {
  try {
    val result = block()

@zach-klippenstein

I think we might also need to double-check for coroutine cancellation here, depending on how we implement that.

@yogurtearl

Not sure if these are commonly accepted definitions, but

Asynchronous: Doesn't block the current thread, but might block some other thread.
Non-blocking: Subset of Asynchronous behavior that doesn't block any thread.

If these definitions make sense, would it make sense to have the methods named *NonBlocking() instead of *Async() ?

@kevincianfarini (Contributor) commented Oct 14, 2019

Does it make sense to break off the async capabilities into their own async classes like AsyncSource? Seems to me that could be justified, but I don't know if there's a motivation not to.

Since this is specifically for coroutines, maybe offer an artifact with those classes?

That being said, it might also make sense to look into how Ktor implements IO with coroutines.

@swankjesse (Member Author)

Abandoning this for now. Loom is a nicer solution.

@swankjesse closed this Oct 11, 2020
@yogurtearl mentioned this pull request Mar 5, 2021
@yogurtearl

Loom is a nicer solution.

@swankjesse
Any docs/write-up as to why Loom is a nicer solution?
Also, if/when Okio supports multiplatform, what would be the solution for JS and Native?

@zach-klippenstein

My understanding is that it's "nicer" because it's a drop-in solution – you don't have to rewrite all your functions as suspend functions and introduce duplicate APIs to handle suspend and non-suspend callers. Everything just works, the threads are just managed by the VM instead of the OS. I think it might also integrate with the existing java IO libraries more seamlessly, but I don't know that for sure.

@swankjesse (Member Author)

Yes! In particular, the compiled bytecode doesn’t need to be dramatically larger and slower to support being suspended. Instead, the VM does it internally using the existing data structures it already keeps.
#814 (comment)
