Skip to content

Commit

Permalink
feat: support anyio with a Cargo feature
Browse files Browse the repository at this point in the history
Asyncio is the standard and de facto main Python async runtime.
Among non-standard runtime, only trio seems to have substantial traction,
especially thanks to the anyio project.
There is indeed a strong trend for anyio (e.g. FastApi), which can
justify a dedicated support.
  • Loading branch information
wyfo committed Dec 7, 2023
1 parent 53cccb4 commit e1d5f5c
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 13 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ pyo3-build-config = { path = "pyo3-build-config", version = "0.21.0-dev", featur
[features]
default = ["macros"]

# Switch coroutine implementation to anyio instead of asyncio
anyio = ["macros"]

# Enables pyo3::inspect module and additional type information on FromPyObject
# and IntoPy traits
experimental-inspect = []
Expand Down
9 changes: 7 additions & 2 deletions guide/src/async-await.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {
}
```

*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.*

## `Send + 'static` constraint

Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object.
Expand Down Expand Up @@ -85,6 +83,13 @@ async fn cancellable(#[pyo3(cancel_handle)] mut cancel: CancelHandle) {
}
```

## *asyncio* vs. *anyio*

By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context.

PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context.
However, it requires to have the [*sniffio*](https://github.com/python-trio/sniffio) (or *anyio*) library installed.

## The `Coroutine` type

To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine).
Expand Down
1 change: 1 addition & 0 deletions guide/src/building_and_distribution.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ There are many ways to go about this: it is possible to use `cargo` to build the
PyO3 has some Cargo features to configure projects for building Python extension modules:
- The `extension-module` feature, which must be enabled when building Python extension modules.
- The `abi3` feature and its version-specific `abi3-pyXY` companions, which are used to opt-in to the limited Python API in order to support multiple Python versions in a single wheel.
- The `anyio` feature, making PyO3 coroutines target [*anyio*](https://github.com/agronholm/anyio) instead of *asyncio*; either [*sniffio*](https://github.com/python-trio/sniffio) or *anyio* should be added as dependency of the Python extension.

This section describes each of these packaging tools before describing how to build manually without them. It then proceeds with an explanation of the `extension-module` feature. Finally, there is a section describing PyO3's `abi3` features.

Expand Down
1 change: 1 addition & 0 deletions newsfragments/3612.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support anyio with a Cargo feature
3 changes: 2 additions & 1 deletion pytests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ edition = "2021"
publish = false

[dependencies]
pyo3 = { path = "../", features = ["extension-module"] }
futures = "0.3.29"
pyo3 = { path = "../", features = ["extension-module", "anyio"] }

[build-dependencies]
pyo3-build-config = { path = "../pyo3-build-config" }
Expand Down
1 change: 1 addition & 0 deletions pytests/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [

[project.optional-dependencies]
dev = [
"anyio[trio]>=4.0",
"hypothesis>=3.55",
"pytest-asyncio>=0.21",
"pytest-benchmark>=3.4",
Expand Down
34 changes: 34 additions & 0 deletions pytests/src/anyio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::{thread, time::Duration};

use futures::{channel::oneshot, future::poll_fn};
use pyo3::prelude::*;

#[pyfunction]
async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {
if seconds <= 0.0 {
let mut ready = false;
poll_fn(|cx| {
if ready {
return Poll::Ready(());
}
ready = true;
cx.waker().wake_by_ref();
Poll::Pending
})
.await;
} else {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_secs_f64(seconds));
tx.send(()).unwrap();
});
rx.await.unwrap();
}
result
}

#[pymodule]
pub fn anyio(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sleep, m)?)?;
Ok(())
}
3 changes: 3 additions & 0 deletions pytests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pyo3::prelude::*;
use pyo3::types::PyDict;
use pyo3::wrap_pymodule;

pub mod anyio;
pub mod awaitable;
pub mod buf_and_str;
pub mod comparisons;
Expand All @@ -18,6 +19,7 @@ pub mod subclassing;

#[pymodule]
fn pyo3_pytests(py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pymodule!(anyio::anyio))?;
m.add_wrapped(wrap_pymodule!(awaitable::awaitable))?;
#[cfg(not(Py_LIMITED_API))]
m.add_wrapped(wrap_pymodule!(buf_and_str::buf_and_str))?;
Expand All @@ -39,6 +41,7 @@ fn pyo3_pytests(py: Python<'_>, m: &PyModule) -> PyResult<()> {

let sys = PyModule::import(py, "sys")?;
let sys_modules: &PyDict = sys.getattr("modules")?.downcast()?;
sys_modules.set_item("pyo3_pytests.anyio", m.getattr("anyio")?)?;
sys_modules.set_item("pyo3_pytests.awaitable", m.getattr("awaitable")?)?;
sys_modules.set_item("pyo3_pytests.buf_and_str", m.getattr("buf_and_str")?)?;
sys_modules.set_item("pyo3_pytests.comparisons", m.getattr("comparisons")?)?;
Expand Down
14 changes: 14 additions & 0 deletions pytests/tests/test_anyio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import asyncio

from pyo3_pytests.anyio import sleep
import trio


def test_asyncio():
assert asyncio.run(sleep(0)) is None
assert asyncio.run(sleep(0.1, 42)) == 42


def test_trio():
assert trio.run(sleep, 0) is None
assert trio.run(sleep, 0.1, 42) == 42
4 changes: 4 additions & 0 deletions src/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ use crate::{
IntoPy, Py, PyErr, PyObject, PyResult, Python,
};

#[cfg(feature = "anyio")]
mod anyio;
mod asyncio;
pub(crate) mod cancel;
#[cfg(feature = "anyio")]
mod trio;
pub(crate) mod waker;

pub use cancel::CancelHandle;
Expand Down
69 changes: 69 additions & 0 deletions src/coroutine/anyio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//! Coroutine implementation using sniffio to select the appropriate implementation,
//! compatible with anyio.
use crate::{
coroutine::{asyncio::AsyncioWaker, trio::TrioWaker},
exceptions::PyRuntimeError,
sync::GILOnceCell,
PyAny, PyErr, PyObject, PyResult, Python,
};

fn current_async_library(py: Python<'_>) -> PyResult<&PyAny> {
static CURRENT_ASYNC_LIBRARY: GILOnceCell<PyObject> = GILOnceCell::new();
let import = || -> PyResult<_> {
let module = py.import("sniffio")?;
Ok(module.getattr("current_async_library")?.into())
};
CURRENT_ASYNC_LIBRARY
.get_or_try_init(py, import)?
.as_ref(py)
.call0()
}

fn unsupported(runtime: &str) -> PyErr {
PyRuntimeError::new_err(format!("unsupported runtime {rt}", rt = runtime))
}

/// Sniffio/anyio-compatible coroutine waker.
///
/// Polling a Rust future calls `sniffio.current_async_library` to select the appropriate
/// implementation, either asyncio or trio.
pub(super) enum AnyioWaker {
/// [`AsyncioWaker`]
Asyncio(AsyncioWaker),
/// [`TrioWaker`]
Trio(TrioWaker),
}

impl AnyioWaker {
pub(super) fn new(py: Python<'_>) -> PyResult<Self> {
let sniffed = current_async_library(py)?;
match sniffed.extract()? {
"asyncio" => Ok(Self::Asyncio(AsyncioWaker::new(py)?)),
"trio" => Ok(Self::Trio(TrioWaker::new(py)?)),
rt => Err(unsupported(rt)),
}
}

pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> {
match self {
AnyioWaker::Asyncio(w) => w.yield_(py),
AnyioWaker::Trio(w) => w.yield_(py),
}
}

pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> {
let sniffed = current_async_library(py)?;
match sniffed.extract()? {
"asyncio" => AsyncioWaker::yield_waken(py),
"trio" => TrioWaker::yield_waken(py),
rt => Err(unsupported(rt)),
}
}

pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> {
match self {
AnyioWaker::Asyncio(w) => w.wake(py),
AnyioWaker::Trio(w) => w.wake(py),
}
}
}
14 changes: 9 additions & 5 deletions src/coroutine/cancel.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::{PyAny, PyObject};
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
};

use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

use crate::{PyAny, PyObject};

#[derive(Debug, Default)]
struct Inner {
Expand Down
88 changes: 88 additions & 0 deletions src/coroutine/trio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! Coroutine implementation compatible with trio.
use pyo3_macros::pyfunction;

use crate::{
intern,
sync::GILOnceCell,
types::{PyCFunction, PyIterator},
wrap_pyfunction, Py, PyAny, PyObject, PyResult, Python,
};

struct Trio {
cancel_shielded_checkpoint: PyObject,
current_task: PyObject,
current_trio_token: PyObject,
reschedule: PyObject,
succeeded: PyObject,
wait_task_rescheduled: PyObject,
}
impl Trio {
fn get(py: Python<'_>) -> PyResult<&Self> {
static TRIO: GILOnceCell<Trio> = GILOnceCell::new();
TRIO.get_or_try_init(py, || {
let module = py.import("trio.lowlevel")?;
Ok(Self {
cancel_shielded_checkpoint: module.getattr("cancel_shielded_checkpoint")?.into(),
current_task: module.getattr("current_task")?.into(),
current_trio_token: module.getattr("current_trio_token")?.into(),
reschedule: module.getattr("reschedule")?.into(),
succeeded: module.getattr("Abort")?.getattr("SUCCEEDED")?.into(),
wait_task_rescheduled: module.getattr("wait_task_rescheduled")?.into(),
})
})
}
}

fn yield_from(coro_func: &PyAny) -> PyResult<PyObject> {
PyIterator::from_object(coro_func.call_method0("__await__")?)?
.next()
.expect("cancel_shielded_checkpoint didn't yield")
.map(Into::into)
}

/// Asyncio-compatible coroutine waker.
///
/// Polling a Rust future yields `trio.lowlevel.wait_task_rescheduled()`, while `Waker::wake`
/// reschedule the current task.
pub(super) struct TrioWaker {
task: PyObject,
token: PyObject,
}

impl TrioWaker {
pub(super) fn new(py: Python<'_>) -> PyResult<Self> {
let trio = Trio::get(py)?;
let task = trio.current_task.call0(py)?;
let token = trio.current_trio_token.call0(py)?;
Ok(Self { task, token })
}

pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> {
static ABORT_FUNC: GILOnceCell<Py<PyCFunction>> = GILOnceCell::new();
let abort_func =
ABORT_FUNC.get_or_try_init(py, || wrap_pyfunction!(abort_func, py).map(Into::into))?;
let wait_task_rescheduled = Trio::get(py)?
.wait_task_rescheduled
.call1(py, (abort_func,))?;
yield_from(wait_task_rescheduled.as_ref(py))
}

pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> {
let checkpoint = Trio::get(py)?.cancel_shielded_checkpoint.call0(py)?;
yield_from(checkpoint.as_ref(py))
}

pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> {
self.token.call_method1(
py,
intern!(py, "run_sync_soon"),
(&Trio::get(py)?.reschedule, &self.task),
)?;
Ok(())
}
}

#[pyfunction(crate = "crate")]
fn abort_func(py: Python<'_>, _arg: PyObject) -> PyResult<PyObject> {
Ok(Trio::get(py)?.succeeded.clone())
}
18 changes: 13 additions & 5 deletions src/coroutine/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,18 @@ use std::{
};

use crate::{
coroutine::asyncio::AsyncioWaker, exceptions::PyStopIteration, intern, pyclass::IterNextOutput,
sync::GILOnceCell, types::PyFuture, Py, PyNativeType, PyObject, PyResult, Python,
exceptions::PyStopIteration, intern, pyclass::IterNextOutput, sync::GILOnceCell,
types::PyFuture, Py, PyNativeType, PyObject, PyResult, Python,
};

cfg_if::cfg_if! {
if #[cfg(feature = "anyio")] {
type WakerImpl = crate::coroutine::anyio::AnyioWaker;
} else {
type WakerImpl = crate::coroutine::asyncio::AsyncioWaker;
}
}

const MIXED_AWAITABLE_AND_FUTURE_ERROR: &str = "Python awaitable mixed with Rust future";

pub(crate) enum FutureOrPoll {
Expand All @@ -21,7 +29,7 @@ thread_local! {
}

enum State {
Pending(AsyncioWaker),
Pending(WakerImpl),
Waken,
Delegated(PyObject),
}
Expand Down Expand Up @@ -49,10 +57,10 @@ impl CoroutineWaker {
}

pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> {
let init = || PyResult::Ok(State::Pending(AsyncioWaker::new(py)?));
let init = || PyResult::Ok(State::Pending(WakerImpl::new(py)?));
let state = self.state.get_or_try_init(py, init)?;
match state {
State::Waken => AsyncioWaker::yield_waken(py),
State::Waken => WakerImpl::yield_waken(py),
State::Delegated(obj) => Ok(obj.clone_ref(py)),
State::Pending(waker) => waker.yield_(py),
}
Expand Down

0 comments on commit e1d5f5c

Please sign in to comment.