From d1b789f33aa9d2bbca84f24b810235a10b149e92 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 1 Dec 2022 17:23:33 -0800 Subject: [PATCH] rt: fix new yield_now behavior with block_in_place (#5251) PR #5223 changed the behavior of `yield_now()` to store yielded tasks and notify them *after* polling the resource drivers. This PR fixes a couple of bugs with this new behavior when combined with `block_in_place()`. First, we need to avoid freeing the deferred task queue when exiting a runtime if it is *not* the root runtime. Because `block_in_place()` allows a user to start a new runtime from within an existing task, this check is necessary. Second, when a worker core is stolen from a thread during a `block_in_place()` call, we need to ensure that deferred tasks are notified anyway. --- tokio/src/runtime/context.rs | 24 ++++++++++++++--- .../runtime/scheduler/multi_thread/worker.rs | 21 +++++++++++++++ tokio/tests/rt_threaded.rs | 26 +++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 4f30d3374a9..2e54c8ba366 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -107,9 +107,17 @@ cfg_rt! { /// Guard tracking that a caller has entered a runtime context. #[must_use] pub(crate) struct EnterRuntimeGuard { + /// Tracks that the current thread has entered a blocking function call. pub(crate) blocking: BlockingRegionGuard, + #[allow(dead_code)] // Only tracking the guard. pub(crate) handle: SetCurrentGuard, + + /// If true, then this is the root runtime guard. It is possible to nest + /// runtime guards by using `block_in_place` between the calls. We need + /// to track the root guard as this is the guard responsible for freeing + /// the deferred task queue. + is_root: bool, } /// Guard tracking that a caller has entered a blocking region. @@ -171,11 +179,19 @@ cfg_rt! { c.runtime.set(EnterRuntime::Entered { allow_block_in_place }); // Initialize queue to track yielded tasks - *c.defer.borrow_mut() = Some(Defer::new()); + let mut defer = c.defer.borrow_mut(); + + let is_root = if defer.is_none() { + *defer = Some(Defer::new()); + true + } else { + false + }; Some(EnterRuntimeGuard { blocking: BlockingRegionGuard::new(), handle: c.set_current(handle), + is_root, }) } }) @@ -217,7 +233,6 @@ cfg_rt! { pub(crate) fn with_defer(f: impl FnOnce(&mut Defer) -> R) -> Option { CONTEXT.with(|c| { let mut defer = c.defer.borrow_mut(); - defer.as_mut().map(f) }) } @@ -256,7 +271,10 @@ cfg_rt! { CONTEXT.with(|c| { assert!(c.runtime.get().is_entered()); c.runtime.set(EnterRuntime::NotEntered); - *c.defer.borrow_mut() = None; + + if self.is_root { + *c.defer.borrow_mut() = None; + } }); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index ce6c313b105..148255a85c4 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -368,6 +368,22 @@ impl Launch { } fn run(worker: Arc) { + struct AbortOnPanic; + + impl Drop for AbortOnPanic { + fn drop(&mut self) { + if std::thread::panicking() { + eprintln!("worker thread panicking; aborting process"); + std::process::abort(); + } + } + } + + // Catching panics on worker threads in tests is quite tricky. Instead, when + // debug assertions are enabled, we just abort the process. + #[cfg(debug_assertions)] + let _abort_on_panic = AbortOnPanic; + // Acquire a core. If this fails, then another thread is running this // worker and there is nothing further to do. let core = match worker.core.take() { @@ -388,6 +404,11 @@ fn run(worker: Arc) { // This should always be an error. It only returns a `Result` to support // using `?` to short circuit. assert!(cx.run(core).is_err()); + + // Check if there are any deferred tasks to notify. This can happen when + // the worker core is lost due to `block_in_place()` being called from + // within the task. + wake_deferred_tasks(); }); } diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index d3dbabdc9c0..c5984182cec 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -415,6 +415,32 @@ fn coop_and_block_in_place() { }); } +#[test] +fn yield_after_block_in_place() { + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async move { + // Block in place then enter a new runtime + tokio::task::block_in_place(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + + rt.block_on(async {}); + }); + + // Yield, then complete + tokio::task::yield_now().await; + }) + .await + .unwrap() + }); +} + // Testing this does not panic #[test] fn max_blocking_threads() {