Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: fix new yield_now behavior with block_in_place #5251

Merged
merged 1 commit into from Dec 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 21 additions & 3 deletions tokio/src/runtime/context.rs
Expand Up @@ -107,9 +107,17 @@ cfg_rt! {
/// Guard tracking that a caller has entered a runtime context.
#[must_use]
pub(crate) struct EnterRuntimeGuard {
/// Tracks that the current thread has entered a blocking function call.
pub(crate) blocking: BlockingRegionGuard,

#[allow(dead_code)] // Only tracking the guard.
pub(crate) handle: SetCurrentGuard,

/// If true, then this is the root runtime guard. It is possible to nest
/// runtime guards by using `block_in_place` between the calls. We need
/// to track the root guard as this is the guard responsible for freeing
/// the deferred task queue.
is_root: bool,
}

/// Guard tracking that a caller has entered a blocking region.
Expand Down Expand Up @@ -171,11 +179,19 @@ cfg_rt! {
c.runtime.set(EnterRuntime::Entered { allow_block_in_place });

// Initialize queue to track yielded tasks
*c.defer.borrow_mut() = Some(Defer::new());
let mut defer = c.defer.borrow_mut();

let is_root = if defer.is_none() {
*defer = Some(Defer::new());
true
} else {
false
};

Some(EnterRuntimeGuard {
blocking: BlockingRegionGuard::new(),
handle: c.set_current(handle),
is_root,
})
}
})
Expand Down Expand Up @@ -217,7 +233,6 @@ cfg_rt! {
pub(crate) fn with_defer<R>(f: impl FnOnce(&mut Defer) -> R) -> Option<R> {
CONTEXT.with(|c| {
let mut defer = c.defer.borrow_mut();

defer.as_mut().map(f)
})
}
Expand Down Expand Up @@ -256,7 +271,10 @@ cfg_rt! {
CONTEXT.with(|c| {
assert!(c.runtime.get().is_entered());
c.runtime.set(EnterRuntime::NotEntered);
*c.defer.borrow_mut() = None;

if self.is_root {
*c.defer.borrow_mut() = None;
}
});
}
}
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Expand Up @@ -368,6 +368,22 @@ impl Launch {
}

fn run(worker: Arc<Worker>) {
struct AbortOnPanic;

impl Drop for AbortOnPanic {
fn drop(&mut self) {
if std::thread::panicking() {
eprintln!("worker thread panicking; aborting process");
std::process::abort();
}
}
}

// Catching panics on worker threads in tests is quite tricky. Instead, when
// debug assertions are enabled, we just abort the process.
#[cfg(debug_assertions)]
let _abort_on_panic = AbortOnPanic;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just for tests and hopefully catching issues sooner.


// Acquire a core. If this fails, then another thread is running this
// worker and there is nothing further to do.
let core = match worker.core.take() {
Expand All @@ -388,6 +404,11 @@ fn run(worker: Arc<Worker>) {
// This should always be an error. It only returns a `Result` to support
// using `?` to short circuit.
assert!(cx.run(core).is_err());

// Check if there are any deferred tasks to notify. This can happen when
// the worker core is lost due to `block_in_place()` being called from
// within the task.
wake_deferred_tasks();
});
}

Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/rt_threaded.rs
Expand Up @@ -415,6 +415,32 @@ fn coop_and_block_in_place() {
});
}

#[test]
fn yield_after_block_in_place() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap();

rt.block_on(async {
tokio::spawn(async move {
// Block in place then enter a new runtime
tokio::task::block_in_place(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

rt.block_on(async {});
});

// Yield, then complete
tokio::task::yield_now().await;
})
.await
.unwrap()
});
}

// Testing this does not panic
#[test]
fn max_blocking_threads() {
Expand Down