JoinHandle.await in another thread has chance to block forever. #241

Open
drmingdrmer opened this issue Feb 17, 2024 · 3 comments

@drmingdrmer

Version
monoio v0.2.2

Platform

uname -a
Darwin drdrxps-MBP 22.3.0 Darwin Kernel Version 22.3.0: Mon Jan 30 20:38:37 PST 2023; root:xnu-8792.81.3~2/RELEASE_ARM64_T6000 arm64

ulimit -l
unlimited

Description
When JoinHandle.await is called in another thread (and also in another monoio runtime), it sometimes returns the expected value and sometimes blocks forever.

The code to reproduce this issue is in the following repo:
https://github.com/drmingdrmer/t-monoio/blob/ec306d33e9f581eacd06c51034de7d0d4698f9dd/src/bin/cross-rt-join.rs#L1

It is quite simple: create a monoio runtime to run a future, send the JoinHandle to another thread, and await it there in another monoio runtime:

use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    std::thread::spawn(move || {
        let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
            .enable_all()
            .build()
            .expect("Failed building the Runtime");

        rt.block_on(async move {
            let fu = async move {
                // println!("inner-fu: ready");
                1u64
            };

            let handle = monoio::spawn(fu);
            tx.send(handle).unwrap();

            monoio::time::sleep(Duration::from_millis(1_000)).await;
            println!("outer-fu: after sending handle and sleep");
        });
    });

    let handle = rx.recv().unwrap();

    let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
        .enable_all()
        .build()
        .expect("Failed building the Runtime");

    rt.block_on(async move {
        println!("joiner: before handle.await");
        let got = handle.await;
        println!("joiner: after handle.await: {:?}", got);
    });
}

Clone it and run it with:

cargo run --bin cross-rt-join

Sometimes it blocks forever:

joiner: before handle.await
outer-fu: after sending handle and sleep

Sometimes it returns the expected value:

joiner: before handle.await
joiner: after handle.await: 1
@Miaxos
Contributor

Miaxos commented Feb 17, 2024

The main issue here is that you are running it on macOS, so you have no guarantee about where the task is going to run (that is, on which core it will be executed). That is why you see erratic behavior.
You could run it on Linux and pin those threads to specific cores; then it would either work every time or fail every time.

For this part, you can enable monoio's sync feature flag, which lets you do the cross-thread async communication documented here (though performance is much better if you avoid cross-thread async communication altogether).

@drmingdrmer
Author

@Miaxos
Thank you so much! It looks like the sync feature flag is the cause: with sync enabled, the program produces consistent results.

The docs do not give any hint about how to use JoinHandle correctly from another thread.
Is it possible to remove the Send and Sync bounds from JoinHandle when the sync feature flag is not enabled?

Another issue is that if the task panics, the JoinHandle.await in another thread blocks forever.

@Miaxos
Contributor

Miaxos commented Feb 19, 2024

Hey @drmingdrmer, I took some time to put together an example of a working solution without sync. If you change the pinned core, the solution no longer works without sync.

(To run these tests with io-uring from macOS, I suggest running them inside a VM backed by tart; that's what I use and it works very well.)

A working solution without `sync` (Linux only)

(If you change the pinned core, it won't work without sync.)

use std::sync::mpsc;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let t1 = std::thread::spawn(move || {
        // If you want to force a thread to be pinned on a core
        // (Only available on linux)
        monoio::utils::bind_to_cpu_set(Some(0)).unwrap();
        let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            let fu = async move {
                println!("inner-fu: ready");
                1u64
            };

            let handle = monoio::spawn(fu);
            tx.send(handle).unwrap();

            monoio::time::sleep(Duration::from_millis(1_000)).await;
            println!("outer-fu: after sending handle and sleep");
        });
    });

    let t2 = std::thread::spawn(move || {
        // If you want to force a thread to be pinned on a core
        // (Only available on linux)
        monoio::utils::bind_to_cpu_set(Some(0)).unwrap();

        let mut rt = monoio::RuntimeBuilder::<monoio::FusionDriver>::new()
            .enable_all()
            .build()
            .unwrap();

        rt.block_on(async move {
            println!("joiner: before handle.await");
            loop {
                if let Ok(handle) = rx.try_recv() {
                    let got = handle.await;
                    println!("joiner: after handle.await: {:?}", got);
                    break;
                }
            }
        });
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

Is it possible to remove the Send and Sync bounds from JoinHandle when the sync feature flag is not enabled?

Based on the implementation, JoinHandle<T> is Send whenever T is Send, so for JoinHandle<T> not to be Send you would need T: !Send.
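To illustrate that reasoning, here is a minimal sketch of the conditional-impl pattern, assuming JoinHandle opts into Send/Sync only when its output type T is Send. RawHandle and assert_send are hypothetical names, not monoio's API; the raw pointer field is there only so the compiler does not derive the auto traits by itself.

```rust
use std::marker::PhantomData;
use std::thread;

// Hypothetical stand-in for a join handle (not monoio's type). The raw
// pointer field makes the compiler treat it as neither Send nor Sync.
struct RawHandle<T> {
    raw: *const (),
    _output: PhantomData<T>,
}

// Opt back in to Send/Sync only when the output type T is Send.
unsafe impl<T: Send> Send for RawHandle<T> {}
unsafe impl<T: Send> Sync for RawHandle<T> {}

impl<T> RawHandle<T> {
    fn new() -> Self {
        RawHandle { raw: std::ptr::null(), _output: PhantomData }
    }

    fn is_null(&self) -> bool {
        self.raw.is_null()
    }
}

// Compiles only if X: Send.
fn assert_send<X: Send>() {}

fn main() {
    // u64 is Send, so RawHandle<u64> is Send as well.
    assert_send::<RawHandle<u64>>();

    // Rc<u64> is !Send, so this line would fail to compile if uncommented:
    // assert_send::<RawHandle<std::rc::Rc<u64>>>();

    // Because RawHandle<u64> is Send, it can be moved into another thread.
    let handle: RawHandle<u64> = RawHandle::new();
    let moved = thread::spawn(move || handle.is_null()).join().unwrap();
    println!("handle moved to another thread: {}", moved);
}
```

With this pattern the handle's Send-ness follows T alone, so a feature flag such as sync would not change it unless the impls themselves were gated on that flag.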

Another issue is that if the task panics, the JoinHandle.await in another thread blocks forever.

Yeah, right now you would need to wrap the spawned future in catch_unwind and return a Result manually.
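A minimal sketch of that workaround, using only the standard library: a hypothetical CatchUnwind wrapper polls the inner future inside std::panic::catch_unwind and turns a panic into Err(payload), so the task still completes with a value. The tiny busy-polling block_on and noop_waker are assumptions added only so the example runs without monoio; compute is a made-up task.

```rust
use std::any::Any;
use std::future::Future;
use std::panic::{self, AssertUnwindSafe};
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Hypothetical wrapper: polls the inner future inside catch_unwind and turns
// a panic into Err(payload) instead of letting it unwind out of the task.
struct CatchUnwind<F> {
    inner: Pin<Box<F>>,
}

impl<F: Future> Future for CatchUnwind<F> {
    type Output = Result<F::Output, Box<dyn Any + Send>>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = self.inner.as_mut();
        match panic::catch_unwind(AssertUnwindSafe(|| inner.poll(cx))) {
            Ok(Poll::Pending) => Poll::Pending,
            Ok(Poll::Ready(value)) => Poll::Ready(Ok(value)),
            Err(payload) => Poll::Ready(Err(payload)),
        }
    }
}

// No-op waker plus busy-polling block_on: only for driving this demo.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    // SAFETY: every vtable function ignores the data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical task that panics on demand.
async fn compute(fail: bool) -> u64 {
    if fail {
        panic!("task failed");
    }
    1
}

fn main() {
    let ok = block_on(CatchUnwind { inner: Box::pin(compute(false)) });
    println!("first task returned Ok: {}", ok.is_ok());

    // The default panic hook still prints the message to stderr.
    let failed = block_on(CatchUnwind { inner: Box::pin(compute(true)) });
    println!("second task returned Err: {}", failed.is_err());
}
```

In the reproduction above, this would mean spawning something like monoio::spawn(CatchUnwind { inner: Box::pin(fu) }), so the joiner receives an Err instead of waiting for a task that never finishes, assuming the cross-thread wake-up itself works (e.g. with sync enabled). That combination has not been verified here. Note that AssertUnwindSafe means the panicked future must not be polled again; the wrapper returns Ready immediately after a panic, so it never is.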
