Skip to content

Commit

Permalink
rt: unhandled panic config for current thread rt (#4770)
Browse files Browse the repository at this point in the history
Allows the user to configure the runtime's behavior when a spawned task
panics. Currently, the panic is propagated to the JoinHandle and the
runtime resumes. This patch lets the user set the runtime to shut down on
an unhandled panic.

So far, this is only implemented for the current-thread runtime.

Refs: #4516
  • Loading branch information
carllerche committed Jun 16, 2022
1 parent 90bc5fa commit c98be22
Show file tree
Hide file tree
Showing 8 changed files with 458 additions and 7 deletions.
59 changes: 56 additions & 3 deletions tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -57,6 +57,10 @@ struct Core {

/// Metrics batch
metrics: MetricsBatch,

/// True if a task panicked without being handled and the runtime is
/// configured to shutdown on unhandled panic.
unhandled_panic: bool,
}

#[derive(Clone)]
Expand All @@ -76,6 +80,10 @@ pub(crate) struct Config {

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,

#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}

/// Scheduler state shared between threads.
Expand Down Expand Up @@ -144,6 +152,7 @@ impl BasicScheduler {
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(),
unhandled_panic: false,
})));

BasicScheduler {
Expand All @@ -158,6 +167,7 @@ impl BasicScheduler {
&self.spawner
}

#[track_caller]
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);

Expand Down Expand Up @@ -465,6 +475,35 @@ impl Schedule for Arc<Shared> {
}
});
}

cfg_unstable! {
    /// Applies the configured `UnhandledPanic` policy after a task panic.
    ///
    /// `Ignore` is a no-op. `ShutdownRuntime` flags the core stored in the
    /// current scheduler context and calls `close_and_shutdown_all` on
    /// `self.owned`.
    fn unhandled_panic(&self) {
        use crate::runtime::UnhandledPanic;

        match self.config.unhandled_panic {
            // Nothing to do.
            UnhandledPanic::Ignore => {}
            UnhandledPanic::ShutdownRuntime => {
                CURRENT.with(|maybe_cx| {
                    // This hook only fires from inside the runtime, so the
                    // context in `CURRENT` is expected to belong to `self`;
                    // a nested scheduler has no opportunity to get here.
                    let cx = match maybe_cx {
                        Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => cx,
                        _ => unreachable!("runtime core not set in CURRENT thread-local"),
                    };

                    let mut slot = cx.core.borrow_mut();

                    // An empty slot means the runtime is already shutting
                    // down, so there is nothing left to signal.
                    if let Some(core) = slot.as_mut() {
                        core.unhandled_panic = true;
                        self.owned.close_and_shutdown_all();
                    }
                })
            }
        }
    }
}
}

impl Wake for Shared {
Expand All @@ -489,8 +528,9 @@ struct CoreGuard<'a> {
}

impl CoreGuard<'_> {
#[track_caller]
fn block_on<F: Future>(self, future: F) -> F::Output {
self.enter(|mut core, context| {
let ret = self.enter(|mut core, context| {
let _enter = crate::runtime::enter(false);
let waker = context.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);
Expand All @@ -506,11 +546,16 @@ impl CoreGuard<'_> {
core = c;

if let Ready(v) = res {
return (core, v);
return (core, Some(v));
}
}

for _ in 0..core.spawner.shared.config.event_interval {
// Make sure we didn't hit an unhandled_panic
if core.unhandled_panic {
return (core, None);
}

// Get and increment the current tick
let tick = core.tick;
core.tick = core.tick.wrapping_add(1);
Expand Down Expand Up @@ -544,7 +589,15 @@ impl CoreGuard<'_> {
// pending I/O events.
core = context.park_yield(core);
}
})
});

match ret {
Some(ret) => ret,
None => {
// `block_on` panicked.
panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
}
}
}

/// Enters the scheduler context. This sets the queue and other necessary
Expand Down
150 changes: 150 additions & 0 deletions tokio/src/runtime/builder.rs
Expand Up @@ -84,6 +84,90 @@ pub struct Builder {

/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u32,

#[cfg(tokio_unstable)]
pub(super) unhandled_panic: UnhandledPanic,
}

cfg_unstable! {
/// How the runtime should respond to unhandled panics.
///
/// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic`
/// to configure the runtime behavior when a spawned task panics.
///
/// See [`Builder::unhandled_panic`] for more details.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum UnhandledPanic {
/// The runtime should ignore panics on spawned tasks.
///
/// The panic is forwarded to the task's [`JoinHandle`] and all spawned
/// tasks continue running normally.
///
/// This is the default behavior.
///
/// # Examples
///
/// ```
/// use tokio::runtime::{self, UnhandledPanic};
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_current_thread()
/// .unhandled_panic(UnhandledPanic::Ignore)
/// .build()
/// .unwrap();
///
/// let task1 = rt.spawn(async { panic!("boom"); });
/// let task2 = rt.spawn(async {
/// // This task completes normally
/// "done"
/// });
///
/// rt.block_on(async {
/// // The panic on the first task is forwarded to the `JoinHandle`
/// assert!(task1.await.is_err());
///
/// // The second task completes normally
/// assert!(task2.await.is_ok());
/// })
/// # }
/// ```
///
/// [`JoinHandle`]: struct@crate::task::JoinHandle
Ignore,

/// The runtime should immediately shutdown if a spawned task panics.
///
/// The runtime will immediately shutdown even if the panicked task's
/// [`JoinHandle`] is still available. All further spawned tasks will be
/// immediately dropped and any call to [`Runtime::block_on`] will panic.
///
/// # Examples
///
/// ```should_panic
/// use tokio::runtime::{self, UnhandledPanic};
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_current_thread()
/// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
/// .build()
/// .unwrap();
///
/// rt.spawn(async { panic!("boom"); });
/// rt.spawn(async {
/// // This task never completes.
/// });
///
/// rt.block_on(async {
/// // Do some work
/// # loop { tokio::task::yield_now().await; }
/// })
/// # }
/// ```
///
/// [`JoinHandle`]: struct@crate::task::JoinHandle
ShutdownRuntime,
}
}

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
Expand Down Expand Up @@ -163,6 +247,9 @@ impl Builder {
// as parameters.
global_queue_interval,
event_interval,

#[cfg(tokio_unstable)]
unhandled_panic: UnhandledPanic::Ignore,
}
}

Expand Down Expand Up @@ -631,6 +718,67 @@ impl Builder {
self
}

cfg_unstable! {
/// Configure how the runtime responds to an unhandled panic on a
/// spawned task.
///
/// By default, an unhandled panic (i.e. a panic not caught by
/// [`std::panic::catch_unwind`]) has no impact on the runtime's
/// execution. The panic's error value is forwarded to the task's
/// [`JoinHandle`] and all other spawned tasks continue running.
///
/// The `unhandled_panic` option enables configuring this behavior.
///
/// * `UnhandledPanic::Ignore` is the default behavior. Panics on
/// spawned tasks have no impact on the runtime's execution.
/// * `UnhandledPanic::ShutdownRuntime` will force the runtime to
/// shutdown immediately when a spawned task panics even if that
/// task's `JoinHandle` has not been dropped. All other spawned tasks
/// will immediately terminate and further calls to
/// [`Runtime::block_on`] will panic.
///
/// # Unstable
///
/// This option is currently unstable and its implementation is
/// incomplete. The API may change or be removed in the future. See
/// tokio-rs/tokio#4516 for more details.
///
/// # Examples
///
/// The following demonstrates a runtime configured to shutdown on
/// panic. The first spawned task panics and results in the runtime
/// shutting down. The second spawned task never has a chance to
/// execute. The call to `block_on` will panic due to the runtime being
/// forcibly shutdown.
///
/// ```should_panic
/// use tokio::runtime::{self, UnhandledPanic};
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_current_thread()
/// .unhandled_panic(UnhandledPanic::ShutdownRuntime)
/// .build()
/// .unwrap();
///
/// rt.spawn(async { panic!("boom"); });
/// rt.spawn(async {
/// // This task never completes.
/// });
///
/// rt.block_on(async {
/// // Do some work
/// # loop { tokio::task::yield_now().await; }
/// })
/// # }
/// ```
///
/// [`JoinHandle`]: struct@crate::task::JoinHandle
pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self {
self.unhandled_panic = behavior;
self
}
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::basic_scheduler::Config;
use crate::runtime::{BasicScheduler, HandleInner, Kind};
Expand Down Expand Up @@ -661,6 +809,8 @@ impl Builder {
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
},
);
let spawner = Spawner::Basic(scheduler.spawner().clone());
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Expand Up @@ -216,6 +216,9 @@ cfg_rt! {

mod builder;
pub use self::builder::Builder;
cfg_unstable! {
pub use self::builder::UnhandledPanic;
}

pub(crate) mod context;
mod driver;
Expand Down
25 changes: 21 additions & 4 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -101,7 +101,12 @@ where
let waker_ref = waker_ref::<T, S>(&header_ptr);
let cx = Context::from_waker(&*waker_ref);
let core = self.core();
let res = poll_future(&core.stage, core.task_id.clone(), cx);
let res = poll_future(
&core.stage,
&self.core().scheduler,
core.task_id.clone(),
cx,
);

if res == Poll::Ready(()) {
// The future completed. Move on to complete the task.
Expand Down Expand Up @@ -453,7 +458,12 @@ fn cancel_task<T: Future>(stage: &CoreStage<T>, id: super::Id) {

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future>(core: &CoreStage<T>, id: super::Id, cx: Context<'_>) -> Poll<()> {
fn poll_future<T: Future, S: Schedule>(
core: &CoreStage<T>,
scheduler: &S,
id: super::Id,
cx: Context<'_>,
) -> Poll<()> {
// Poll the future.
let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
struct Guard<'a, T: Future> {
Expand All @@ -476,13 +486,20 @@ fn poll_future<T: Future>(core: &CoreStage<T>, id: super::Id, cx: Context<'_>) -
let output = match output {
Ok(Poll::Pending) => return Poll::Pending,
Ok(Poll::Ready(output)) => Ok(output),
Err(panic) => Err(JoinError::panic(id, panic)),
Err(panic) => {
scheduler.unhandled_panic();
Err(JoinError::panic(id, panic))
}
};

// Catch and ignore panics if the future panics on drop.
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
core.store_output(output);
}));

if res.is_err() {
scheduler.unhandled_panic();
}

Poll::Ready(())
}
5 changes: 5 additions & 0 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -262,6 +262,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static {
fn yield_now(&self, task: Notified<Self>) {
self.schedule(task);
}

/// Hook called when polling a task, or storing its output, resulted in a
/// panic. Implementations may use it to decide whether the runtime should
/// shut down.
fn unhandled_panic(&self) {
// By default, do nothing. This maintains the 1.0 behavior.
}
}

cfg_rt! {
Expand Down

0 comments on commit c98be22

Please sign in to comment.