Crossbeam breaks tokio::spawn #380

Closed
sempervictus opened this issue Jan 11, 2022 · 32 comments · Fixed by #425

Comments

@sempervictus

System details

  • OS/Platform name and version: Linux 5.10.90
  • Rust version (if building from source): 1.57
  • Notify version (or commit hash if building from git): 15.0.0-pre3

What you did (as detailed as you can)

Using a bunch of tokio::spawn(async move { ... }) calls, I'm seeing the last in the series never complete. Upon digging into the GH issues for Tokio I found this little gem, which seems to say that crossbeam breaks tokio in exactly the way I'm seeing it break. The thing is, I'm not even using crossbeam in my code; I'm using tokio's unbounded channels, but the presence of crossbeam is breaking things. When I make the tokio channel bounded it breaks completely: no recv() occurs. Removing notify from the codebase fixes the problem, but it's a critical piece of the tool being written.
Specifically, when sending 4 items into a channel of PathBufs I see all 4 being recv()'d (I print them out at that point), with the result of the recv being passed to a tokio::spawn(async move { ... }) which then prints the PathBuf again. However, only 3 of those 4 actually print from inside the spawned task; the last one (always, it seems) never fires, in line with the bug referenced above.

What you expected

Expect every Tokio spawned task to fire all the time

What happened

Some compilations produce a working binary, and others, with no source changes but a cargo clean, produce binaries which omit the last execution of the loop running off the tokio::sync::mpsc channel's receiver (the data is recv'd but the spawn call goes off into oblivion and never prints or completes). Sometimes using the variables inside the task makes it fire (like printing them a bunch of times), but something is breaking the scheduler (most likely crossbeam).

@0xpr03
Member

0xpr03 commented Jan 11, 2022

Can you please attach a minimal example for this?

@sempervictus
Author

It's intermittent; in this case it only started for spawns called from an already spawned task. It sometimes compiled and worked, and sometimes compiled and left the last task un-run. More examples are in the linked issue; it was solved by switching the inner spawn to async_std tasks, which all complete.

@0xpr03
Member

0xpr03 commented Jan 11, 2022

So let me get this right: using a tokio::sync::mpsc channel inside the watcher callback isn't working?

@sempervictus
Author

Not quite, it's actually in a completely separate task:

tokio::spawn(async move { watcher loop which sends PathBuf clones through tokio::sync::mpsc::unbounded_channel })
tokio::spawn(async move { thing that reads from the receiver of the unbounded channel and then tokio::spawn(async move { processes that data }) })

That inner spawn is what fails to fire on the LAST item sent through the channel that requires the inner spawn, but not on the last item sent through the channel overall, which is processed by the outer spawn (a termination PathBuf for /dev/null to stop the process).

It's something crossbeam is doing internally to the tokio runtime/scheduler, because it works fine without crossbeam (without notify-rs) and fine when the inner spawn is async_std::task::spawn.
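
For readers trying to picture the pipeline described above, here is a minimal sketch of its shape. It is an assumption-laden stand-in, not the reporter's code: std threads and std::sync::mpsc replace the tokio tasks and channels so the sketch needs no external crates, and `run_pipeline` is a hypothetical name. It shows the producer stage, the consumer stage that spawns one inner worker per item, and the /dev/null sentinel handled by the outer stage.

```rust
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

/// Runs the two-stage pipeline and returns the paths the inner workers handled.
/// `/dev/null` is the termination sentinel, as in the description above.
fn run_pipeline(paths: Vec<PathBuf>) -> Vec<PathBuf> {
    let (tx, rx) = mpsc::channel::<PathBuf>();
    let (done_tx, done_rx) = mpsc::channel::<PathBuf>();

    // Stage 1: stand-in for the watcher loop. Sends a clone of each path,
    // then the sentinel.
    let producer = thread::spawn(move || {
        for p in &paths {
            tx.send(p.clone()).expect("receiver hung up");
        }
        tx.send(PathBuf::from("/dev/null")).expect("receiver hung up");
    });

    // Stage 2: reads from the receiver and spawns an inner worker per item.
    let consumer = thread::spawn(move || {
        let mut workers = Vec::new();
        for path in rx {
            if path == PathBuf::from("/dev/null") {
                break; // the sentinel is handled by the outer stage
            }
            let done_tx = done_tx.clone();
            workers.push(thread::spawn(move || {
                done_tx.send(path).expect("collector hung up");
            }));
        }
        // Joining every handle is what guarantees each inner worker ran.
        for w in workers {
            w.join().expect("worker panicked");
        }
    });

    producer.join().expect("producer panicked");
    consumer.join().expect("consumer panicked");

    // All senders are gone by now, so this iterator terminates.
    let mut handled: Vec<PathBuf> = done_rx.iter().collect();
    handled.sort();
    handled
}
```

Calling `run_pipeline` with four paths should hand all four to inner workers; the reported symptom is the async equivalent of the fourth worker never running.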

@sempervictus
Author

Oh, right, and it seems to mess up the bounded tokio::sync::mpsc channel completely, in that the reader in the outer tokio spawn never actually gets anything out of it. This also happens intermittently, supporting the theory that the scheduler is being messed up by however crossbeam primes/runs tasks to move data through its channels, despite crossbeam not being in my code at all, just a background dependency of another crate.

@0xpr03
Member

0xpr03 commented Jan 11, 2022

This works perfectly fine for me and from what you've shown above I'd guess you're using the wrong channel combination.

use std::path::Path;

use notify::{RecommendedWatcher, RecursiveMode, Result, Event, Error};
use notify::Watcher;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, mut rx) = mpsc::channel(100);
    let mut watcher =
        RecommendedWatcher::new(move |result: std::result::Result<Event, Error>| {
            tx.blocking_send(result).expect("Failed to send event");
        })?;

    watcher.watch(Path::new("."), RecursiveMode::Recursive)?;

    // This is a simple loop, but you may want to use more complex logic here,
    // for example to handle I/O.
    while let Some(res) = rx.recv().await {
        tokio::spawn(async move {println!("got = {:?}", res);});
    }

    Ok(())
}

@sempervictus
Author

It does work, but not always. The thread I linked says:

 carllerche commented on Sep 12, 2019

Sorry to hear you have been hitting trouble, but I will close the issue as I see no actionable items.

For others, the issue is the use of blocking operations on the event loop. The solution is to use tokio::sync::mpsc instead of crossbeam. Using runtime would have the same problems, so I am not immediately sure why the issue is not immediately apparent. My guess is runtime spawns way more threads by default tokio spawns threads based on physical cores and my guess is that runtime does not by default.

And since I'm not using any crossbeam in my code, it has to be coming from notify-rs.

@0xpr03
Member

0xpr03 commented Jan 11, 2022

We're not blocking anything. There's no blocking happening. Note though that I'm using the tokio mpsc specifically, even though the default example uses crossbeam channels.

@sempervictus
Author

Ditto. How many threads do you see running vs cores? Apparently tokio does not like oversubscription.

@0xpr03
Member

0xpr03 commented Jan 12, 2022

Let me repeat this again:
You claim to use the example I've provided by guessing what you're doing. The problem only happens sporadically for your code. You're saying it has to do with the number of threads launched. Although the scheduler should handle them fairly, that may be the case.

My example uses 14 threads on 12 vCores. I'm kinda at a loss for this issue.

@0xpr03
Member

0xpr03 commented Jan 12, 2022

You could replace crossbeam channels with normal channels for the OS you're using and see if that helps.

@sempervictus
Author

I think that's the plan if this project ends up with any more threading complexity - just do a quickie fork of this crate, make very-very-sure that replacing crossbeam resolves all of the observed issues, then PR it back as a crate feature to avoid compatibility issues with existing implementations and give other users an option to sidestep the crossbeam concern.

@sempervictus
Author

Just out of curiosity, is crossbeam the chosen modality for some sort of Windows compatibility function, or does it have some functional advantage over the other ones?

@0xpr03
Member

0xpr03 commented Jan 12, 2022

then PR it back as a crate feature

That'd be very welcome. I'm also happy to do the work if a "quick" test from your side can actually confirm it works.

@0xpr03
Member

0xpr03 commented Jan 12, 2022

crossbeam the chosen modality

I had to dig a little deeper to find the source for what I remembered :D
Crossbeam-Channel is more efficient in many cases and obviously has more features than the std implementation of mpsc:
https://github.com/crossbeam-rs/crossbeam/tree/master/crossbeam-channel/benchmarks

Edit: And also this claim repeating what I've remembered.

@0xpr03 0xpr03 mentioned this issue Jan 27, 2022
@bmcpt

bmcpt commented Apr 11, 2022

I'm seeing this exact issue when running rust code on android. A task isn't executed until another task is spawned after it. A workaround is to spawn an empty task right after spawning your task.

@0xpr03
Member

0xpr03 commented Apr 12, 2022

We could add a feature flag for crossbeam/std channels. Would have to investigate how much of the crossbeam functionality we're using currently. If you want you could make a PR.

@Tatametheus

Does this mean that if I use crossbeam, there would be no problems? Since crossbeam works with tokio, adding one more dependency is not a problem for me.

@sempervictus
Author

Does this mean that if I use crossbeam, there would be no problems? Since crossbeam works with tokio, adding one more dependency is not a problem for me.

There might be problems between the crossbeam code and other channel-based async code like tokio, but how those problems manifest seems somewhat intermittent, and why they do so is opaque to me.

@ac5tin

ac5tin commented Jul 14, 2022

        let (tx, mut rx) = mpsc::channel(256);

        self.watcher = notify::recommended_watcher(move |res: _| match res {
            Ok(event) => {
                let event: Event = event;
                log::debug!("{:?}", event);

                // only handle newly created files
                if event.kind.is_create() {
                    event.paths.iter().for_each(|path| {
                        log::info!("Created new file: {:?}", path.to_string_lossy());
                        let f = File {
                            path: path.to_string_lossy().to_string(),
                        };
                        if let Err(err) = tx.blocking_send(f) {
                            log::error!("Error handling file {:#?}", err);
                        };
                    });
                }
            }
            Err(e) => log::error!("watch error: {:?}", e),
        })
        .ok();

        tokio::task::spawn(async move {
            // NOT RECEIVING ANYTHING HERE
            while let Some(file) = rx.recv().await {
                log::info!("Received file: {:?}", file.path);
            }
        });

This is my code, and my receiver isn't receiving anything.

@sempervictus
Author

@ac5tin - that's along the lines of what we saw as well: notify's crossbeam craziness blows up other channel and runtime types even if you don't explicitly invoke crossbeam. Its mere presence in the compiled code seems to break things (making me wonder whether there's some hidden initialization/codepath being run or if the compiler is type-confusing things).

@0xpr03
Member

0xpr03 commented Aug 8, 2022

Some of you could try out the #425 PR and let me know if that helps.

@ac5tin

ac5tin commented Sep 15, 2022

I've modified my Cargo.toml to look like this:

notify = { version = "5", default-features = false, features = ["macos_kqueue"] }

but my receiver is still not receiving anything

@0xpr03
Member

0xpr03 commented Sep 15, 2022

That sounds unrelated to this issue. Can you first verify that you get events when not using tokio/async ?

@ac5tin

ac5tin commented Sep 16, 2022

it works if i use std::sync::mpsc instead of tokio's mpsc

    let (tx, rx) = mpsc::channel(); // std::sync::mpsc

    let mut watcher = notify::recommended_watcher(move |res: _| match res {
        Ok(event) => {
            let event: Event = event;
            log::debug!("{:?}", event);

            // only handle newly created files
            if event.kind.is_create() {
                event.paths.iter().for_each(|path| {
                    let file_path = path.to_string_lossy();
                    log::info!("Created new file: {:?}", file_path);
                    if let Err(err) = tx.send(file_path.to_string()) {
                        log::error!("Error handling file {:#?}", err);
                    };
                });
            }
        }
        Err(e) => log::error!("watch error: {:?}", e),
    })
    .ok()
    .unwrap();

    watcher
        .watch(Path::new("/home/austin/temp/"), RecursiveMode::Recursive)
        .unwrap();

    while let Ok(file) = rx.recv() {
         // THIS WORKS
        log::info!("Received file: {:?}", file);
    }

doesnt work when i switch to tokio's mpsc channel

    let (tx, mut rx) = mpsc::channel(256); // tokio::sync::mpsc

    let mut watcher = notify::recommended_watcher(move |res: _| match res {
        Ok(event) => {
            let event: Event = event;
            log::debug!("{:?}", event);

            // only handle newly created files
            if event.kind.is_create() {
                event.paths.iter().for_each(|path| {
                    let file_path = path.to_string_lossy();
                    log::info!("Created new file: {:?}", file_path);
                    if let Err(err) = tx.blocking_send(file_path.to_string()) {
                        log::error!("Error handling file {:#?}", err);
                    };
                });
            }
        }
        Err(e) => log::error!("watch error: {:?}", e),
    })
    .ok()
    .unwrap();

    watcher
        .watch(Path::new("/home/austin/temp/"), RecursiveMode::Recursive)
        .unwrap();

    tokio::task::spawn(async move {
        while let Some(file) = rx.recv().await {
        // NOT RECEIVING
            log::info!("Received file: {:?}", file);
        }
    });

however, if i make the whole function async and run the recv in the same thread, it works

async fn listen() {
    let (tx, mut rx) = mpsc::channel(256);

    let mut watcher = notify::recommended_watcher(move |res: _| match res {
        Ok(event) => {
            let event: Event = event;
            log::debug!("{:?}", event);

            // only handle newly created files
            if event.kind.is_create() {
                event.paths.iter().for_each(|path| {
                    let file_path = path.to_string_lossy();
                    log::info!("Created new file: {:?}", file_path);
                    if let Err(err) = tx.blocking_send(file_path.to_string()) {
                        log::error!("Error handling file {:#?}", err);
                    };
                });
            }
        }
        Err(e) => log::error!("watch error: {:?}", e),
    })
    .ok()
    .unwrap();

    watcher
        .watch(Path::new("/home/austin/temp/"), RecursiveMode::Recursive)
        .unwrap();

    while let Some(file) = rx.recv().await {
    // THIS WORKS
        log::info!("Received file: {:?}", file);
    }
}

this means it doesn't work when i try to recv inside a tokio spawned thread

@0xpr03
Member

0xpr03 commented Sep 16, 2022

The second one, what happens with the watcher ? Could it be you're dropping it ? Because the first one can't drop it (blocked) and the 3rd also can't drop the watcher struct. If you drop the watcher, the whole event loop quits and no events are emitted.
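
The point about dropping the watcher can be illustrated without notify or tokio. The following is only a sketch under stated assumptions: `FakeWatcher` is a hypothetical stand-in that owns the sending half of a std::sync::mpsc channel, much as a real watcher owns the callback that captured `tx`. Once the owner of the sender is dropped, the receiver sees a closed channel and its loop ends.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};

/// Hypothetical stand-in for a notify watcher: it owns the sending half
/// of the channel, like a watcher owning the callback that captured `tx`.
struct FakeWatcher {
    tx: Sender<String>,
}

impl FakeWatcher {
    fn new() -> (Self, Receiver<String>) {
        let (tx, rx) = mpsc::channel();
        (FakeWatcher { tx }, rx)
    }

    /// Pretends a file event happened; returns false if nobody is listening.
    fn emit(&self, path: &str) -> bool {
        self.tx.send(path.to_string()).is_ok()
    }
}

/// Reports what the receiver sees, depending on whether the watcher is alive.
fn receiver_state(drop_watcher_first: bool) -> &'static str {
    let (watcher, rx) = FakeWatcher::new();
    let watcher = if drop_watcher_first {
        drop(watcher); // the sender goes away with the watcher
        None
    } else {
        Some(watcher)
    };
    let state = match rx.try_recv() {
        Ok(_) => "got an event",
        Err(TryRecvError::Empty) => "open: waiting for events",
        Err(TryRecvError::Disconnected) => "closed: no event can ever arrive",
    };
    drop(watcher);
    state
}

/// Events sent before the drop are still delivered; the loop then ends.
fn drain_after_drop() -> Vec<String> {
    let (watcher, rx) = FakeWatcher::new();
    watcher.emit("a.txt");
    drop(watcher);
    rx.iter().collect()
}
```

A receive loop that exits immediately, or never yields anything, is therefore worth checking against the watcher's lifetime before suspecting the channel implementation.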

@ac5tin

ac5tin commented Sep 19, 2022

i can return watcher so it doesn't get dropped but the recv inside tokio::task::spawn still wouldn't work

#[actix::main]
async fn main() {
    env_logger::init();
    let w = listen();

    // endless
    loop {}
}


fn listen() -> notify::INotifyWatcher {
    let (tx, mut rx) = mpsc::channel(256);

    let mut watcher = notify::recommended_watcher(move |res: _| match res {
        Ok(event) => {
            let event: Event = event;

            // only handle newly created files
            if event.kind.is_create() {
                event.paths.iter().for_each(|path| {
                    let file_path = path.to_string_lossy();
                    // THIS PRINTS EVERYTIME I CREATE A FILE
                    log::info!("Created new file: {:?}", file_path);
                    if let Err(err) = tx.blocking_send(file_path.to_string()) {
                        log::error!("Error handling file {:#?}", err);
                    };
                });
            }
        }
        Err(e) => log::error!("watch error: {:?}", e),
    })
    .ok()
    .unwrap();

    watcher
        .watch(Path::new("/home/austin/temp/"), RecursiveMode::Recursive)
        .unwrap();

    tokio::task::spawn(async move {
        while let Some(file) = rx.recv().await {
            // NOTHING HERE
            log::info!("Received file: {:?}", file);
        }
    });
    watcher
}

@0xpr03
Member

0xpr03 commented Sep 19, 2022

Well now you're blocking your tokio executor by using a hot loop.
Replace it with loop { tokio::time::sleep(Duration::from_secs(1)).await; }
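
Why a hot loop starves a spawned task can be sketched with a toy executor. Everything below is an illustrative assumption: `run_all`, `YieldNow`, and `interleaving` are hypothetical names, and this round-robin loop is not tokio's scheduler. It only shares the relevant property of a single-threaded cooperative executor, namely that other tasks get a turn only when the running task returns `Poll::Pending`.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the toy executor re-polls pending tasks anyway.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Manual yield point: pending on the first poll, ready on the second.
struct YieldNow(bool);
impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

type Task = Pin<Box<dyn Future<Output = ()>>>;

/// Single-threaded round-robin executor: a task keeps the thread until it
/// finishes or returns Pending.
fn run_all(mut queue: VecDeque<Task>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    while let Some(mut task) = queue.pop_front() {
        if task.as_mut().poll(&mut cx).is_pending() {
            queue.push_back(task);
        }
    }
}

/// Records the order in which a two-iteration "main" loop and a spawned
/// "worker" make progress.
fn interleaving(main_yields: bool) -> Vec<&'static str> {
    let log = Rc::new(RefCell::new(Vec::new()));
    let (a, b) = (log.clone(), log.clone());
    let main_task: Task = Box::pin(async move {
        for _ in 0..2 {
            a.borrow_mut().push("main");
            if main_yields {
                YieldNow(false).await; // stands in for sleep(..).await
            }
        }
    });
    let worker: Task = Box::pin(async move {
        b.borrow_mut().push("worker");
    });
    run_all(VecDeque::from([main_task, worker]));
    let out = log.borrow().to_vec();
    out
}
```

With `main_yields` set to true the worker runs between the two main iterations; with false it runs only after the main loop has finished. An unbounded `loop {}` never finishes, so on an executor like this the worker would never run at all.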

@ac5tin

ac5tin commented Sep 20, 2022

Well now you're blocking your tokio executor by using a hot loop. Replace it with loop { tokio::time::sleep(Duration::from_secs(1)).await; }

can confirm, my code works now after adding tokio::time::sleep

@mojindri

This comment was marked as off-topic.

@JefferyHus

Two years later, I ran into a similar issue using version 6, but the cause was simple: in the code above from @ac5tin, if you just add .await.unwrap() to your tokio::spawn, everything works as expected.

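#[actix::main]
async fn main() {
    env_logger::init();
    // Awaiting listen() keeps the watcher alive for as long as the receive loop runs.
    listen().await;
}


// Must be async so the spawned task can be awaited at the end of the function.
async fn listen() {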
    let (tx, mut rx) = mpsc::channel(256);

    let mut watcher = notify::recommended_watcher(move |res: _| match res {
        Ok(event) => {
            let event: Event = event;

            // only handle newly created files
            if event.kind.is_create() {
                event.paths.iter().for_each(|path| {
                    let file_path = path.to_string_lossy();
                    // THIS PRINTS EVERYTIME I CREATE A FILE
                    log::info!("Created new file: {:?}", file_path);
                    if let Err(err) = tx.blocking_send(file_path.to_string()) {
                        log::error!("Error handling file {:#?}", err);
                    };
                });
            }
        }
        Err(e) => log::error!("watch error: {:?}", e),
    })
    .ok()
    .unwrap();

    watcher
        .watch(Path::new("/home/austin/temp/"), RecursiveMode::Recursive)
        .unwrap();

    tokio::task::spawn(async move {
        while let Some(file) = rx.recv().await {
            // NOTHING HERE
            log::info!("Received file: {:?}", file);
        }
    }).await.unwrap();
    // ^ This is the fix. Without it, the task is spawned but never awaited, so the
    //   function returns and the watcher, which lives only as long as the function
    //   body, is dropped.
}

@thalesfragoso

Coming here from the docs page, I think we should remove that note from there, or at least phrase it better.

First, because as I understand it, no one managed to reproduce this problem, and there isn't even example code someone can try.

Second, and most importantly, std::sync::mpsc has been using the crossbeam implementation for quite some time now (since Rust 1.67).
