Skip to content

Commit

Permalink
Minimal async-await foundations
Browse files Browse the repository at this point in the history
This sets the foundations for async-await support in godot-rust, based on the
original idea in #284. However, although the tests work, this is not a full
implementation:

- Async methods can only be registered manually using `build_method`. Macro
  syntax and implementation are out of the scope of this PR.
- The runtime types aren't registered automatically yet.
  Users need to manually call `register_runtime` and `terminate_runtime`
  functions in their library lifecycle hooks. Improving this is out of the
  scope of this PR for now.
- The crate is currently re-exported as `gdnative::asn`, instead of the much
  longer `async_yield`. The name is open to discussion -- I don't like it very
  much.
- Only local spawners are supported, due to issues with thread safety. Users
  may off-load tasks that don't contain `yield`-likes to thread pool spawners
  using something like `futures::future::Remote`, however.
- Panics in async methods don't currently behave very well. Their
  `FunctionState`-likes simply block forever and any outstanding bridge
  objects for futures can be leaked.
  • Loading branch information
toasteater authored and chitoyuu committed Oct 26, 2021
1 parent 54624aa commit 7aa9bc5
Show file tree
Hide file tree
Showing 17 changed files with 999 additions and 75 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"gdnative",
"gdnative-async",
"gdnative-bindings",
"gdnative-core",
"gdnative-derive",
Expand Down
26 changes: 26 additions & 0 deletions gdnative-async/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "gdnative-async"
authors = ["The godot-rust developers"]
description = "Runtime async support for godot-rust."
documentation = "https://docs.rs/crate/gdnative-async"
repository = "https://github.com/godot-rust/godot-rust"
homepage = "https://godot-rust.github.io/"
version = "0.9.3"
license = "MIT"
workspace = ".."
edition = "2018"

[features]

[dependencies]
gdnative-derive = { path = "../gdnative-derive", version = "=0.9.3" }
gdnative-core = { path = "../gdnative-core", version = "=0.9.3" }
gdnative-bindings = { path = "../gdnative-bindings", version = "=0.9.3" }
futures-task = "0.3.13"
atomic-waker = "1.0.0"
once_cell = "1.7.2"
thiserror = "1.0"
parking_lot = "0.11.0"
crossbeam-channel = "0.5.0"

[build-dependencies]
45 changes: 45 additions & 0 deletions gdnative-async/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use futures_task::LocalSpawn;
use once_cell::unsync::OnceCell as UnsyncCell;
use thiserror::Error;

thread_local!(
static LOCAL_SPAWN: UnsyncCell<&'static dyn LocalSpawn> = UnsyncCell::new();
);

/// Error returned by `set_*_executor` if an executor of the kind has already been set.
#[derive(Error, Debug)]
#[error("an executor is already set")]
pub struct SetExecutorError {
_private: (),
}

impl SetExecutorError {
fn new() -> Self {
SetExecutorError { _private: () }
}
}

pub(crate) fn local_spawn() -> Option<&'static dyn LocalSpawn> {
LOCAL_SPAWN.with(|cell| cell.get().copied())
}

/// Sets the global executor for the current thread to a `Box<dyn LocalSpawn>`. This value is leaked.
pub fn set_boxed_executor(sp: Box<dyn LocalSpawn>) -> Result<(), SetExecutorError> {
set_executor(Box::leak(sp))
}

/// Sets the global executor for the current thread to a `&'static dyn LocalSpawn`.
pub fn set_executor(sp: &'static dyn LocalSpawn) -> Result<(), SetExecutorError> {
LOCAL_SPAWN.with(|cell| cell.set(sp).map_err(|_| SetExecutorError::new()))
}

/// Sets the global executor for the current thread with a function that will only be called
/// if an executor isn't set yet.
pub fn ensure_executor_with<F>(f: F)
where
F: FnOnce() -> &'static dyn LocalSpawn,
{
LOCAL_SPAWN.with(|cell| {
cell.get_or_init(f);
});
}
56 changes: 56 additions & 0 deletions gdnative-async/src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use atomic_waker::AtomicWaker;
use crossbeam_channel::{Receiver, Sender};

pub(crate) fn make<T>() -> (Yield<T>, Resume<T>) {
let (arg_send, arg_recv) = crossbeam_channel::bounded(1);
let waker = Arc::default();

let future = Yield {
waker: Arc::clone(&waker),
arg_recv,
};

let resume = Resume { waker, arg_send };

(future, resume)
}

/// Signal
pub struct Yield<T> {
waker: Arc<AtomicWaker>,
arg_recv: Receiver<T>,
}

impl<T: Send> Future for Yield<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.arg_recv.try_recv() {
Ok(arg) => Poll::Ready(arg),
Err(_) => {
self.waker.register(cx.waker());
Poll::Pending
}
}
}
}

pub(crate) struct Resume<T> {
waker: Arc<AtomicWaker>,
arg_send: Sender<T>,
}

impl<T: Send> Resume<T> {
/// Resume the task with a given argument from GDScript.
pub fn resume(self, arg: T) {
self.arg_send
.send(arg)
.expect("sender should not become disconnected");

self.waker.wake();
}
}
19 changes: 19 additions & 0 deletions gdnative-async/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
//! Runtime async support for godot-rust.
//!
//! This crate contains types and functions that enable using async code with godot-rust.
//!
//! # Safety assumptions
//!
//! This crate assumes that all user non-Rust code follow the official threading guidelines.

// Workaround for macros that expect the `gdnative` crate.
extern crate gdnative_core as gdnative;

mod executor;
mod future;
mod method;
mod rt;

pub use executor::{ensure_executor_with, set_boxed_executor, set_executor, SetExecutorError};
pub use method::{Async, AsyncMethod, Spawner};
pub use rt::{register_runtime, terminate_runtime, Context, InitError};
126 changes: 126 additions & 0 deletions gdnative-async/src/method.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;

use futures_task::{LocalFutureObj, LocalSpawn, SpawnError};

use gdnative_core::core_types::{ToVariant, Variant};
use gdnative_core::log::{self, Site};
use gdnative_core::nativescript::export::{Method, Varargs};
use gdnative_core::nativescript::{NativeClass, RefInstance};
use gdnative_core::object::ownership::Shared;

use crate::rt::Context;

/// Trait for async methods. When exported, such methods return `FunctionState`-like
/// objects that can be manually resumed or yielded to completion.
///
/// Async methods are always spawned locally on the thread where they were created,
/// and never sent to another thread. This is so that we can ensure the safety of
/// emitting signals from the `FunctionState`-like object. If you need to off-load
/// some task to another thread, consider using something like
/// `futures::future::Remote` to spawn it remotely on a thread pool.
pub trait AsyncMethod<C: NativeClass>: Send + Sync + 'static {
/// Spawns the future for result of this method with `spawner`. This is done so
/// that implementors of this trait do not have to name their future types.
///
/// If the `spawner` object is not used, the method call will fail, output an error,
/// and return a `Nil` variant.
fn spawn_with(&self, spawner: Spawner<'_, C>);

/// Returns an optional site where this method is defined. Used for logging errors in FFI wrappers.
///
/// Default implementation returns `None`.
#[inline]
fn site() -> Option<Site<'static>> {
None
}
}

pub struct Spawner<'a, C: NativeClass> {
sp: &'static dyn LocalSpawn,
ctx: Context,
this: RefInstance<'a, C, Shared>,
args: Varargs<'a>,
result: &'a mut Option<Result<(), SpawnError>>,
/// Remove Send and Sync
_marker: PhantomData<*const ()>,
}

impl<'a, C: NativeClass> Spawner<'a, C> {
/// Consumes this `Spawner` and spawns a future returned by the closure. This indirection
/// is necessary so that implementors of the `AsyncMethod` trait do not have to name their
/// future types.
pub fn spawn<F, R>(self, f: F)
where
F: FnOnce(Arc<Context>, RefInstance<'_, C, Shared>, Varargs<'_>) -> R,
R: Future<Output = Variant> + 'static,
{
let ctx = Arc::new(self.ctx);
let future = f(Arc::clone(&ctx), self.this, self.args);
*self.result = Some(
self.sp
.spawn_local_obj(LocalFutureObj::new(Box::new(async move {
let value = future.await;
ctx.resolve(value);
}))),
);
}
}

/// Adapter for async methods that implements `Method` and can be registered.
#[derive(Clone, Copy, Default, Debug)]
pub struct Async<F> {
f: F,
}

impl<F> Async<F> {
/// Wrap `f` in an adapter that implements `Method`.
#[inline]
pub fn new(f: F) -> Self {
Async { f }
}
}

impl<C: NativeClass, F: AsyncMethod<C>> Method<C> for Async<F> {
fn call(&self, this: RefInstance<'_, C, Shared>, args: Varargs<'_>) -> Variant {
if let Some(sp) = crate::executor::local_spawn() {
let ctx = Context::new();
let func_state = ctx.func_state();

let mut result = None;
self.f.spawn_with(Spawner {
sp,
ctx,
this,
args,
result: &mut result,
_marker: PhantomData,
});

match result {
Some(Ok(())) => func_state.to_variant(),
Some(Err(err)) => {
log::error(
Self::site().unwrap_or_default(),
format_args!("unable to spawn future: {}", err),
);
Variant::new()
}
None => {
log::error(
Self::site().unwrap_or_default(),
format_args!("implementation did not spawn a future"),
);
Variant::new()
}
}
} else {
log::error(
Self::site().unwrap_or_default(),
"a global executor must be set before any async methods can be called on this thread",
);
Variant::new()
}
}
}

0 comments on commit 7aa9bc5

Please sign in to comment.