diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 7984b61276b..c3603b50f6b 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -111,6 +111,8 @@ where } pub(super) fn drop_join_handle_slow(self) { + let mut maybe_panic = None; + // Try to unset `JOIN_INTEREST`. This must be done as a first step in // case the task concurrently completed. if self.header().state.unset_join_interested().is_err() { @@ -119,11 +121,20 @@ where // the scheduler or `JoinHandle`. i.e. if the output remains in the // task structure until the task is deallocated, it may be dropped // by a Waker on any arbitrary thread. - self.core().stage.drop_future_or_output(); + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().stage.drop_future_or_output(); + })); + if let Err(panic) = panic { + maybe_panic = Some(panic); + } } // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); + + if let Some(panic) = maybe_panic { + panic::resume_unwind(panic); + } } // ===== waker behavior ===== @@ -182,17 +193,25 @@ where // ====== internal ====== fn complete(self, output: super::Result, is_join_interested: bool) { - if is_join_interested { - // Store the output. The future has already been dropped - // - // Safety: Mutual exclusion is obtained by having transitioned the task - // state -> Running - let stage = &self.core().stage; - stage.store_output(output); - - // Transition to `Complete`, notifying the `JoinHandle` if necessary. - transition_to_complete(self.header(), stage, &self.trailer()); - } + // We catch panics here because dropping the output may panic. + // + // Dropping the output can also happen in the first branch inside + // transition_to_complete. + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + if is_join_interested { + // Store the output. The future has already been dropped + // + // Safety: Mutual exclusion is obtained by having transitioned the task + // state -> Running + let stage = &self.core().stage; + stage.store_output(output); + + // Transition to `Complete`, notifying the `JoinHandle` if necessary. + transition_to_complete(self.header(), stage, &self.trailer()); + } else { + drop(output); + } + })); // The task has completed execution and will no longer be scheduled. // diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index f3c6a9bd700..e0379a68c89 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -40,6 +40,9 @@ cfg_loom! { cfg_not_loom! { mod queue; + #[cfg(not(miri))] + mod task_combinations; + #[cfg(miri)] mod task; } diff --git a/tokio/src/runtime/tests/task_combinations.rs b/tokio/src/runtime/tests/task_combinations.rs new file mode 100644 index 00000000000..76ce2330c2c --- /dev/null +++ b/tokio/src/runtime/tests/task_combinations.rs @@ -0,0 +1,380 @@ +use std::future::Future; +use std::panic; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::runtime::Builder; +use crate::sync::oneshot; +use crate::task::JoinHandle; + +use futures::future::FutureExt; + +// Enums for each option in the combinations being tested + +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiRuntime { + CurrentThread, + Multi1, + Multi2, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiLocalSet { + Yes, + No, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiTask { + PanicOnRun, + PanicOnDrop, + PanicOnRunAndDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiOutput { + PanicOnDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinInterest { + Polled, + NotPolled, +} +#[allow(clippy::enum_variant_names)] // we aren't using glob imports +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinHandle { + DropImmediately = 1, + DropFirstPoll = 2, + DropAfterNoConsume = 3, + DropAfterConsume = 4, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiAbort { + NotAborted = 0, + AbortedImmediately = 1, + AbortedFirstPoll = 2, + AbortedAfterFinish = 3, + AbortedAfterConsumeOutput = 4, +} + +#[test] +fn test_combinations() { + let mut rt = &[ + CombiRuntime::CurrentThread, + CombiRuntime::Multi1, + CombiRuntime::Multi2, + ][..]; + + if cfg!(miri) { + rt = &[CombiRuntime::CurrentThread]; + } + + let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; + let task = [ + CombiTask::NoPanic, + CombiTask::PanicOnRun, + CombiTask::PanicOnDrop, + CombiTask::PanicOnRunAndDrop, + ]; + let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop]; + let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled]; + let jh = [ + CombiJoinHandle::DropImmediately, + CombiJoinHandle::DropFirstPoll, + CombiJoinHandle::DropAfterNoConsume, + CombiJoinHandle::DropAfterConsume, + ]; + let abort = [ + CombiAbort::NotAborted, + CombiAbort::AbortedImmediately, + CombiAbort::AbortedFirstPoll, + CombiAbort::AbortedAfterFinish, + CombiAbort::AbortedAfterConsumeOutput, + ]; + + for rt in rt.iter().copied() { + for ls in ls.iter().copied() { + for task in task.iter().copied() { + for output in output.iter().copied() { + for ji in ji.iter().copied() { + for jh in jh.iter().copied() { + for abort in abort.iter().copied() { + test_combination(rt, ls, task, output, ji, jh, abort); + } + } + } + } + } + } + } +} + +fn test_combination( + rt: CombiRuntime, + ls: CombiLocalSet, + task: CombiTask, + output: CombiOutput, + ji: CombiJoinInterest, + jh: CombiJoinHandle, + abort: CombiAbort, +) { + if (jh as usize) < (abort as usize) { + // drop before abort not possible + return; + } + if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { + // this causes double panic + return; + } + if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { + // this causes double panic + return; + } + + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + + // A runtime optionally with a LocalSet + struct Rt { + rt: crate::runtime::Runtime, + ls: Option, + } + impl Rt { + fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self { + let rt = match rt { + CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(), + CombiRuntime::Multi1 => Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(), + CombiRuntime::Multi2 => Builder::new_multi_thread() + .worker_threads(2) + .build() + .unwrap(), + }; + + let ls = match ls { + CombiLocalSet::Yes => Some(crate::task::LocalSet::new()), + CombiLocalSet::No => None, + }; + + Self { rt, ls } + } + fn block_on(&self, task: T) -> T::Output + where + T: Future, + { + match &self.ls { + Some(ls) => ls.block_on(&self.rt, task), + None => self.rt.block_on(task), + } + } + fn spawn(&self, task: T) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + match &self.ls { + Some(ls) => ls.spawn_local(task), + None => self.rt.spawn(task), + } + } + } + + // The type used for the output of the future + struct Output { + panic_on_drop: bool, + on_drop: Option>, + } + impl Output { + fn disarm(&mut self) { + self.panic_on_drop = false; + } + } + impl Drop for Output { + fn drop(&mut self) { + let _ = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking in Output"); + } + } + } + + // A wrapper around the future that is spawned + struct FutWrapper { + inner: F, + on_drop: Option>, + panic_on_drop: bool, + } + impl Future for FutWrapper { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + let me = Pin::into_inner_unchecked(self); + let inner = Pin::new_unchecked(&mut me.inner); + inner.poll(cx) + } + } + } + impl Drop for FutWrapper { + fn drop(&mut self) { + let _: Result<(), ()> = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking in FutWrapper"); + } + } + } + + // The channels passed to the task + struct Signals { + on_first_poll: Option>, + wait_complete: Option>, + on_output_drop: Option>, + } + + // The task we will spawn + async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output { + // Signal that we have been polled once + let _ = signal.on_first_poll.take().unwrap().send(()); + + // Wait for a signal, then complete the future + let _ = signal.wait_complete.take().unwrap().await; + + // If the task gets past wait_complete without yielding, then aborts + // may not be caught without this yield_now. + crate::task::yield_now().await; + + if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { + panic!("Panicking in my_task on {:?}", std::thread::current().id()); + } + + Output { + panic_on_drop: out == CombiOutput::PanicOnDrop, + on_drop: signal.on_output_drop.take(), + } + } + + let rt = Rt::new(rt, ls); + + let (on_first_poll, wait_first_poll) = oneshot::channel(); + let (on_complete, wait_complete) = oneshot::channel(); + let (on_future_drop, wait_future_drop) = oneshot::channel(); + let (on_output_drop, wait_output_drop) = oneshot::channel(); + let signal = Signals { + on_first_poll: Some(on_first_poll), + wait_complete: Some(wait_complete), + on_output_drop: Some(on_output_drop), + }; + + // === Spawn task === + let mut handle = Some(rt.spawn(FutWrapper { + inner: my_task(signal, task, output), + on_drop: Some(on_future_drop), + panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop, + })); + + // Keep track of whether the task has been killed with an abort + let mut aborted = false; + + // If we want to poll the JoinHandle, do it now + if ji == CombiJoinInterest::Polled { + assert!( + handle.as_mut().unwrap().now_or_never().is_none(), + "Polling handle succeeded" + ); + } + + if abort == CombiAbort::AbortedImmediately { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropImmediately { + drop(handle.take().unwrap()); + } + + // === Wait for first poll === + let got_polled = rt.block_on(wait_first_poll).is_ok(); + if !got_polled { + // it's possible that we are aborted but still got polled + assert!( + aborted, + "Task completed without ever being polled but was not aborted." + ); + } + + if abort == CombiAbort::AbortedFirstPoll { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropFirstPoll { + drop(handle.take().unwrap()); + } + + // Signal the future that it can return now + let _ = on_complete.send(()); + // === Wait for future to be dropped === + assert!( + rt.block_on(wait_future_drop).is_ok(), + "The future should always be dropped." + ); + + if abort == CombiAbort::AbortedAfterFinish { + // Don't set aborted to true here as the task already finished + handle.as_mut().unwrap().abort(); + } + if jh == CombiJoinHandle::DropAfterNoConsume { + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the JoinHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping JoinHandle shouldn't panic here" + ); + } + } + + // Check whether we drop after consuming the output + if jh == CombiJoinHandle::DropAfterConsume { + // Using as_mut() to not immediately drop the handle + let result = rt.block_on(handle.as_mut().unwrap()); + + match result { + Ok(mut output) => { + // Don't panic here. + output.disarm(); + assert!(!aborted, "Task was aborted but returned output"); + } + Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"), + Err(err) if err.is_panic() => { + assert!( + (task == CombiTask::PanicOnRun) + || (task == CombiTask::PanicOnDrop) + || (task == CombiTask::PanicOnRunAndDrop) + || (output == CombiOutput::PanicOnDrop), + "Panic but nothing should panic" + ); + } + _ => unreachable!(), + } + + let handle = handle.take().unwrap(); + if abort == CombiAbort::AbortedAfterConsumeOutput { + handle.abort(); + } + drop(handle); + } + + // The output should have been dropped now. Check whether the output + // object was created at all. + let output_created = rt.block_on(wait_output_drop).is_ok(); + assert_eq!( + output_created, + (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, + "Creation of output object" + ); +} diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 8f621683faa..cdaa405b86a 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -5,11 +5,21 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; +use tokio::runtime::Builder; + +struct PanicOnDrop; + +impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Well what did you expect would happen..."); + } +} + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] fn test_abort_without_panic_3157() { - let rt = tokio::runtime::Builder::new_multi_thread() + let rt = Builder::new_multi_thread() .enable_time() .worker_threads(1) .build() @@ -45,9 +55,7 @@ fn test_abort_without_panic_3662() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); rt.block_on(async move { let drop_flag = Arc::new(AtomicBool::new(false)); @@ -120,9 +128,7 @@ fn remote_abort_local_set_3929() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); let local = tokio::task::LocalSet::new(); let check = DropCheck::new(); @@ -144,10 +150,7 @@ fn remote_abort_local_set_3929() { /// issue #3964: . #[test] fn test_abort_wakes_task_3964() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let notify_dropped = Arc::new(()); @@ -174,22 +177,11 @@ fn test_abort_wakes_task_3964() { }); } -struct PanicOnDrop; - -impl Drop for PanicOnDrop { - fn drop(&mut self) { - panic!("Well what did you expect would happen..."); - } -} - /// Checks that aborting a task whose destructor panics does not allow the /// panic to escape the task. #[test] fn test_abort_task_that_panics_on_drop_contained() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move { @@ -213,10 +205,7 @@ fn test_abort_task_that_panics_on_drop_contained() { /// Checks that aborting a task whose destructor panics has the expected result. #[test] fn test_abort_task_that_panics_on_drop_returned() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move {