Skip to content

Commit

Permalink
Try #804:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] committed Oct 26, 2021
2 parents dd1629f + 41b62e3 commit 1cc9eb8
Show file tree
Hide file tree
Showing 19 changed files with 1,044 additions and 183 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,6 +1,7 @@
[workspace]
members = [
"gdnative",
"gdnative-async",
"gdnative-bindings",
"gdnative-core",
"gdnative-derive",
Expand Down
27 changes: 27 additions & 0 deletions gdnative-async/Cargo.toml
@@ -0,0 +1,27 @@
[package]
name = "gdnative-async"
authors = ["The godot-rust developers"]
description = "Runtime async support for godot-rust."
documentation = "https://docs.rs/crate/gdnative-async"
repository = "https://github.com/godot-rust/godot-rust"
homepage = "https://godot-rust.github.io/"
version = "0.9.3"
license = "MIT"
workspace = ".."
edition = "2018"

[features]

[dependencies]
gdnative-derive = { path = "../gdnative-derive", version = "=0.9.3" }
gdnative-core = { path = "../gdnative-core", version = "=0.9.3" }
gdnative-bindings = { path = "../gdnative-bindings", version = "=0.9.3" }
futures-task = "0.3.17"
atomic-waker = "1.0.0"
once_cell = "1.8.0"
thiserror = "1.0"
parking_lot = "0.11.2"
crossbeam-channel = "0.5.1"
crossbeam-utils = "0.8.5"

[build-dependencies]
21 changes: 21 additions & 0 deletions gdnative-async/src/executor.rs
@@ -0,0 +1,21 @@
use std::cell::Cell;

use futures_task::LocalSpawn;

thread_local!(
static LOCAL_SPAWN: Cell<Option<&'static dyn LocalSpawn>> = Cell::new(None);
);

pub(crate) fn local_spawn() -> Option<&'static dyn LocalSpawn> {
LOCAL_SPAWN.with(|cell| cell.get())
}

/// Sets the global executor for the current thread to a `Box<dyn LocalSpawn>`. This value is leaked.
pub fn set_boxed_executor(sp: Box<dyn LocalSpawn>) {
set_executor(Box::leak(sp))
}

/// Sets the global executor for the current thread to a `&'static dyn LocalSpawn`.
pub fn set_executor(sp: &'static dyn LocalSpawn) {
LOCAL_SPAWN.with(|cell| cell.set(Some(sp)))
}
57 changes: 57 additions & 0 deletions gdnative-async/src/future.rs
@@ -0,0 +1,57 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use atomic_waker::AtomicWaker;
use crossbeam_channel::{Receiver, Sender};

pub(crate) fn make<T>() -> (Yield<T>, Resume<T>) {
let (arg_send, arg_recv) = crossbeam_channel::bounded(1);
let waker = Arc::default();

let future = Yield {
waker: Arc::clone(&waker),
arg_recv,
};

let resume = Resume { waker, arg_send };

(future, resume)
}

/// Future that can be `await`ed for a signal or a `resume` call from Godot. See
/// [`Context`](crate::Context) for methods that return this future.
pub struct Yield<T> {
waker: Arc<AtomicWaker>,
arg_recv: Receiver<T>,
}

impl<T: Send> Future for Yield<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.arg_recv.try_recv() {
Ok(arg) => Poll::Ready(arg),
Err(_) => {
self.waker.register(cx.waker());
Poll::Pending
}
}
}
}

pub(crate) struct Resume<T> {
waker: Arc<AtomicWaker>,
arg_send: Sender<T>,
}

impl<T: Send> Resume<T> {
/// Resume the task with a given argument from GDScript.
pub fn resume(self, arg: T) {
self.arg_send
.send(arg)
.expect("sender should not become disconnected");

self.waker.wake();
}
}
20 changes: 20 additions & 0 deletions gdnative-async/src/lib.rs
@@ -0,0 +1,20 @@
//! Runtime async support for godot-rust.
//!
//! This crate contains types and functions that enable using async code with godot-rust.
//!
//! # Safety assumptions
//!
//! This crate assumes that all user non-Rust code follow the official threading guidelines.

// Workaround for macros that expect the `gdnative` crate.
extern crate gdnative_core as gdnative;

mod executor;
mod future;
mod method;
mod rt;

pub use executor::{set_boxed_executor, set_executor};
pub use future::Yield;
pub use method::{Async, AsyncMethod, Spawner};
pub use rt::{register_runtime, terminate_runtime, Context};
127 changes: 127 additions & 0 deletions gdnative-async/src/method.rs
@@ -0,0 +1,127 @@
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;

use futures_task::{LocalFutureObj, LocalSpawn, SpawnError};

use gdnative_core::core_types::{ToVariant, Variant};
use gdnative_core::log::{self, Site};
use gdnative_core::nativescript::export::{Method, Varargs};
use gdnative_core::nativescript::{NativeClass, RefInstance};
use gdnative_core::object::ownership::Shared;

use crate::rt::Context;

/// Trait for async methods. When exported, such methods return `FunctionState`-like
/// objects that can be manually resumed or yielded to completion.
///
/// Async methods are always spawned locally on the thread where they were created,
/// and never sent to another thread. This is so that we can ensure the safety of
/// emitting signals from the `FunctionState`-like object. If you need to off-load
/// some task to another thread, consider using something like
/// `futures::future::Remote` to spawn it remotely on a thread pool.
pub trait AsyncMethod<C: NativeClass>: Send + Sync + 'static {
/// Spawns the future for result of this method with `spawner`. This is done so
/// that implementors of this trait do not have to name their future types.
///
/// If the `spawner` object is not used, the method call will fail, output an error,
/// and return a `Nil` variant.
fn spawn_with(&self, spawner: Spawner<'_, C>);

/// Returns an optional site where this method is defined. Used for logging errors in FFI wrappers.
///
/// Default implementation returns `None`.
#[inline]
fn site() -> Option<Site<'static>> {
None
}
}

/// A helper structure for working around naming future types. See [`Spawner::spawn`].
pub struct Spawner<'a, C: NativeClass> {
sp: &'static dyn LocalSpawn,
ctx: Context,
this: RefInstance<'a, C, Shared>,
args: Varargs<'a>,
result: &'a mut Option<Result<(), SpawnError>>,
/// Remove Send and Sync
_marker: PhantomData<*const ()>,
}

impl<'a, C: NativeClass> Spawner<'a, C> {
/// Consumes this `Spawner` and spawns a future returned by the closure. This indirection
/// is necessary so that implementors of the `AsyncMethod` trait do not have to name their
/// future types.
pub fn spawn<F, R>(self, f: F)
where
F: FnOnce(Arc<Context>, RefInstance<'_, C, Shared>, Varargs<'_>) -> R,
R: Future<Output = Variant> + 'static,
{
let ctx = Arc::new(self.ctx);
let future = f(Arc::clone(&ctx), self.this, self.args);
*self.result = Some(
self.sp
.spawn_local_obj(LocalFutureObj::new(Box::new(async move {
let value = future.await;
ctx.resolve(value);
}))),
);
}
}

/// Adapter for async methods that implements `Method` and can be registered.
#[derive(Clone, Copy, Default, Debug)]
pub struct Async<F> {
f: F,
}

impl<F> Async<F> {
/// Wrap `f` in an adapter that implements `Method`.
#[inline]
pub fn new(f: F) -> Self {
Async { f }
}
}

impl<C: NativeClass, F: AsyncMethod<C>> Method<C> for Async<F> {
fn call(&self, this: RefInstance<'_, C, Shared>, args: Varargs<'_>) -> Variant {
if let Some(sp) = crate::executor::local_spawn() {
let ctx = Context::new();
let func_state = ctx.func_state();

let mut result = None;
self.f.spawn_with(Spawner {
sp,
ctx,
this,
args,
result: &mut result,
_marker: PhantomData,
});

match result {
Some(Ok(())) => func_state.to_variant(),
Some(Err(err)) => {
log::error(
Self::site().unwrap_or_default(),
format_args!("unable to spawn future: {}", err),
);
Variant::new()
}
None => {
log::error(
Self::site().unwrap_or_default(),
format_args!("implementation did not spawn a future"),
);
Variant::new()
}
}
} else {
log::error(
Self::site().unwrap_or_default(),
"a global executor must be set before any async methods can be called on this thread",
);
Variant::new()
}
}
}
106 changes: 106 additions & 0 deletions gdnative-async/src/rt.rs
@@ -0,0 +1,106 @@
use std::marker::PhantomData;

use gdnative_bindings::Object;
use gdnative_core::object::SubClass;

use gdnative_core::core_types::{GodotError, Variant};
use gdnative_core::nativescript::export::InitHandle;
use gdnative_core::nativescript::{Instance, RefInstance};
use gdnative_core::object::ownership::Shared;
use gdnative_core::object::TRef;

use crate::future;

mod bridge;
mod func_state;

use func_state::FuncState;

/// Context for creating `yield`-like futures in async methods.
pub struct Context {
func_state: Instance<FuncState, Shared>,
/// Remove Send and Sync
_marker: PhantomData<*const ()>,
}

impl Context {
pub(crate) fn new() -> Self {
Context {
func_state: FuncState::new().into_shared(),
_marker: PhantomData,
}
}

pub(crate) fn func_state(&self) -> Instance<FuncState, Shared> {
self.func_state.clone()
}

fn safe_func_state(&self) -> RefInstance<'_, FuncState, Shared> {
// SAFETY: FuncState objects are bound to their origin threads in Rust, and
// Context is !Send, so this is safe to call within this type.
// Non-Rust code is expected to be following the official guidelines as per
// the global safety assumptions. Since a reference of `FuncState` is held by
// Rust, it voids the assumption to send the reference to any thread aside from
// the one where it's created.
unsafe { self.func_state.assume_safe() }
}

pub(crate) fn resolve(&self, value: Variant) {
func_state::resolve(self.safe_func_state(), value);
}

/// Returns a future that waits until the corresponding `FunctionState` object
/// is manually resumed from GDScript, and yields the argument to `resume` or `Nil`
/// if nothing is passed.
///
/// Calling this function will put the associated `FunctionState`-like object in
/// resumable state, and will make it emit a `resumable` signal if it isn't in that
/// state already.
///
/// Only the most recent future created from this `Context` is guaranteed to resolve
/// upon a `resume` call. If any previous futures weren't `await`ed to completion, they
/// are no longer guaranteed to resolve, and have unspecified, but safe behavior
/// when polled.
pub fn until_resume(&self) -> future::Yield<Variant> {
let (future, resume) = future::make();
func_state::make_resumable(self.safe_func_state(), resume);
future
}

/// Returns a future that waits until the specified signal is emitted, if connection succeeds.
/// Yields any arguments emitted with the signal.
///
/// Only the most recent future created from this `Context` is guaranteed to resolve
/// when the signal is emitted. If any previous futures weren't `await`ed to completion, they
/// are no longer guaranteed to resolve, and have unspecified, but safe behavior
/// when polled.
///
/// # Errors
///
/// If connection to the signal failed.
pub fn signal<C>(
&self,
obj: TRef<'_, C>,
signal: &str,
) -> Result<future::Yield<Vec<Variant>>, GodotError>
where
C: SubClass<Object>,
{
let (future, resume) = future::make();
bridge::SignalBridge::connect(obj.upcast(), signal, resume)?;
Ok(future)
}
}

/// Adds required supporting NativeScript classes to `handle`. This must be called once and
/// only once per initialization.
pub fn register_runtime(handle: &InitHandle) {
handle.add_class::<bridge::SignalBridge>();
handle.add_class::<func_state::FuncState>();
}

/// Releases all observers still in use. This should be called in the
/// `godot_gdnative_terminate` callback.
pub fn terminate_runtime() {
bridge::terminate();
}

0 comments on commit 1cc9eb8

Please sign in to comment.