runtime: Notify another worker if unparked from driver #6245

Closed
wants to merge 2 commits

Conversation


@Congyuwang commented Dec 23, 2023

Motivation

This PR is related to the issue "One bad task can halt all executor progress forever".

If the busy workers are momentarily blocked running tasks, and all other workers are parked on condition variables, then no worker is blocked on the I/O driver. The system stalls unnecessarily even though idle workers are available.

Solution

This problem can be mitigated by attempting to wake an idle worker (if any) after unparking from the I/O driver, right before running a task.

Previously, if the current worker is the final searching worker, transition_from_searching() wakes another idle worker to steal tasks. However, when a worker wakes from the I/O driver with new events, it is not in the searching state, so notify_parked_local() is not called in run_task(). This PR adds a call to notify_parked_local() in run_task() when the worker has woken from the I/O driver.

A flag did_acquire_driver_lock is added to the Core data to record whether the worker locked the driver during the previous tick. In run_task(), before actually running the task, did_acquire_driver_lock is checked after transition_from_searching(). If the flag is true, notify_parked_local() is called to notify an idle worker, if any (no worker is notified if some worker is already searching).

In short, the PR mitigates the issue by trying to ensure that some worker other than the current one, which might get unluckily blocked, stays running (searching).
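
For illustration, here is a minimal, self-contained sketch of the control flow the PR adds. The stub types below are invented for this example; only the did_acquire_driver_lock flag and the ordering of transition_from_searching() and notify_parked_local() inside run_task() follow the actual change.

struct Handle;

impl Handle {
    fn notify_parked_local(&self) {
        // In tokio, this wakes one parked worker, unless some worker is
        // already searching for work.
    }
}

struct Worker {
    handle: Handle,
}

struct Core {
    // Set while parking if this worker acquired the driver lock: its
    // unpark may have come from new I/O events rather than from another
    // worker handing it tasks.
    did_acquire_driver_lock: bool,
}

impl Core {
    fn transition_from_searching(&mut self, _worker: &Worker) {
        // In tokio, if this was the last searching worker, another idle
        // worker is notified here. A worker unparked by the driver is not
        // searching, so that path does not fire for it.
    }

    fn run_task(&mut self, worker: &Worker) {
        self.transition_from_searching(worker);

        // The proposed addition: after leaving the searching state, wake
        // an idle worker if we were unparked from the I/O driver, so that
        // progress continues even if the task below never yields.
        if self.did_acquire_driver_lock {
            worker.handle.notify_parked_local();
            self.did_acquire_driver_lock = false;
        }

        // ... the task would be polled here ...
    }
}

fn main() {
    let worker = Worker { handle: Handle };
    let mut core = Core { did_acquire_driver_lock: true };
    core.run_task(&worker);
}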

Test Cases

1

use std::thread;
use std::time::Duration;

//#[tokio::main]
#[tokio::main(flavor = "multi_thread", worker_threads = 32)]
async fn main() {
    let mut handles = Vec::new();

    handles.push(tokio::spawn({
        async {
            loop {
                println!("{:?}: good still alive", thread::current().id());
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
        }
    }));
    handles.push(tokio::spawn({
        async {
            let orig_thread_id = format!("{:?}", thread::current().id());
            loop {
                println!("{:?}: bad still alive", thread::current().id());
                thread::sleep(Duration::from_secs(10));
                loop {
                    // here we loop and await until the task migrates to another thread;
                    // once it does, we never await again, blocking all other tasks forever
                    let thread_id = format!("{:?}", thread::current().id());
                    if thread_id == orig_thread_id {
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    } else {
                        break;
                    }
                }
            }
        }
    }));

    for handle in handles {
        handle.await.expect("handle await");
    }
}

Output:

ThreadId(33): good still alive
ThreadId(27): bad still alive
ThreadId(3): good still alive
ThreadId(16): bad still alive
ThreadId(27): good still alive
ThreadId(16): bad still alive
ThreadId(3): good still alive
ThreadId(16): bad still alive
ThreadId(27): good still alive
ThreadId(16): bad still alive
ThreadId(3): good still alive
ThreadId(16): bad still alive
ThreadId(27): good still alive
ThreadId(16): bad still alive

2

use std::time::Duration;
use std::time::Instant;

async fn handle_request(start_time: Instant) {
    tokio::time::sleep(Duration::from_millis(500)).await;
    println!("request took {}ms", start_time.elapsed().as_millis());
}

async fn background_job() {
    loop {
        tokio::time::sleep(Duration::from_secs(3)).await;

        println!("background job start");
        // Adjust as needed
        for _ in 0..1_000 { vec![1; 1_000_000].sort() }
        println!("background job finished");
    }
}

#[tokio::main]
async fn main() {
    tokio::spawn(async { background_job().await });

    loop {
        let start = Instant::now();
        tokio::spawn(async move {
            handle_request(start).await
        }).await.unwrap();
    }
}

Output:

request took 502ms
request took 502ms
request took 501ms
request took 502ms
request took 502ms
background job start
request took 501ms
background job finished
request took 502ms
request took 502ms
request took 502ms
request took 502ms
request took 502ms
request took 501ms
background job start
request took 502ms
background job finished
request took 502ms
request took 502ms

@github-actions bot added the R-loom-multi-thread (Run loom multi-thread tests on this PR) label Dec 23, 2023
@Congyuwang
Author

No apparent performance regression when running benches/rt_multi_threaded:

Switched to branch 'bench-park_has_driver'
Your branch is up to date with 'origin/bench-park_has_driver'.
   Compiling tokio v1.35.1 (/root/tokio/tokio)
   Compiling tokio-stream v0.1.14 (/root/tokio/tokio-stream)
   Compiling tokio-util v0.7.10 (/root/tokio/tokio-util)
   Compiling benches v0.0.0 (/root/tokio/benches)
    Finished bench [optimized] target(s) in 12.21s
     Running rt_multi_threaded.rs (/root/tokio/target/release/deps/rt_multi_threaded-770a849081960e9e)
Gnuplot not found, using plotters backend
spawn_many_local        time:   [9.6004 ms 9.6073 ms 9.6147 ms]
                        change: [-3.8036% -3.6658% -3.5216%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

spawn_many_remote_idle  time:   [4.7752 ms 4.7920 ms 4.8114 ms]
                        change: [-2.7264% -0.7855% +0.4178%] (p = 0.49 > 0.05)
                        No change in performance detected.
Found 10 outliers among 100 measurements (10.00%)
  1 (1.00%) low mild
  2 (2.00%) high mild
  7 (7.00%) high severe

spawn_many_remote_busy1 time:   [5.8267 ms 5.8540 ms 5.8813 ms]
                        change: [-1.6236% +0.8647% +2.9210%] (p = 0.50 > 0.05)
                        No change in performance detected.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  2 (2.00%) high severe

spawn_many_remote_busy2 time:   [35.318 ms 35.607 ms 35.883 ms]
                        change: [-1.3800% -0.3162% +0.8352%] (p = 0.59 > 0.05)
                        No change in performance detected.

ping_pong               time:   [1.0824 ms 1.0841 ms 1.0860 ms]
                        change: [-1.3516% -1.1922% -1.0134%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

yield_many              time:   [11.493 ms 11.528 ms 11.564 ms]
                        change: [-3.5820% -2.5326% -1.4610%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
  10 (10.00%) low mild
  2 (2.00%) high mild
  3 (3.00%) high severe

chained_spawn           time:   [327.20 µs 332.82 µs 338.44 µs]
                        change: [-2.6834% -0.6663% +1.2213%] (p = 0.52 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

Platform:
OS: Ubuntu 23.10 Linux 6.5.0-14-generic x86_64
CPU: 4 cores

Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         40 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  4
  On-line CPU(s) list:   0-3
Vendor ID:               GenuineIntel
  BIOS Vendor ID:        QEMU
  Model name:            Intel(R) Xeon(R) Platinum 8358 CPU @ 2.60GHz

@Congyuwang marked this pull request as ready for review December 27, 2023 14:53
@Congyuwang changed the title from "Notify Other Worker if Park Has Driver" to "runtime: Notify another worker if unparked from driver" Dec 28, 2023
Comment on lines +880 to +883
if self.did_acquire_driver_lock {
    worker.handle.notify_parked_local();
    self.did_acquire_driver_lock = false;
}
Author

did_acquire_driver_lock is checked every time in run_task() after transition_from_searching(), so that notify_parked_local() does not consider the current core as searching.
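
To make that ordering concrete, here is a hypothetical, self-contained sketch; the NUM_SEARCHING counter and the function bodies are invented for illustration, while tokio's real idle/searching bookkeeping is more involved:

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical counter of workers currently searching for work.
static NUM_SEARCHING: AtomicUsize = AtomicUsize::new(0);

fn notify_parked_local() {
    // Notification is skipped while any worker is searching, because that
    // searcher is expected to find the work. If the current worker were
    // still counted as searching here, it would suppress its own notify.
    if NUM_SEARCHING.load(Ordering::SeqCst) == 0 {
        // ... wake one parked worker, if any ...
    }
}

fn transition_from_searching() {
    // Leaving the searching state first decrements the counter, so the
    // did_acquire_driver_lock check that follows can actually notify.
    NUM_SEARCHING.fetch_sub(1, Ordering::SeqCst);
}

fn main() {
    NUM_SEARCHING.fetch_add(1, Ordering::SeqCst); // this worker was searching
    transition_from_searching();
    notify_parked_local(); // no longer suppressed by our own searching state
}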

@wathenjiang
Contributor

Hi, I see that you are trying to address the Tokio runtime blocking problem, but it seems that we can use a simpler way to achieve this. Please reference #6251.

If you have additional considerations, please let me know!

@Congyuwang
Author

Congyuwang commented Dec 28, 2023

> Hi, I see that you are trying to address the Tokio runtime blocking problem, but it seems that we can use a simpler way to achieve this. Please reference #6251.
>
> If you have additional considerations, please let me know!

@wathenjiang That's a rather neat solution. I'm not sure my understanding is correct: unparking with tasks means the worker unparked from the driver, since a wake by another worker would unpark it with an empty local task queue and an empty FIFO slot. Though I have a question regarding the case where park_timeout(worker, 0ms) is called directly in maintenance(), or called when defer is not empty; I can't really figure out whether that's something to worry about.

@Congyuwang Congyuwang closed this Dec 28, 2023