Skip to content

Commit

Permalink
Fix race when main thread is parked as auto-advance is re-enabled.
Browse files Browse the repository at this point in the history
Specifically, when a blocking task is spawned, but no thread immediately awaits
the JoinHandle, if the main thread parks itself, we previously failed to unpark
it when the blocking task completed. Auto-advance would be re-enabled but could
not happen, a potential deadlock.
  • Loading branch information
jorendorff committed Nov 14, 2022
1 parent 5d38959 commit f23610f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 14 deletions.
18 changes: 14 additions & 4 deletions tokio/src/runtime/blocking/schedule.rs
@@ -1,5 +1,6 @@
use crate::runtime::task::{self, Task};
use crate::time::Clock;
#[cfg(feature = "test-util")]
use crate::runtime::{scheduler, Handle};

/// `task::Schedule` implementation that does nothing (except some bookkeeping
/// in test-util builds). This is unique to the blocking scheduler as tasks
Expand All @@ -9,14 +10,20 @@ use crate::time::Clock;
/// in `release`.
pub(crate) struct BlockingSchedule {
#[cfg(feature = "test-util")]
clock: Clock,
handle: Handle,
}

impl BlockingSchedule {
pub(crate) fn new() -> Self {
BlockingSchedule {
#[cfg(feature = "test-util")]
clock: crate::time::inhibit_auto_advance(),
handle: {
let handle = Handle::current();
if let scheduler::Handle::CurrentThread(handle) = &handle.inner {
handle.driver.clock.inhibit_auto_advance();
}
handle
},
}
}
}
Expand All @@ -25,7 +32,10 @@ impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
#[cfg(feature = "test-util")]
{
self.clock.allow_auto_advance();
if let scheduler::Handle::CurrentThread(handle) = &self.handle.inner {
handle.driver.clock.allow_auto_advance();
handle.driver.unpark();
}
}
None
}
Expand Down
10 changes: 2 additions & 8 deletions tokio/src/time/clock.rs
Expand Up @@ -132,13 +132,6 @@ cfg_test_util! {
inner.unfrozen = Some(std::time::Instant::now());
}

/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
pub(crate) fn inhibit_auto_advance() -> Clock {
let clock = clock().expect("can't inhibit auto-advance from outside the Tokio runtime");
clock.inhibit_auto_advance();
clock
}

/// Advances time.
///
/// Increments the saved `Instant::now()` value by `duration`. Subsequent
Expand Down Expand Up @@ -223,7 +216,8 @@ cfg_test_util! {
inner.unfrozen = None;
}

fn inhibit_auto_advance(&self) {
/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
pub(crate) fn inhibit_auto_advance(&self) {
let mut inner = self.inner.lock();
inner.auto_advance_inhibit_count += 1;
}
Expand Down
2 changes: 0 additions & 2 deletions tokio/src/time/mod.rs
Expand Up @@ -87,8 +87,6 @@
mod clock;
pub(crate) use self::clock::Clock;
#[cfg(feature = "test-util")]
pub(crate) use clock::inhibit_auto_advance;
#[cfg(feature = "test-util")]
pub use clock::{advance, pause, resume};

pub mod error;
Expand Down
19 changes: 19 additions & 0 deletions tokio/tests/task_blocking.rs
Expand Up @@ -269,6 +269,25 @@ async fn blocking_task_wakes_paused_runtime() {
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();

// When this task finishes, time should auto-advance, even though the
// JoinHandle has not been awaited yet.
let a = task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(20));
});

crate::time::sleep(Duration::from_secs(15)).await;
a.await.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
Expand Down

0 comments on commit f23610f

Please sign in to comment.