Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add PyFuture, an async wrapper around Python future-like objects.
- Loading branch information
Showing
2 changed files
with
135 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
use std::{ | ||
future::Future, | ||
pin::Pin, | ||
sync::Arc, | ||
task::{Context, Poll, Waker}, | ||
}; | ||
|
||
use parking_lot::Mutex; | ||
|
||
use crate::types::PyCFunction; | ||
use crate::{ | ||
pyobject_native_type_named, | ||
sync::GILOnceCell, | ||
types::{any::PyAnyMethods, tuple::PyTupleMethods}, | ||
Bound, Py, PyAny, PyObject, PyResult, PyTypeCheck, Python, | ||
}; | ||
|
||
/// A Python future-like object. | ||
/// | ||
/// It can be either an asyncio future-like or a concurrent.futures.Future object. | ||
#[repr(transparent)] | ||
pub struct PyFuture(PyAny); | ||
pyobject_native_type_named!(PyFuture); | ||
|
||
impl Py<PyFuture> { | ||
/// Convert a `PyFuture` into a Rust `Future`. | ||
/// | ||
/// Contrary to asyncio.Future, Rust future will panic if polled after completion, in order | ||
/// to optimize the result retrieving. | ||
pub fn as_rust_future( | ||
&self, | ||
py: Python<'_>, | ||
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send + Sync + 'static> { | ||
self.bind(py).as_rust_future() | ||
} | ||
} | ||
|
||
impl Bound<'_, PyFuture> { | ||
/// Convert a `PyFuture` into a Rust `Future`. | ||
/// | ||
/// Contrary to asyncio.Future, Rust future will panic if polled after completion, in order | ||
/// to optimize the result retrieving. | ||
pub fn as_rust_future( | ||
&self, | ||
) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send + Sync + 'static> { | ||
#[derive(Default)] | ||
struct PendingInner { | ||
result: Option<PyResult<PyObject>>, | ||
waker: Option<Waker>, | ||
} | ||
enum FutureImpl { | ||
Done(Option<PyResult<PyObject>>), | ||
Pending(Arc<Mutex<PendingInner>>), | ||
} | ||
impl Future for FutureImpl { | ||
type Output = PyResult<PyObject>; | ||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let this = self.get_mut(); | ||
match this { | ||
Self::Done(res) => { | ||
Poll::Ready(res.take().expect("Future polled after completion")) | ||
} | ||
Self::Pending(cb) => { | ||
let mut inner = cb.lock(); | ||
if inner.result.is_some() { | ||
let res = inner.result.take().unwrap(); | ||
drop(inner); | ||
*this = Self::Done(None); | ||
return Poll::Ready(res); | ||
} | ||
if !matches!(&mut inner.waker, Some(waker) if waker.will_wake(cx.waker())) { | ||
inner.waker = Some(cx.waker().clone()); | ||
} | ||
Poll::Pending | ||
} | ||
} | ||
} | ||
} | ||
Ok(if self.call_method0("done")?.extract()? { | ||
FutureImpl::Done(Some(self.call_method0("result").map(Bound::unbind))) | ||
} else { | ||
let pending = Arc::new(Mutex::new(PendingInner::default())); | ||
let pending2 = pending.clone(); | ||
// For asyncio futures, `add_done_callback` should be called the event loop thread, | ||
// but because the GIL is held, and because we just checked that the future isn't done, | ||
// the future result cannot be set in the meantime, so it's ok. | ||
let callback = | ||
PyCFunction::new_closure_bound(self.py(), None, None, move |args, _| { | ||
let result = args.get_item(0)?.call_method0("result").map(Bound::unbind); | ||
let mut inner = pending2.lock(); | ||
inner.result = Some(result); | ||
if let Some(waker) = &inner.waker { | ||
waker.wake_by_ref(); | ||
} | ||
PyResult::Ok(()) | ||
})?; | ||
self.call_method1("add_done_callback", (callback,))?; | ||
FutureImpl::Pending(pending) | ||
}) | ||
} | ||
} | ||
|
||
fn is_asyncio_future(object: &Bound<'_, PyAny>) -> PyResult<bool> { | ||
static IS_FUTURE: GILOnceCell<PyObject> = GILOnceCell::new(); | ||
let import = || { | ||
let module = object.py().import_bound("asyncio")?; | ||
PyResult::Ok(module.getattr("isfuture")?.into()) | ||
}; | ||
IS_FUTURE | ||
.get_or_try_init(object.py(), import)? | ||
.call1(object.py(), (object,))? | ||
.extract(object.py()) | ||
} | ||
|
||
fn is_concurrent_future(object: &Bound<'_, PyAny>) -> PyResult<bool> { | ||
static FUTURE: GILOnceCell<PyObject> = GILOnceCell::new(); | ||
let import = || { | ||
let module = object.py().import_bound("concurrent.futures")?; | ||
PyResult::Ok(module.getattr("Future")?.into()) | ||
}; | ||
let future_type = FUTURE | ||
.get_or_try_init(object.py(), import)? | ||
.bind(object.py()); | ||
object.is_instance(future_type) | ||
} | ||
|
||
impl PyTypeCheck for PyFuture { | ||
const NAME: &'static str = "Future"; | ||
|
||
fn type_check(object: &Bound<'_, PyAny>) -> bool { | ||
is_asyncio_future(object).unwrap_or(false) || is_concurrent_future(object).unwrap_or(false) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters