Await !Send Rust futures in Python #59

Open
Kobzol opened this issue Jan 7, 2022 · 19 comments

Comments

@Kobzol

Kobzol commented Jan 7, 2022

Hi! I want to use pyo3-asyncio to convert Rust futures into Python awaitables that can then be awaited in Python. For Send futures, this is quite easy, e.g.:

use pyo3::prelude::*;

#[pyfunction]
fn make_awaitable(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async {
        something_async().await;
        Ok(())
    })
}

However, when the Rust Future is not Send, this gets problematic. I know that there is the local_future_into_py function, which is demonstrated here. However, to actually await that future, the context needs to be inside a LocalSet.

Is it possible to create a Python awaitable from a Rust !Send future and then await it in Python (and not in Rust using LocalSet)? My idea would be to use a global single-threaded tokio runtime and run everything inside a global LocalSet, but I'm not sure if that's possible currently.

Thank you for a great library btw! :) Even if it wouldn't support this use case, it's very useful.

@awestlake87
Owner

> Is it possible to create a Python awaitable from a Rust !Send future and then await it in Python (and not in Rust using LocalSet)? My idea would be to use a global single-threaded tokio runtime and run everything inside a global LocalSet, but I'm not sure if that's possible currently

Unfortunately, I don't think it's possible without a LocalSet. There are two kinds of threads in PyO3 Asyncio - threads that are controlled by Python's event loop and threads that are controlled by Rust's event loop. In order to run a !Send future, you have to call local_future_into_py on a thread that is controlled by Rust's event loop because the future cannot be 'sent' to a Rust thread later. In order to await the future in a thread controlled by Python, you would need some kind of unified event loop like what is discussed here.

Maybe we can get something figured out for your use-case though. If you provide some more info about what you're trying to do I might be able to give you some pointers on how to get around it. Sometimes you can circumvent this issue by first spawning a Send future that creates the !Send future, but it varies from problem to problem.

> Thank you for a great library btw! :) Even if it wouldn't support this use case, it's very useful.

Thanks, I appreciate it!

@Kobzol
Author

Kobzol commented Jan 7, 2022

Basically I want to do exactly what I showed here:

#[pyfunction]
fn make_awaitable(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async {
        something_async().await;
        Ok(())
    })
}

just with a !Send future (my library uses single-threaded runtime and !Send futures for everything).

> Maybe we can get something figured out for your use-case though. If you provide some more info about what you're trying to do I might be able to give you some pointers on how to get around it. Sometimes you can circumvent this issue by first spawning a Send future that creates the !Send future, but it varies from problem to problem.

I was trying to spawn a !Send future within a Send future, but couldn't get it to work without creating a nested runtime:

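use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
use tokio::{runtime::Builder, task::LocalSet};

struct Fut(Rc<u32>);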

impl Future for Fut {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

#[pyfunction]
fn nonsend_fut(py: Python) -> PyResult<&PyAny> {
    future_into_py(py, async move {
        let rt = Builder::new_current_thread().enable_all().build().unwrap();

        let set = LocalSet::new();

        let fut = async move {
            let fut = Fut(Rc::new(0));
            Ok(fut.await)
        };

        rt.block_on(set.run_until(fut))
    })
}

However, this seems to deadlock.

It seems to me that creating a LocalSet within a Send future is a futile attempt. Maybe tokio-rs/tokio#3370 could help.

@awestlake87
Owner

How is your project set up? Are you using pyo3_asyncio::tokio::main or is it a PyO3 native extension?

@Kobzol
Author

Kobzol commented Jan 7, 2022

I'm not using the main macro, I'm writing a native extension. So I have a Rust library from which I want to expose several functions to Python, some of them should return awaitables.

@awestlake87
Owner

Ok, if you're using the current thread scheduler, does that mean that you're initializing tokio and spawning a thread for it somewhere in your native extension?

@Kobzol
Author

Kobzol commented Jan 7, 2022

Well, I also have a binary that uses the library, and I do that in the binary.

But I hoped that for the extension I could just use the global tokio runtime from pyo3-asyncio, so I didn't initialize tokio explicitly in my extension so far.

I suppose that spawning a thread which contains a single-threaded runtime and then communicating with it using e.g. mpsc queues from Send futures running inside the pyo3-asyncio runtime could work (would it be safe to create this thread in the #[pymodule] initialization code?). Of course, I was wondering whether something like this could be avoided.
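
A minimal sketch of the "dedicated thread plus queue" idea described above, using only the standard library. It is an illustration under assumptions, not code from the thread: the `Request` type, `spawn_worker`, and `add` are hypothetical names, and a plain receive loop stands in for the single-threaded runtime. The point it shows is that the !Send state (`Rc<RefCell<_>>`) is created on the worker thread and never leaves it, while the `mpsc::Sender` handle is Send and can be used from anywhere, including from inside a Send future.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

/// Hypothetical request: add `amount` to a counter and reply with the new total.
struct Request {
    amount: u32,
    reply: mpsc::Sender<u32>,
}

/// Spawns the worker thread and returns a `Send` handle for talking to it.
fn spawn_worker() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        // The `!Send` state is created here and stays on this thread.
        let state = Rc::new(RefCell::new(0u32));
        // The loop ends once every sender has been dropped.
        for req in rx {
            *state.borrow_mut() += req.amount;
            // Ignore the error if the caller stopped waiting for the reply.
            let _ = req.reply.send(*state.borrow());
        }
    });
    tx
}

/// Sends one request and blocks until the worker replies.
fn add(worker: &mpsc::Sender<Request>, amount: u32) -> u32 {
    let (reply, response) = mpsc::channel();
    worker
        .send(Request { amount, reply })
        .expect("worker thread has exited");
    response.recv().expect("worker dropped the reply channel")
}
```

Calling `let worker = spawn_worker();` once and then `add(&worker, 2)` followed by `add(&worker, 3)` yields the running totals 2 and 5. In a real extension the blocking `recv` would presumably be replaced by an async channel so that the Send future can await the reply instead of blocking a thread.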

@awestlake87
Owner

awestlake87 commented Jan 7, 2022

I had an idea about runtime initialization, but then I remembered I had done something very similar in one of my tests.

Try something like this:

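use pyo3::prelude::*;
use std::rc::Rc;

#[pyfunction]
fn return_3(py: Python) -> PyResult<&PyAny> {
    pyo3_asyncio::tokio::future_into_py(py, async move {
        // Run the !Send work on one of tokio's blocking threads.
        let result = tokio::task::spawn_blocking(|| {
            // Rc is !Send, so it is created inside the blocking closure.
            let data = Rc::new(3i32);

            // Drive a LocalSet on this thread using pyo3-asyncio's runtime.
            tokio::task::LocalSet::new().block_on(pyo3_asyncio::tokio::get_runtime(), async move {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                println!("using local data {}", *data);
                *data
            })
        });

        // Only the JoinHandle is awaited here, so the outer future stays Send.
        Ok(result.await.unwrap())
    })
}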

Instead of creating a new runtime, you create a blocking task on the tokio runtime to run the LocalSet.

It's not a workaround I've tested very well, so I don't know how many local tasks you can run concurrently like this. Let me know if this works for you!

@Kobzol
Author

Kobzol commented Jan 7, 2022

Right, this is conceptually similar to the "separate thread + single threaded runtime + queue" approach, but it's better if it's handled by tokio, of course.

Originally, I hoped to avoid something like this, but now I realize that it can actually be quite useful. If you use a single-threaded runtime and !Send futures, you will probably need to access some shared state via RefCell/PyCell (at least I have set it up like this, but it's probably unavoidable). You would need to protect this state via a mutex or something like that, to prevent concurrent Python calls of Rust functions that access the state and create futures from crashing with BorrowMut errors. Using spawn_blocking and setting the number of blocking threads in tokio to 1 could solve this issue automatically.

Of course, an even better solution would be if Python and Rust event loops knew about each other and awaits in Rust and Python would cooperate, but that's probably utopia for now :)

I can't test it right now, but I'll experiment with it over the weekend and let you know if it works. Thank you very much for the suggestion!
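
To make the BorrowMut concern above concrete, here is a small standard-library sketch. It uses a plain `RefCell` as a stand-in for the shared state (the comment mentions RefCell/PyCell; the function name `borrow_outcomes` and the `Vec<u32>` state are assumptions for illustration). It shows that a second mutable borrow fails while the first is still alive, which is what overlapping calls would run into, and succeeds once access is serialized.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Returns `(overlapping_fails, sequential_succeeds)`.
fn borrow_outcomes() -> (bool, bool) {
    let state = Rc::new(RefCell::new(Vec::<u32>::new()));

    // Overlapping access: one user still holds a mutable borrow
    // (for example across an `.await`) when another one starts.
    let overlapping_fails = {
        let _guard = state.borrow_mut();
        // `borrow_mut()` would panic here; `try_borrow_mut()` reports the conflict.
        state.try_borrow_mut().is_err()
    };

    // Serialized access: the first borrow has been released.
    let sequential_succeeds = state.try_borrow_mut().is_ok();

    (overlapping_fails, sequential_succeeds)
}
```

Both values come out `true`. Running all state-touching work on a single blocking thread, as suggested above, is one way to get the serialized case; whether it is sufficient depends on whether borrows are held across await points within that thread.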

@Kobzol
Author

Kobzol commented Jan 8, 2022

Great, this solution seems to work! For some reason it deadlocks with a single-threaded runtime, maybe it does not support spawn_blocking or there's some other problem. But it's not a big deal for me at this moment.

Maybe this approach could be put into the documentation, because currently if someone has a !Send future, the docs immediately redirect him to local_future_into_py, which is however basically unusable from the Python side.

@awestlake87
Owner

> Great, this solution seems to work! For some reason it deadlocks with a single-threaded runtime, maybe it does not support spawn_blocking or there's some other problem. But it's not a big deal for me at this moment.

That's strange. I didn't see any deadlocks when I tried it with the current thread scheduler. I didn't restrict the number of blocking threads in tokio though, so maybe that's why?

It does make me wonder why tokio doesn't support LocalSet in tasks. Seems like it would make things a lot easier if it was more like async-std, where spawn_local ensures that the task you spawn can only run on the worker thread that spawned it (and panics if it wasn't spawned on a worker thread). If it worked like that, then you wouldn't need additional blocking threads like in tokio.

> Maybe this approach could be put into the documentation, because currently if someone has a !Send future, the docs immediately redirect him to local_future_into_py, which is however basically unusable from the Python side.

Yeah it probably should. Honestly this is making me rethink having those conversions to begin with because as you say, they can really only be used from the Rust side, which pretty much defeats the purpose. Maybe just documenting the spawn blocking workaround is good enough.

@awestlake87
Owner

@TkTech opened the original issue that led to those conversions being created. I'd be curious to know if they still have a use-case for these conversions that wasn't mentioned here or if they have similar issues with these conversions.

If they're of the same opinion, I might just deprecate these conversions.

@TkTech

TkTech commented Jan 9, 2022

Ultimately, I ended up moving away from pyo3-asyncio. I created my own event loop shims by implementing std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker} & futures::channel::oneshot and drive the event loop directly from asyncio, which works well for my simple use case.
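
A rough sketch of what driving a future by hand with `std::task` can look like, using only the standard library. This is not TkTech's code (which was not published in this thread); `YieldOnce`, `drive`, and `run_local_future` are hypothetical names, the waker is a no-op, and a busy loop stands in for the asyncio-driven polling. It only illustrates that a !Send future (one holding an `Rc` across an await) can be polled to completion on the thread that created it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Builds a waker that does nothing; the driver below simply polls again.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// A future that returns `Pending` once before completing.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Polls `fut` on the current thread until it completes.
/// Returns the output and the number of polls it took.
/// Note there is no `Send` bound on `F`.
fn drive<F: Future>(fut: F) -> (F::Output, usize) {
    // SAFETY: the vtable functions ignore the data pointer entirely.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (out, polls);
        }
    }
}

/// Drives a future that keeps an `Rc` alive across an await point.
fn run_local_future() -> (u32, usize) {
    let data = Rc::new(3u32);
    drive(async move {
        YieldOnce { yielded: false }.await;
        *data
    })
}
```

`run_local_future()` returns `(3, 2)`: the first poll hits the pending `YieldOnce`, the second completes. In an asyncio-driven design, each iteration of the loop would presumably be scheduled by the Python event loop in response to a real waker, rather than spinning.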

@Kobzol
Author

Kobzol commented Jan 9, 2022

@TkTech Is your solution open-sourced? I think that it might be useful for other people (myself included!).

@awestlake87
Owner

awestlake87 commented Jan 9, 2022

Sounds very similar to what @ThibaultLemaire was working on in #6. !Send futures were a use case I hadn't really considered for an asyncio-driven event loop, but it makes sense now that I think about it.

Might be worth taking a second look at it!

@ThibaultLemaire

> if Python and Rust event loops knew about each other and awaits in Rust and Python would cooperate

That sounds a bit like what I implemented, yes. Here's the code for your convenience (to spare you the long GitHub discussion).

In a nutshell, the idea is to write a wrapper that behaves like a Python awaitable/task but is able to drive a Rust future.

I eventually dropped the project because staying off the Python thread was much faster and I couldn't think of a situation where !Send futures would be required.

I cannot guarantee !Send support though, as I seem to recall some issues I had with such futures (and indeed, looking at my code, I'm taking Future<Output = TOutput> + Send + 'static). It's possible it was just related to some issues I had when trying to combine it with tokio, but it could be something deeper. I don't remember, so you're welcome to take a look at the code and figure out something more clever for yourself 🙂

@TkTech

TkTech commented Jan 14, 2022

> @TkTech Is your solution open-sourced? I think that it might be useful for other people (myself included!).

Not yet, but it will be. It's part of a tool that allows users to provide their own scripts to interact with events on IRC (a rewrite of https://github.com/TkTech/notifico). It's very simplistic, and takes advantage of the fact that v8::Isolates are !Send, can be driven by a polling loop (which in this case is driven by asyncio), and are single-threaded themselves, which allows quite a few assumptions that simplify the implementation.

@AzureMarker

I don't have any context on this thread, but tokio-rs/tokio#3370 merged and is released in 1.16.0.
(the PR was referenced earlier as something that might help)

@awestlake87
Owner

@AzureMarker thanks! Looks like that would be a cleaner replacement for the spawn_blocking workaround.

@AzureMarker

AzureMarker commented Feb 19, 2022

Oh, I just realized the spawn_pinned changes didn't release with 1.16.0 since they're in tokio_util which isn't included in the main tokio crate. But they did release recently in tokio_util 0.7.0:
tokio-rs/tokio#4486
https://lib.rs/crates/tokio-util/versions
