Skip to content

Commit

Permalink
feat: add PyFuture, an async wrapper around Python future-like objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
wyfo committed Apr 7, 2024
1 parent a4aea23 commit 54ec0c8
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 0 deletions.
133 changes: 133 additions & 0 deletions src/types/future.rs
@@ -0,0 +1,133 @@
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
};

use parking_lot::Mutex;

use crate::types::PyCFunction;
use crate::{
pyobject_native_type_named,
sync::GILOnceCell,
types::{any::PyAnyMethods, tuple::PyTupleMethods},
Bound, Py, PyAny, PyObject, PyResult, PyTypeCheck, Python,
};

/// A Python future-like object.
///
/// It can be either an asyncio future-like or a concurrent.futures.Future object.
#[repr(transparent)]
pub struct PyFuture(PyAny);
pyobject_native_type_named!(PyFuture);

impl Py<PyFuture> {
/// Convert a `PyFuture` into a Rust `Future`.
///
/// Contrary to asyncio.Future, Rust future will panic if polled after completion, in order
/// to optimize the result retrieving.
pub fn as_rust_future(
&self,
py: Python<'_>,
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send + Sync + 'static> {
self.bind(py).as_rust_future()
}
}

impl Bound<'_, PyFuture> {
/// Convert a `PyFuture` into a Rust `Future`.
///
/// Contrary to asyncio.Future, Rust future will panic if polled after completion, in order
/// to optimize the result retrieving.
pub fn as_rust_future(
&self,
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send + Sync + 'static> {
#[derive(Default)]
struct PendingInner {
result: Option<PyResult<PyObject>>,
waker: Option<Waker>,
}
enum FutureImpl {
Done(Option<PyResult<PyObject>>),
Pending(Arc<Mutex<PendingInner>>),
}
impl Future for FutureImpl {
type Output = PyResult<PyObject>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match this {
Self::Done(res) => {
Poll::Ready(res.take().expect("Future polled after completion"))
}
Self::Pending(cb) => {
let mut inner = cb.lock();
if inner.result.is_some() {
let res = inner.result.take().unwrap();
drop(inner);
*this = Self::Done(None);
return Poll::Ready(res);
}
if !matches!(&mut inner.waker, Some(waker) if waker.will_wake(cx.waker())) {
inner.waker = Some(cx.waker().clone());
}
Poll::Pending
}
}
}
}
Ok(if self.call_method0("done")?.extract()? {
FutureImpl::Done(Some(self.call_method0("result").map(Bound::unbind)))
} else {
let pending = Arc::new(Mutex::new(PendingInner::default()));
let pending2 = pending.clone();
// For asyncio futures, `add_done_callback` should be called the event loop thread,
// but because the GIL is held, and because we just checked that the future isn't done,
// the future result cannot be set in the meantime, so it's ok.
let callback =
PyCFunction::new_closure_bound(self.py(), None, None, move |args, _| {
let mut inner = pending2.lock();
inner.result =
Some(args.get_item(0)?.call_method0("result").map(Bound::unbind));
if let Some(waker) = &inner.waker {
waker.wake_by_ref();
}
PyResult::Ok(())
})?;
self.call_method1("add_done_callback", (callback,))?;
FutureImpl::Pending(pending)
})
}
}

fn is_asyncio_future(object: &Bound<'_, PyAny>) -> PyResult<bool> {
static IS_FUTURE: GILOnceCell<PyObject> = GILOnceCell::new();
let import = || {
let module = object.py().import_bound("asyncio")?;
PyResult::Ok(module.getattr("isfuture")?.into())
};
IS_FUTURE
.get_or_try_init(object.py(), import)?
.call1(object.py(), (object,))?
.extract(object.py())
}

fn is_concurrent_future(object: &Bound<'_, PyAny>) -> PyResult<bool> {
static FUTURE: GILOnceCell<PyObject> = GILOnceCell::new();
let import = || {
let module = object.py().import_bound("concurrent.futures")?;
PyResult::Ok(module.getattr("Future")?.into())
};
let future_type = FUTURE
.get_or_try_init(object.py(), import)?
.bind(object.py());
object.is_instance(future_type)
}

impl PyTypeCheck for PyFuture {
const NAME: &'static str = "Future";

fn type_check(object: &Bound<'_, PyAny>) -> bool {
is_asyncio_future(object).unwrap_or(false) || is_concurrent_future(object).unwrap_or(false)
}
}
2 changes: 2 additions & 0 deletions src/types/mod.rs
Expand Up @@ -27,6 +27,7 @@ pub use self::frozenset::{PyFrozenSet, PyFrozenSetBuilder, PyFrozenSetMethods};
pub use self::function::PyCFunction;
#[cfg(all(not(Py_LIMITED_API), not(PyPy), not(GraalPy)))]
pub use self::function::PyFunction;
pub use self::future::PyFuture;
pub use self::iterator::PyIterator;
pub use self::list::{PyList, PyListMethods};
pub use self::mapping::{PyMapping, PyMappingMethods};
Expand Down Expand Up @@ -333,6 +334,7 @@ pub(crate) mod float;
mod frame;
pub(crate) mod frozenset;
mod function;
mod future;
pub(crate) mod iterator;
pub(crate) mod list;
pub(crate) mod mapping;
Expand Down

0 comments on commit 54ec0c8

Please sign in to comment.