diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index c9820b758d5..9792ef57bd5 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -57,6 +57,10 @@ struct Core { /// Metrics batch metrics: MetricsBatch, + + /// True if a task panicked without being handled and the runtime is + /// configured to shutdown on unhandled panic. + unhandled_panic: bool, } #[derive(Clone)] @@ -76,6 +80,10 @@ pub(crate) struct Config { /// Callback for a worker unparking itself pub(crate) after_unpark: Option, + + #[cfg(tokio_unstable)] + /// How to respond to unhandled task panics. + pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, } /// Scheduler state shared between threads. @@ -144,6 +152,7 @@ impl BasicScheduler { tick: 0, driver: Some(driver), metrics: MetricsBatch::new(), + unhandled_panic: false, }))); BasicScheduler { @@ -158,6 +167,7 @@ impl BasicScheduler { &self.spawner } + #[track_caller] pub(crate) fn block_on(&self, future: F) -> F::Output { pin!(future); @@ -465,6 +475,35 @@ impl Schedule for Arc { } }); } + + cfg_unstable! { + fn unhandled_panic(&self) { + use crate::runtime::UnhandledPanic; + + match self.config.unhandled_panic { + UnhandledPanic::Ignore => { + // Do nothing + } + UnhandledPanic::ShutdownRuntime => { + // This hook is only called from within the runtime, so + // `CURRENT` should match with `&self`, i.e. there is no + // opportunity for a nested scheduler to be called. + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => { + let mut core = cx.core.borrow_mut(); + + // If `None`, the runtime is shutting down, so there is no need to signal shutdown + if let Some(core) = core.as_mut() { + core.unhandled_panic = true; + self.owned.close_and_shutdown_all(); + } + } + _ => unreachable!("runtime core not set in CURRENT thread-local"), + }) + } + } + } + } } impl Wake for Shared { @@ -489,8 +528,9 @@ struct CoreGuard<'a> { } impl CoreGuard<'_> { + #[track_caller] fn block_on(self, future: F) -> F::Output { - self.enter(|mut core, context| { + let ret = self.enter(|mut core, context| { let _enter = crate::runtime::enter(false); let waker = context.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); @@ -506,11 +546,16 @@ impl CoreGuard<'_> { core = c; if let Ready(v) = res { - return (core, v); + return (core, Some(v)); } } for _ in 0..core.spawner.shared.config.event_interval { + // Make sure we didn't hit an unhandled_panic + if core.unhandled_panic { + return (core, None); + } + // Get and increment the current tick let tick = core.tick; core.tick = core.tick.wrapping_add(1); @@ -544,7 +589,15 @@ impl CoreGuard<'_> { // pending I/O events. core = context.park_yield(core); } - }) + }); + + match ret { + Some(ret) => ret, + None => { + // `block_on` panicked. + panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic"); + } + } } /// Enters the scheduler context. This sets the queue and other necessary diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index f6d78f736a0..924aac98ef3 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -84,6 +84,90 @@ pub struct Builder { /// How many ticks before yielding to the driver for timer and I/O events? pub(super) event_interval: u32, + + #[cfg(tokio_unstable)] + pub(super) unhandled_panic: UnhandledPanic, +} + +cfg_unstable! { + /// How the runtime should respond to unhandled panics. + /// + /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic` + /// to configure the runtime behavior when a spawned task panics. + /// + /// See [`Builder::unhandled_panic`] for more details. + #[derive(Debug, Clone)] + #[non_exhaustive] + pub enum UnhandledPanic { + /// The runtime should ignore panics on spawned tasks. + /// + /// The panic is forwarded to the task's [`JoinHandle`] and all spawned + /// tasks continue running normally. + /// + /// This is the default behavior. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::{self, UnhandledPanic}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .unhandled_panic(UnhandledPanic::Ignore) + /// .build() + /// .unwrap(); + /// + /// let task1 = rt.spawn(async { panic!("boom"); }); + /// let task2 = rt.spawn(async { + /// // This task completes normally + /// "done" + /// }); + /// + /// rt.block_on(async { + /// // The panic on the first task is forwarded to the `JoinHandle` + /// assert!(task1.await.is_err()); + /// + /// // The second task completes normally + /// assert!(task2.await.is_ok()); + /// }) + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + Ignore, + + /// The runtime should immediately shutdown if a spawned task panics. + /// + /// The runtime will immediately shutdown even if the panicked task's + /// [`JoinHandle`] is still available. All further spawned tasks will be + /// immediately dropped and call to [`Runtime::block_on`] will panic. + /// + /// # Examples + /// + /// ```should_panic + /// use tokio::runtime::{self, UnhandledPanic}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) + /// .build() + /// .unwrap(); + /// + /// rt.spawn(async { panic!("boom"); }); + /// rt.spawn(async { + /// // This task never completes. + /// }); + /// + /// rt.block_on(async { + /// // Do some work + /// # loop { tokio::task::yield_now().await; } + /// }) + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + ShutdownRuntime, + } } pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; @@ -163,6 +247,9 @@ impl Builder { // as parameters. global_queue_interval, event_interval, + + #[cfg(tokio_unstable)] + unhandled_panic: UnhandledPanic::Ignore, } } @@ -631,6 +718,67 @@ impl Builder { self } + cfg_unstable! { + /// Configure how the runtime responds to an unhandled panic on a + /// spawned task. + /// + /// By default, an unhandled panic (i.e. a panic not caught by + /// [`std::panic::catch_unwind`]) has no impact on the runtime's + /// execution. The panic is error value is forwarded to the task's + /// [`JoinHandle`] and all other spawned tasks continue running. + /// + /// The `unhandled_panic` option enables configuring this behavior. + /// + /// * `UnhandledPanic::Ignore` is the default behavior. Panics on + /// spawned tasks have no impact on the runtime's execution. + /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to + /// shutdown immediately when a spawned task panics even if that + /// task's `JoinHandle` has not been dropped. All other spawned tasks + /// will immediatetly terminate and further calls to + /// [`Runtime::block_on`] will panic. + /// + /// # Unstable + /// + /// This option is currently unstable and its implementation is + /// incomplete. The API may change or be removed in the future. See + /// tokio-rs/tokio#4516 for more details. + /// + /// # Examples + /// + /// The following demonstrates a runtime configured to shutdown on + /// panic. The first spawned task panics and results in the runtime + /// shutting down. The second spawned task never has a chance to + /// execute. The call to `block_on` will panic due to the runtime being + /// forcibly shutdown. + /// + /// ```should_panic + /// use tokio::runtime::{self, UnhandledPanic}; + /// + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) + /// .build() + /// .unwrap(); + /// + /// rt.spawn(async { panic!("boom"); }); + /// rt.spawn(async { + /// // This task never completes. + /// }); + /// + /// rt.block_on(async { + /// // Do some work + /// # loop { tokio::task::yield_now().await; } + /// }) + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { + self.unhandled_panic = behavior; + self + } + } + fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::basic_scheduler::Config; use crate::runtime::{BasicScheduler, HandleInner, Kind}; @@ -661,6 +809,8 @@ impl Builder { after_unpark: self.after_unpark.clone(), global_queue_interval: self.global_queue_interval, event_interval: self.event_interval, + #[cfg(tokio_unstable)] + unhandled_panic: self.unhandled_panic.clone(), }, ); let spawner = Spawner::Basic(scheduler.spawner().clone()); diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index a030ccdf0d2..399e602d2bf 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -216,6 +216,9 @@ cfg_rt! { mod builder; pub use self::builder::Builder; + cfg_unstable! { + pub use self::builder::UnhandledPanic; + } pub(crate) mod context; mod driver; diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 1d3ababfb17..206cdf2695a 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -101,7 +101,12 @@ where let waker_ref = waker_ref::(&header_ptr); let cx = Context::from_waker(&*waker_ref); let core = self.core(); - let res = poll_future(&core.stage, core.task_id.clone(), cx); + let res = poll_future( + &core.stage, + &self.core().scheduler, + core.task_id.clone(), + cx, + ); if res == Poll::Ready(()) { // The future completed. Move on to complete the task. @@ -453,7 +458,12 @@ fn cancel_task(stage: &CoreStage, id: super::Id) { /// Polls the future. If the future completes, the output is written to the /// stage field. -fn poll_future(core: &CoreStage, id: super::Id, cx: Context<'_>) -> Poll<()> { +fn poll_future( + core: &CoreStage, + scheduler: &S, + id: super::Id, + cx: Context<'_>, +) -> Poll<()> { // Poll the future. let output = panic::catch_unwind(panic::AssertUnwindSafe(|| { struct Guard<'a, T: Future> { @@ -476,13 +486,20 @@ fn poll_future(core: &CoreStage, id: super::Id, cx: Context<'_>) - let output = match output { Ok(Poll::Pending) => return Poll::Pending, Ok(Poll::Ready(output)) => Ok(output), - Err(panic) => Err(JoinError::panic(id, panic)), + Err(panic) => { + scheduler.unhandled_panic(); + Err(JoinError::panic(id, panic)) + } }; // Catch and ignore panics if the future panics on drop. - let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + let res = panic::catch_unwind(panic::AssertUnwindSafe(|| { core.store_output(output); })); + if res.is_err() { + scheduler.unhandled_panic(); + } + Poll::Ready(()) } diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 316b4a49675..e73b3f35a54 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -262,6 +262,11 @@ pub(crate) trait Schedule: Sync + Sized + 'static { fn yield_now(&self, task: Notified) { self.schedule(task); } + + /// Polling the task resulted in a panic. Should the runtime shutdown? + fn unhandled_panic(&self) { + // By default, do nothing. This maintains the 1.0 behavior. + } } cfg_rt! { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 8c59876d603..e3ba50a79ff 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -232,6 +232,10 @@ struct Context { /// State shared between threads. shared: Arc, + + /// True if a task panicked without being handled and the local set is + /// configured to shutdown on unhandled panic. + unhandled_panic: Cell, } /// LocalSet state shared between threads. @@ -241,6 +245,10 @@ struct Shared { /// Wake the `LocalSet` task. waker: AtomicWaker, + + /// How to respond to unhandled task panics. + #[cfg(tokio_unstable)] + pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, } pin_project! { @@ -330,7 +338,10 @@ impl LocalSet { shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), waker: AtomicWaker::new(), + #[cfg(tokio_unstable)] + unhandled_panic: crate::runtime::UnhandledPanic::Ignore, }), + unhandled_panic: Cell::new(false), }, _not_send: PhantomData, } @@ -516,6 +527,11 @@ impl LocalSet { /// notified again. fn tick(&self) -> bool { for _ in 0..MAX_TASKS_PER_TICK { + // Make sure we didn't hit an unhandled panic + if self.context.unhandled_panic.get() { + panic!("a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"); + } + match self.next_task() { // Run the task // @@ -567,6 +583,76 @@ impl LocalSet { } } +cfg_unstable! { + impl LocalSet { + /// Configure how the `LocalSet` responds to an unhandled panic on a + /// spawned task. + /// + /// By default, an unhandled panic (i.e. a panic not caught by + /// [`std::panic::catch_unwind`]) has no impact on the `LocalSet`'s + /// execution. The panic is error value is forwarded to the task's + /// [`JoinHandle`] and all other spawned tasks continue running. + /// + /// The `unhandled_panic` option enables configuring this behavior. + /// + /// * `UnhandledPanic::Ignore` is the default behavior. Panics on + /// spawned tasks have no impact on the `LocalSet`'s execution. + /// * `UnhandledPanic::ShutdownRuntime` will force the `LocalSet` to + /// shutdown immediately when a spawned task panics even if that + /// task's `JoinHandle` has not been dropped. All other spawned tasks + /// will immediatetly terminate and further calls to + /// [`LocalSet::block_on`] and [`LocalSet::run_until`] will panic. + /// + /// # Panics + /// + /// This method panics if called after the `LocalSet` has started + /// running. + /// + /// # Unstable + /// + /// This option is currently unstable and its implementation is + /// incomplete. The API may change or be removed in the future. See + /// tokio-rs/tokio#4516 for more details. + /// + /// # Examples + /// + /// The following demonstrates a `LocalSet` configured to shutdown on + /// panic. The first spawned task panics and results in the `LocalSet` + /// shutting down. The second spawned task never has a chance to + /// execute. The call to `run_until` will panic due to the runtime being + /// forcibly shutdown. + /// + /// ```should_panic + /// use tokio::runtime::UnhandledPanic; + /// + /// # #[tokio::main] + /// # async fn main() { + /// tokio::task::LocalSet::new() + /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) + /// .run_until(async { + /// tokio::task::spawn_local(async { panic!("boom"); }); + /// tokio::task::spawn_local(async { + /// // This task never completes + /// }); + /// + /// // Do some work, but `run_until` will panic before it completes + /// # loop { tokio::task::yield_now().await; } + /// }) + /// .await; + /// # } + /// ``` + /// + /// [`JoinHandle`]: struct@crate::task::JoinHandle + pub fn unhandled_panic(&mut self, behavior: crate::runtime::UnhandledPanic) -> &mut Self { + // TODO: This should be set as a builder + Arc::get_mut(&mut self.context.shared) + .expect("TODO: we shouldn't panic") + .unhandled_panic = behavior; + self + } + } +} + impl fmt::Debug for LocalSet { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("LocalSet").finish() @@ -722,4 +808,28 @@ impl task::Schedule for Arc { fn schedule(&self, task: task::Notified) { Shared::schedule(self, task); } + + cfg_unstable! { + fn unhandled_panic(&self) { + use crate::runtime::UnhandledPanic; + + match self.unhandled_panic { + UnhandledPanic::Ignore => { + // Do nothing + } + UnhandledPanic::ShutdownRuntime => { + // This hook is only called from within the runtime, so + // `CURRENT` should match with `&self`, i.e. there is no + // opportunity for a nested scheduler to be called. + CURRENT.with(|maybe_cx| match maybe_cx { + Some(cx) if Arc::ptr_eq(self, &cx.shared) => { + cx.unhandled_panic.set(true); + cx.owned.close_and_shutdown_all(); + } + _ => unreachable!("runtime core not set in CURRENT thread-local"), + }) + } + } + } + } } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index cc6ac677280..50b7e9732a5 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -288,6 +288,96 @@ fn timeout_panics_when_no_time_handle() { }); } +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::{Builder, UnhandledPanic}; + + #[test] + #[should_panic( + expected = "a spawned task panicked and the runtime is configured to shut down on unhandled panic" + )] + fn shutdown_on_panic() { + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + rt.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + } + + #[test] + fn spawns_do_nothing() { + use std::sync::Arc; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt1 = Arc::new(rt); + let rt2 = rt1.clone(); + + let _ = std::thread::spawn(move || { + rt2.block_on(async { + tokio::spawn(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + }) + .join(); + + let task = rt1.spawn(async {}); + let res = futures::executor::block_on(task); + assert!(res.is_err()); + } + + #[test] + fn shutdown_all_concurrent_block_on() { + const N: usize = 2; + use std::sync::{mpsc, Arc}; + + let rt = Builder::new_current_thread() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .build() + .unwrap(); + + let rt = Arc::new(rt); + let mut ths = vec![]; + let (tx, rx) = mpsc::channel(); + + for _ in 0..N { + let rt = rt.clone(); + let tx = tx.clone(); + ths.push(std::thread::spawn(move || { + rt.block_on(async { + tx.send(()).unwrap(); + futures::future::pending::<()>().await; + }); + })); + } + + for _ in 0..N { + rx.recv().unwrap(); + } + + rt.spawn(async { + panic!("boom"); + }); + + for th in ths { + assert!(th.join().is_err()); + } + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all() diff --git a/tokio/tests/task_local_set.rs b/tokio/tests/task_local_set.rs index f8a35d0ede8..2bd5b17b84d 100644 --- a/tokio/tests/task_local_set.rs +++ b/tokio/tests/task_local_set.rs @@ -517,6 +517,29 @@ async fn spawn_wakes_localset() { } } +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::UnhandledPanic; + use tokio::task::LocalSet; + + #[tokio::test] + #[should_panic( + expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic" + )] + async fn shutdown_on_panic() { + LocalSet::new() + .unhandled_panic(UnhandledPanic::ShutdownRuntime) + .run_until(async { + tokio::task::spawn_local(async { + panic!("boom"); + }); + + futures::future::pending::<()>().await; + }) + .await; + } +} + fn rt() -> Runtime { tokio::runtime::Builder::new_current_thread() .enable_all()