Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wasm wasi #4064

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
Open

Wasm wasi #4064

wants to merge 13 commits into from

Conversation

igoriakovlev
Copy link
Contributor

This is Wasm-WASI target support for coroutines.

This is based on self-written EventLoop which has a simple implementation for queued line of callbacks with timeouts.

This allows to schedule tasks in scheduler. The EventLoop starts on the program entry-point exit (that supported by the Kotlin from 1.9.23).

The coroutine tests runner implemented with a direct EventLoop call so that looks like the RunBlocking in Native or JVM implementation. We do not RunBlocking itself because there is no design for it in single-threaded environments.

The EventLoop throws an exception EventLoopException which is discussable because it is not supposed to be catched by user in normal code (but still possible for the runTest calls).

compilations["main"]?.dependencies {
api("org.jetbrains.kotlinx:atomicfu-wasm-wasi:${version("atomicfu")}")
}
compilations.configureEach {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately we can't do this in configure-compilation-conventions.gradle.kts because there is no way to distingish WasmWasi target from WasmJs or JS targets there.

* Thrown when multiply exception were thrown in event loop.
* @see runEventLoop
*/
public class EventLoopException(public val causes: List<Throwable>) : Throwable("Multiple exceptions were thrown in the event loop.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this public need consideration.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A typical approach that we take is to take the first throwable, add the other throwables to it as suppressed ones, and then throw the first one. No need for a separate type, the way I understand the problem. Here's an example:

internal fun throwAll(head: Throwable?, other: List<Throwable>) {
if (head != null) {
other.forEach { head.addSuppressed(it) }
throw head
} else {
with(other) {
firstOrNull()?.apply {
drop(1).forEach { addSuppressed(it) }
throw this
}
}
}
}

kotlinx-coroutines-core/js/src/CoroutineContext.kt Outdated Show resolved Hide resolved
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*

internal object WasiDispatcher: CoroutineDispatcher(), Delay {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not a private object in CoroutineContext.kt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, because WasiDispatcher is not a CoroutineContext. But if you would like I will move it.

package kotlinx.coroutines.internal

internal actual fun propagateExceptionFinalResort(exception: Throwable) {
println(exception.toString())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems too mild to me. For example, on Native, we terminate the whole process. On JS, I suppose we use logging only because there's no way to tell JS to stop everything, and even there, we are at least using the error stream, not the standard output. Is there a way to crash the program in WASI or at least make the error seem more scary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I put the code which prints an error into error stream and then exit with not 0 error code. I could even send a signal if you would like to have most scariest termination. Please notice - this way all finalisers will not work.

Comment on lines +10 to +7
@InternalCoroutinesApi
public fun runTestCoroutine(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will @PublishedApi internal with suppresses on the call site also work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to suppress anything. But if all reviewers ask for this then will do.

@InternalCoroutinesApi
public fun runTestCoroutine(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit) {
val newContext = GlobalScope.newCoroutineContext(context)
val coroutine = object: AbstractCoroutine<Unit>(newContext, true, true) {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: boolean parameters should typically be named.

* Thrown when multiply exception were thrown in event loop.
* @see runEventLoop
*/
public class EventLoopException(public val causes: List<Throwable>) : Throwable("Multiple exceptions were thrown in the event loop.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A typical approach that we take is to take the first throwable, add the other throwables to it as suppressed ones, and then throw the first one. No need for a separate type, the way I understand the problem. Here's an example:

internal fun throwAll(head: Throwable?, other: List<Throwable>) {
if (head != null) {
other.forEach { head.addSuppressed(it) }
throw head
} else {
with(other) {
firstOrNull()?.apply {
drop(1).forEach { addSuppressed(it) }
throw this
}
}
}
}


private const val CLOCKID_MONOTONIC = 1

internal class Event internal constructor(internal var callback: (() -> Unit)?, internal val absoluteTimeout: Long) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this class be private?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It used for cancelling in the WasiDispatcher

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Event can implement the DisposableHandle interface for this purpose.

return ptrToSubscription
}

@OptIn(UnsafeWasmMemoryApi::class)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to opt in for the whole file.

} else {
nextCycleEvents.add(currentEvent)
nextCycleNearestEventAbsoluteTime = min(eventAbsoluteTime, nextCycleNearestEventAbsoluteTime)
nextCycleContainTimedEvent = eventAbsoluteTime > 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bug: only the last added event will decide whether there are timed events. Adding a timed one first, and then untimed, will lead to the system thinking there are no timed events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh, it is true, thanks!!!

val ptrTo8Bytes = allocator.allocate(8)
val ptrToSubscription = initializeSubscriptionPtr(allocator)

while (nextCycleEvents.isNotEmpty()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, here's how the event loop implementation works:

  • New tasks are added to the queue, remembering the earliest time the tasks should start;
  • When the event loop machinery gets its turn, it takes the list of the scheduled events. Then, it waits for the time that it takes for the earliest one to start. Finally, it iterates the list to run all tasks that are due, pushing back the tasks that are not due.

If so, this strategy can lead to incorrect ordering of tasks. Imagine the following scenario.

  • There are two tasks: A is scheduled to run in one second, B is scheduled to run in two seconds.
  • One second later, A runs for three more seconds and schedules another task, C, to be executed.
  • B adds itself to the queue.
  • C executes.
  • Only then does B execute.

The more robust approach is to use a heap. We have an implementation already in the library. https://github.com/Kotlin/kotlinx.coroutines/blob/0ca735851f0a225b6a7582232a1c9847a27fd059/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt uses the heap to implement the event loop for kotlinx-coroutines-test. This implementation isn't long, but can be radically simplified further when there is no multithreading; you don't need to send the "dispatch events", you don't need to use synchronized, etc. You also don't need the separation between foreground and background work.

The main entry point you're interested in is

internal fun advanceUntilIdleOr(condition: () -> Boolean) {
while (true) {
if (!tryRunNextTaskUnless(condition))
return
}
}

With condition set to { false }. Another change is that here you should actually sleep instead of setting the current time unconditionally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, here's how the event loop implementation works:

Is this a requirement for the implementation? Because there is no tests checking this. And I am not sure that the other implementations, like in JS, are following this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a requirement. Tests don't check this because, in multithreaded environments, we actually violate this intentionally sometimes. On JS, we don't use our own event loop, we rely on the one provided by the environment.

Now that I look at the implementation more closely, I don't understand the reason to implement new event loop logic at all. We can probably inherit from EventLoopImplBase. For that, we need to implement EventLoopImplPlatform for Wasm/WASI properly. If we go down that path, then, to implement runEventLoop, we can duplicate the logic from

fun joinBlocking(): T {
try {
eventLoop?.incrementUseCount()
while (true) {
var parkNanos: Long
// Workaround for bug in BE optimizer that cannot eliminate boxing here
if (eventLoop != null) {
parkNanos = eventLoop.processNextEvent()
} else {
parkNanos = Long.MAX_VALUE
}
// note: processNextEvent may lose unpark flag, so check if completed before parking
if (isCompleted) break
joinWorker.park(parkNanos / 1000L, true)
}
} finally { // paranoia
eventLoop?.decrementUseCount()
}
// now return result
val state = state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
What do you think?

@dkhalanskyjb
Copy link
Collaborator

Please change the base of the PR to develop.

@dkhalanskyjb dkhalanskyjb changed the base branch from master to develop April 2, 2024 10:32
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants