Skip to content

Commit

Permalink
rt: fix new yield_now behavior with block_in_place (#5251)
Browse files Browse the repository at this point in the history
PR #5223 changed the behavior of `yield_now()` to store yielded tasks
and notify them *after* polling the resource drivers. This PR fixes a
couple of bugs with this new behavior when combined with
`block_in_place()`.

First, we need to avoid freeing the deferred task queue when exiting a
runtime if it is *not* the root runtime. Because `block_in_place()`
allows a user to start a new runtime from within an existing task, this
check is necessary.

Second, when a worker core is stolen from a thread during a
`block_in_place()` call, we need to ensure that deferred tasks are
notified anyway.
  • Loading branch information
carllerche committed Dec 2, 2022
1 parent 2286273 commit d1b789f
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
24 changes: 21 additions & 3 deletions tokio/src/runtime/context.rs
Expand Up @@ -107,9 +107,17 @@ cfg_rt! {
/// Guard tracking that a caller has entered a runtime context.
#[must_use]
pub(crate) struct EnterRuntimeGuard {
/// Tracks that the current thread has entered a blocking function call.
pub(crate) blocking: BlockingRegionGuard,

#[allow(dead_code)] // Only tracking the guard.
pub(crate) handle: SetCurrentGuard,

/// If true, then this is the root runtime guard. It is possible to nest
/// runtime guards by using `block_in_place` between the calls. We need
/// to track the root guard as this is the guard responsible for freeing
/// the deferred task queue.
is_root: bool,
}

/// Guard tracking that a caller has entered a blocking region.
Expand Down Expand Up @@ -171,11 +179,19 @@ cfg_rt! {
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });

// Initialize queue to track yielded tasks
*c.defer.borrow_mut() = Some(Defer::new());
let mut defer = c.defer.borrow_mut();

let is_root = if defer.is_none() {
*defer = Some(Defer::new());
true
} else {
false
};

Some(EnterRuntimeGuard {
blocking: BlockingRegionGuard::new(),
handle: c.set_current(handle),
is_root,
})
}
})
Expand Down Expand Up @@ -217,7 +233,6 @@ cfg_rt! {
pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
CONTEXT.with(|c| {
let mut defer = c.defer.borrow_mut();

defer.as_mut().map(f)
})
}
Expand Down Expand Up @@ -256,7 +271,10 @@ cfg_rt! {
CONTEXT.with(|c| {
assert!(c.runtime.get().is_entered());
c.runtime.set(EnterRuntime::NotEntered);
*c.defer.borrow_mut() = None;

if self.is_root {
*c.defer.borrow_mut() = None;
}
});
}
}
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -368,6 +368,22 @@ impl Launch {
}

fn run(worker: Arc<Worker>) {
struct AbortOnPanic;

impl Drop for AbortOnPanic {
fn drop(&mut self) {
if std::thread::panicking() {
eprintln!("worker thread panicking; aborting process");
std::process::abort();
}
}
}

// Catching panics on worker threads in tests is quite tricky. Instead, when
// debug assertions are enabled, we just abort the process.
#[cfg(debug_assertions)]
let _abort_on_panic = AbortOnPanic;

// Acquire a core. If this fails, then another thread is running this
// worker and there is nothing further to do.
let core = match worker.core.take() {
Expand All @@ -388,6 +404,11 @@ fn run(worker: Arc<Worker>) {
// This should always be an error. It only returns a `Result` to support
// using `?` to short circuit.
assert!(cx.run(core).is_err());

// Check if there are any deferred tasks to notify. This can happen when
// the worker core is lost due to `block_in_place()` being called from
// within the task.
wake_deferred_tasks();
});
}

Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/rt_threaded.rs
Expand Up @@ -415,6 +415,32 @@ fn coop_and_block_in_place() {
});
}

#[test]
fn yield_after_block_in_place() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();

rt.block_on(async {
tokio::spawn(async move {
// Block in place then enter a new runtime
tokio::task::block_in_place(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

rt.block_on(async {});
});

// Yield, then complete
tokio::task::yield_now().await;
})
.await
.unwrap()
});
}

// Testing this does not panic
#[test]
fn max_blocking_threads() {
Expand Down

0 comments on commit d1b789f

Please sign in to comment.