Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't auto-advance time when a spawn_blocking task is running. #5115

Merged
merged 18 commits into from Dec 17, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions tokio/src/runtime/blocking/mod.rs
Expand Up @@ -17,8 +17,6 @@ cfg_trace! {
mod schedule;
mod shutdown;
mod task;
#[cfg(all(test, not(tokio_wasm)))]
pub(crate) use schedule::NoopSchedule;
pub(crate) use task::BlockingTask;

use crate::runtime::Builder;
Expand Down
9 changes: 5 additions & 4 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -2,7 +2,7 @@

use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::schedule::BlockingSchedule;
use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
Expand Down Expand Up @@ -120,7 +120,7 @@ struct Shared {
}

pub(crate) struct Task {
task: task::UnownedTask<NoopSchedule>,
task: task::UnownedTask<BlockingSchedule>,
mandatory: Mandatory,
}

Expand Down Expand Up @@ -151,7 +151,7 @@ impl From<SpawnError> for io::Error {
}

impl Task {
pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
}

Expand Down Expand Up @@ -379,7 +379,8 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::unowned(fut, NoopSchedule, id);
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
}
Expand Down
49 changes: 43 additions & 6 deletions tokio/src/runtime/blocking/schedule.rs
@@ -1,15 +1,52 @@
#[cfg(feature = "test-util")]
use crate::runtime::scheduler;
use crate::runtime::task::{self, Task};
use crate::runtime::Handle;

/// `task::Schedule` implementation that does nothing. This is unique to the
/// blocking scheduler as tasks scheduled are not really futures but blocking
/// operations.
/// `task::Schedule` implementation that does nothing (except some bookkeeping
/// in test-util builds). This is unique to the blocking scheduler as tasks
/// scheduled are not really futures but blocking operations.
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
/// in `release.
pub(crate) struct NoopSchedule;
/// in `release`.
pub(crate) struct BlockingSchedule {
#[cfg(feature = "test-util")]
handle: Handle,
}

impl BlockingSchedule {
#[cfg_attr(not(feature = "test-util"), allow(unused_variables))]
pub(crate) fn new(handle: &Handle) -> Self {
#[cfg(feature = "test-util")]
{
match &handle.inner {
scheduler::Handle::CurrentThread(handle) => {
handle.driver.clock.inhibit_auto_advance();
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
}
}
BlockingSchedule {
#[cfg(feature = "test-util")]
handle: handle.clone(),
}
}
}

impl task::Schedule for NoopSchedule {
impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
#[cfg(feature = "test-util")]
{
match &self.handle.inner {
scheduler::Handle::CurrentThread(handle) => {
handle.driver.clock.allow_auto_advance();
handle.driver.unpark();
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
}
}
None
}

Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/tests/loom_blocking.rs
Expand Up @@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread
});
}

#[test]
fn spawn_blocking_when_paused() {
use std::time::Duration;
loom::model(|| {
let rt = crate::runtime::Builder::new_current_thread()
.enable_time()
.start_paused(true)
.build()
.unwrap();
let handle = rt.handle();
let _enter = handle.enter();
let a = crate::task::spawn_blocking(|| {});
let b = crate::task::spawn_blocking(|| {});
rt.block_on(crate::time::timeout(Duration::from_millis(1), async move {
a.await.expect("blocking task should finish");
b.await.expect("blocking task should finish");
}))
.expect("timeout should not trigger");
});
}

fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/tests/loom_queue.rs
@@ -1,6 +1,6 @@
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::scheduler::multi_thread::queue;
use crate::runtime::task::Inject;
use crate::runtime::tests::NoopSchedule;
use crate::runtime::MetricsBatch;

use loom::thread;
Expand Down
20 changes: 19 additions & 1 deletion tokio/src/runtime/tests/mod.rs
Expand Up @@ -2,11 +2,29 @@
// other code when running loom tests.
#![cfg_attr(loom, warn(dead_code, unreachable_pub))]

use self::noop_scheduler::NoopSchedule;
use self::unowned_wrapper::unowned;

mod noop_scheduler {
use crate::runtime::task::{self, Task};

/// `task::Schedule` implementation that does nothing, for testing.
pub(crate) struct NoopSchedule;

impl task::Schedule for NoopSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}

fn schedule(&self, _task: task::Notified<Self>) {
unreachable!();
}
}
}

mod unowned_wrapper {
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{Id, JoinHandle, Notified};
use crate::runtime::tests::NoopSchedule;

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/tests/task.rs
@@ -1,5 +1,5 @@
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;

use std::collections::VecDeque;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/mod.rs
Expand Up @@ -222,7 +222,7 @@ impl Driver {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;

if clock.is_paused() {
if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));

// If the time driver was woken, then the park completed
Expand Down
19 changes: 17 additions & 2 deletions tokio/src/time/clock.rs
Expand Up @@ -65,6 +65,9 @@ cfg_test_util! {

/// Instant at which the clock was last unfrozen.
unfrozen: Option<std::time::Instant>,

/// Number of `inhibit_auto_advance` calls still in effect.
auto_advance_inhibit_count: usize,
}

/// Pauses time.
Expand Down Expand Up @@ -187,6 +190,7 @@ cfg_test_util! {
enable_pausing,
base: now,
unfrozen: Some(now),
auto_advance_inhibit_count: 0,
})),
};

Expand All @@ -212,9 +216,20 @@ cfg_test_util! {
inner.unfrozen = None;
}

pub(crate) fn is_paused(&self) -> bool {
/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
pub(crate) fn inhibit_auto_advance(&self) {
let mut inner = self.inner.lock();
inner.auto_advance_inhibit_count += 1;
}

pub(crate) fn allow_auto_advance(&self) {
let mut inner = self.inner.lock();
inner.auto_advance_inhibit_count -= 1;
}

pub(crate) fn can_auto_advance(&self) -> bool {
let inner = self.inner.lock();
inner.unfrozen.is_none()
inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0
}

#[track_caller]
Expand Down
83 changes: 82 additions & 1 deletion tokio/tests/task_blocking.rs
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads

use tokio::{runtime, task};
use tokio::{runtime, task, time};
use tokio_test::assert_ok;

use std::thread;
Expand Down Expand Up @@ -226,3 +226,84 @@ fn coop_disabled_in_block_in_place_in_block_on() {

done_rx.recv().unwrap().unwrap();
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_when_paused() {
// Do not auto-advance time when we have started a blocking task that has
// not yet finished.
time::timeout(
Duration::from_secs(3),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))),
)
.await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reduce the sleeps. Our test suite already takes long enough.

.expect("timeout should not trigger")
.expect("blocking task should finish");

// Really: Do not auto-advance time, even if the timeout is short and the
// blocking task runs for longer than that. It doesn't matter: Tokio time
// is paused; system time is not.
time::timeout(
Duration::from_millis(1),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(250))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();

// When this task finishes, time should auto-advance, even though the
// JoinHandle has not been awaited yet.
let a = task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(20));
});

crate::time::sleep(Duration::from_secs(15)).await;
a.await.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
let result = time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(250));
panic!("blocking task panicked");
}),
)
.await
.expect("timeout should not trigger");
assert!(result.is_err(), "blocking task should have panicked");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}