From 8993e6e80129209c5464f7b7833b5f16831704bf Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 15:29:23 +0200 Subject: [PATCH 01/15] tests: test combinations of task behavior --- tokio/src/runtime/task/harness.rs | 4 + tokio/tests/task_abort.rs | 43 ++-- tokio/tests/task_combinations.rs | 357 ++++++++++++++++++++++++++++++ 3 files changed, 377 insertions(+), 27 deletions(-) create mode 100644 tokio/tests/task_combinations.rs diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 9f0b1071130..d35647c2d94 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -193,6 +193,10 @@ where // Transition to `Complete`, notifying the `JoinHandle` if necessary. transition_to_complete(self.header(), stage, &self.trailer()); + } else { + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(output); + })); } // The task has completed execution and will no longer be scheduled. diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 8f621683faa..cdaa405b86a 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -5,11 +5,21 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; +use tokio::runtime::Builder; + +struct PanicOnDrop; + +impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Well what did you expect would happen..."); + } +} + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] fn test_abort_without_panic_3157() { - let rt = tokio::runtime::Builder::new_multi_thread() + let rt = Builder::new_multi_thread() .enable_time() .worker_threads(1) .build() @@ -45,9 +55,7 @@ fn test_abort_without_panic_3662() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); rt.block_on(async move { let drop_flag = Arc::new(AtomicBool::new(false)); @@ -120,9 +128,7 @@ fn remote_abort_local_set_3929() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); let local = tokio::task::LocalSet::new(); let check = DropCheck::new(); @@ -144,10 +150,7 @@ fn remote_abort_local_set_3929() { /// issue #3964: . #[test] fn test_abort_wakes_task_3964() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let notify_dropped = Arc::new(()); @@ -174,22 +177,11 @@ fn test_abort_wakes_task_3964() { }); } -struct PanicOnDrop; - -impl Drop for PanicOnDrop { - fn drop(&mut self) { - panic!("Well what did you expect would happen..."); - } -} - /// Checks that aborting a task whose destructor panics does not allow the /// panic to escape the task. #[test] fn test_abort_task_that_panics_on_drop_contained() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move { @@ -213,10 +205,7 @@ fn test_abort_task_that_panics_on_drop_contained() { /// Checks that aborting a task whose destructor panics has the expected result. #[test] fn test_abort_task_that_panics_on_drop_returned() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move { diff --git a/tokio/tests/task_combinations.rs b/tokio/tests/task_combinations.rs new file mode 100644 index 00000000000..afdc24a0e2a --- /dev/null +++ b/tokio/tests/task_combinations.rs @@ -0,0 +1,357 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::future::Future; +use std::panic; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use tokio::runtime::Builder; +use tokio::sync::oneshot; +use tokio::task::JoinHandle; + +use futures::future::FutureExt; + +// Enums for each option in the combinations being tested + +#[derive(Copy, Clone, Debug)] +enum CombiRuntime { + CurrentThread, + Multi1, + Multi2, +} +#[derive(Copy, Clone, Debug)] +enum CombiLocalSet { + Yes, + No, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiTask { + PanicOnRun, + PanicOnDrop, + PanicOnRunAndDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiOutput { + PanicOnDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinInterest { + Polled, + NotPolled, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinHandle { + DropImmediately = 1, + DropFirstPoll = 2, + DropAfterNoConsume = 3, + DropAfterConsume = 4, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiAbort { + NotAborted = 0, + AbortedImmediately = 1, + AbortedFirstPoll = 2, + AbortedAfterFinish = 3, + AbortedAfterConsumeOutput = 4, +} + +#[test] +fn test_combinations() { + let rt = [ + CombiRuntime::CurrentThread, + CombiRuntime::Multi1, + CombiRuntime::Multi2, + ]; + let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; + let task = [ + CombiTask::NoPanic, + CombiTask::PanicOnRun, + CombiTask::PanicOnDrop, + CombiTask::PanicOnRunAndDrop, + ]; + let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop]; + let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled]; + let jh = [ + CombiJoinHandle::DropImmediately, + CombiJoinHandle::DropFirstPoll, + CombiJoinHandle::DropAfterNoConsume, + CombiJoinHandle::DropAfterConsume, + ]; + let abort = [ + CombiAbort::NotAborted, + CombiAbort::AbortedImmediately, + CombiAbort::AbortedFirstPoll, + CombiAbort::AbortedAfterFinish, + CombiAbort::AbortedAfterConsumeOutput, + ]; + + for rt in rt.iter().copied() { + for ls in ls.iter().copied() { + for task in task.iter().copied() { + for output in output.iter().copied() { + for ji in ji.iter().copied() { + for jh in jh.iter().copied() { + for abort in abort.iter().copied() { + test_combination(rt, ls, task, output, ji, jh, abort); + } + } + } + } + } + } + } +} + +fn test_combination( + rt: CombiRuntime, + ls: CombiLocalSet, + task: CombiTask, + output: CombiOutput, + ji: CombiJoinInterest, + jh: CombiJoinHandle, + abort: CombiAbort, +) { + if (jh as usize) < (abort as usize) { + // drop before abort not possible + return; + } + if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { + // this causes double panic + return; + } + if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { + // this causes double panic + return; + } + + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?} JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + + // A runtime optionally with a LocalSet + struct Rt { + rt: tokio::runtime::Runtime, + ls: Option, + } + impl Rt { + fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self { + let rt = match rt { + CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(), + CombiRuntime::Multi1 => Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(), + CombiRuntime::Multi2 => Builder::new_multi_thread() + .worker_threads(2) + .build() + .unwrap(), + }; + + let ls = match ls { + CombiLocalSet::Yes => Some(tokio::task::LocalSet::new()), + CombiLocalSet::No => None, + }; + + Self { rt, ls } + } + fn block_on(&self, task: T) -> T::Output + where + T: Future, + { + match &self.ls { + Some(ls) => ls.block_on(&self.rt, task), + None => self.rt.block_on(task), + } + } + fn spawn(&self, task: T) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + match &self.ls { + Some(ls) => ls.spawn_local(task), + None => self.rt.spawn(task), + } + } + } + + // The type used for the output of the future + struct Output { + panic_on_drop: bool, + on_drop: Option>, + } + impl Output { + fn disarm(&mut self) { + self.panic_on_drop = false; + } + } + impl Drop for Output { + fn drop(&mut self) { + let _ = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking"); + } + } + } + + // A wrapper around the future that is spawned + struct FutWrapper { + inner: F, + on_drop: Option>, + panic_on_drop: bool, + } + impl Future for FutWrapper { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + let me = Pin::into_inner_unchecked(self); + let inner = Pin::new_unchecked(&mut me.inner); + inner.poll(cx) + } + } + } + impl Drop for FutWrapper { + fn drop(&mut self) { + let _ = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking"); + } + } + } + + // The channels passed to the task + struct Signals { + on_first_poll: Option>, + wait_complete: Option>, + on_output_drop: Option>, + } + + // The task we will spawn + async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output { + // Signal that we have been polled once + let _ = signal.on_first_poll.take().unwrap().send(()); + + // Wait for a signal, then complete the future + let _ = signal.wait_complete.take().unwrap().await; + + if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { + panic!("Panicking"); + } + + Output { + panic_on_drop: out == CombiOutput::PanicOnDrop, + on_drop: signal.on_output_drop.take(), + } + } + + let rt = Rt::new(rt, ls); + + let (on_first_poll, wait_first_poll) = oneshot::channel(); + let (on_complete, wait_complete) = oneshot::channel(); + let (on_future_drop, wait_future_drop) = oneshot::channel(); + let (on_output_drop, wait_output_drop) = oneshot::channel(); + let signal = Signals { + on_first_poll: Some(on_first_poll), + wait_complete: Some(wait_complete), + on_output_drop: Some(on_output_drop), + }; + + // === Spawn task === + let mut handle = Some(rt.spawn(FutWrapper { + inner: my_task(signal, task, output), + on_drop: Some(on_future_drop), + panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop, + })); + + // Keep track of whether the task has been killed with an abort + let mut aborted = false; + + // If we want to poll the JoinHandle, do it now + if ji == CombiJoinInterest::Polled { + assert!(handle.as_mut().unwrap().now_or_never().is_none()); + } + + if abort == CombiAbort::AbortedImmediately { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropImmediately { + drop(handle.take().unwrap()); + } + + // === Wait for first poll === + let got_polled = rt.block_on(wait_first_poll).is_ok(); + if !got_polled { + // it's possible that we are aborted but still got polled + assert!(aborted); + } + + if abort == CombiAbort::AbortedFirstPoll { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropFirstPoll { + drop(handle.take().unwrap()); + } + + // Signal the future that it can return now + let _ = on_complete.send(()); + // === Wait for future to be dropped === + assert!(rt.block_on(wait_future_drop).is_ok()); + + if abort == CombiAbort::AbortedAfterFinish { + // Don't set aborted to true here as the task already finished + handle.as_mut().unwrap().abort(); + } + if jh == CombiJoinHandle::DropAfterNoConsume { + // Dropping the JoinHandle drops the output + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(handle.take().unwrap()); + })); + assert_eq!( + panic.is_err(), + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted + ); + } + + // Check whether we drop after consuming the output + if jh == CombiJoinHandle::DropAfterConsume { + // Using as_mut() to not immediately drop the handle + let result = rt.block_on(handle.as_mut().unwrap()); + + match result { + Ok(mut output) => { + // Don't panic here. + output.disarm(); + assert!(!aborted); + } + Err(err) if err.is_cancelled() => assert!(aborted), + Err(err) if err.is_panic() => { + assert!( + (task == CombiTask::PanicOnRun) + || (task == CombiTask::PanicOnDrop) + || (task == CombiTask::PanicOnRunAndDrop) + || (output == CombiOutput::PanicOnDrop) + ); + } + _ => unreachable!(), + } + + let handle = handle.take().unwrap(); + if abort == CombiAbort::AbortedAfterConsumeOutput { + handle.abort(); + } + drop(handle); + } + + // The output should have been dropped now. Check whether the output + // object was created at all. + let output_created = rt.block_on(wait_output_drop).is_ok(); + assert_eq!( + output_created, + (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted + ); +} From eda9559276d222b749895d44bf452bea702125a6 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 15:35:57 +0200 Subject: [PATCH 02/15] Silence clippy lint --- tokio/tests/task_combinations.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/tests/task_combinations.rs b/tokio/tests/task_combinations.rs index afdc24a0e2a..78b608e7c28 100644 --- a/tokio/tests/task_combinations.rs +++ b/tokio/tests/task_combinations.rs @@ -42,6 +42,7 @@ enum CombiJoinInterest { Polled, NotPolled, } +#[allow(clippy::enum_variant_names)] // we aren't using glob imports #[derive(Copy, Clone, Debug, PartialEq)] enum CombiJoinHandle { DropImmediately = 1, From dcce6ec8da090f6913a5ca43ea2127c3c50fcec4 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 16:51:04 +0200 Subject: [PATCH 03/15] Update to fix races in test --- tokio/tests/task_combinations.rs | 35 ++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/tokio/tests/task_combinations.rs b/tokio/tests/task_combinations.rs index 78b608e7c28..1ece0e2ce7a 100644 --- a/tokio/tests/task_combinations.rs +++ b/tokio/tests/task_combinations.rs @@ -128,7 +128,7 @@ fn test_combination( return; } - println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?} JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); // A runtime optionally with a LocalSet struct Rt { @@ -236,6 +236,10 @@ fn test_combination( // Wait for a signal, then complete the future let _ = signal.wait_complete.take().unwrap().await; + // If the task gets past wait_complete without yielding, then aborts + // may not be caught without this yield_now. + tokio::task::yield_now().await; + if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { panic!("Panicking"); } @@ -270,7 +274,7 @@ fn test_combination( // If we want to poll the JoinHandle, do it now if ji == CombiJoinInterest::Polled { - assert!(handle.as_mut().unwrap().now_or_never().is_none()); + assert!(handle.as_mut().unwrap().now_or_never().is_none(), "Polling handle succeeded"); } if abort == CombiAbort::AbortedImmediately { @@ -285,7 +289,7 @@ fn test_combination( let got_polled = rt.block_on(wait_first_poll).is_ok(); if !got_polled { // it's possible that we are aborted but still got polled - assert!(aborted); + assert!(aborted, "Task completed without ever being polled but was not aborted."); } if abort == CombiAbort::AbortedFirstPoll { @@ -299,23 +303,26 @@ fn test_combination( // Signal the future that it can return now let _ = on_complete.send(()); // === Wait for future to be dropped === - assert!(rt.block_on(wait_future_drop).is_ok()); + assert!(rt.block_on(wait_future_drop).is_ok(), "The future should always be dropped."); if abort == CombiAbort::AbortedAfterFinish { // Don't set aborted to true here as the task already finished handle.as_mut().unwrap().abort(); } if jh == CombiJoinHandle::DropAfterNoConsume { - // Dropping the JoinHandle drops the output + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the JoinHandle drops the output. + // + // (But it might race and still hold a ref-count) let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { drop(handle.take().unwrap()); })); - assert_eq!( - panic.is_err(), + if panic.is_err() { + assert!( (output == CombiOutput::PanicOnDrop) && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) - && !aborted - ); + && !aborted, "Dropping JoinHandle shouldn't panic here"); + } } // Check whether we drop after consuming the output @@ -327,15 +334,16 @@ fn test_combination( Ok(mut output) => { // Don't panic here. output.disarm(); - assert!(!aborted); + assert!(!aborted, "Task was aborted but returned output"); } - Err(err) if err.is_cancelled() => assert!(aborted), + Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"), Err(err) if err.is_panic() => { assert!( (task == CombiTask::PanicOnRun) || (task == CombiTask::PanicOnDrop) || (task == CombiTask::PanicOnRunAndDrop) - || (output == CombiOutput::PanicOnDrop) + || (output == CombiOutput::PanicOnDrop), + "Panic but nothing should panic" ); } _ => unreachable!(), @@ -353,6 +361,7 @@ fn test_combination( let output_created = rt.block_on(wait_output_drop).is_ok(); assert_eq!( output_created, - (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted + (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, + "Creation of output object" ); } From f56d94d66ed504de6c9c80d68c17fe240e10249c Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 16:51:48 +0200 Subject: [PATCH 04/15] rustfmt --- tokio/tests/task_combinations.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/tokio/tests/task_combinations.rs b/tokio/tests/task_combinations.rs index 1ece0e2ce7a..89269ee1ed5 100644 --- a/tokio/tests/task_combinations.rs +++ b/tokio/tests/task_combinations.rs @@ -274,7 +274,10 @@ fn test_combination( // If we want to poll the JoinHandle, do it now if ji == CombiJoinInterest::Polled { - assert!(handle.as_mut().unwrap().now_or_never().is_none(), "Polling handle succeeded"); + assert!( + handle.as_mut().unwrap().now_or_never().is_none(), + "Polling handle succeeded" + ); } if abort == CombiAbort::AbortedImmediately { @@ -289,7 +292,10 @@ fn test_combination( let got_polled = rt.block_on(wait_first_poll).is_ok(); if !got_polled { // it's possible that we are aborted but still got polled - assert!(aborted, "Task completed without ever being polled but was not aborted."); + assert!( + aborted, + "Task completed without ever being polled but was not aborted." + ); } if abort == CombiAbort::AbortedFirstPoll { @@ -303,7 +309,10 @@ fn test_combination( // Signal the future that it can return now let _ = on_complete.send(()); // === Wait for future to be dropped === - assert!(rt.block_on(wait_future_drop).is_ok(), "The future should always be dropped."); + assert!( + rt.block_on(wait_future_drop).is_ok(), + "The future should always be dropped." + ); if abort == CombiAbort::AbortedAfterFinish { // Don't set aborted to true here as the task already finished @@ -319,9 +328,11 @@ fn test_combination( })); if panic.is_err() { assert!( - (output == CombiOutput::PanicOnDrop) - && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) - && !aborted, "Dropping JoinHandle shouldn't panic here"); + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping JoinHandle shouldn't panic here" + ); } } From 89ad298bce5ca71fb3a7abeacd5b65df77a9ff96 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 17:00:36 +0200 Subject: [PATCH 05/15] Turn into loom test --- tokio/src/runtime/tests/loom_task.rs | 377 +++++++++++++++++++++++++++ tokio/src/runtime/tests/mod.rs | 1 + 2 files changed, 378 insertions(+) create mode 100644 tokio/src/runtime/tests/loom_task.rs diff --git a/tokio/src/runtime/tests/loom_task.rs b/tokio/src/runtime/tests/loom_task.rs new file mode 100644 index 00000000000..d306859e48c --- /dev/null +++ b/tokio/src/runtime/tests/loom_task.rs @@ -0,0 +1,377 @@ +use std::future::Future; +use std::panic; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::runtime::Builder; +use crate::sync::oneshot; +use crate::task::JoinHandle; + +use futures::future::FutureExt; + +// Enums for each option in the combinations being tested + +#[derive(Copy, Clone, Debug)] +enum CombiRuntime { + CurrentThread, + Multi1, + Multi2, +} +#[derive(Copy, Clone, Debug)] +enum CombiLocalSet { + Yes, + No, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiTask { + PanicOnRun, + PanicOnDrop, + PanicOnRunAndDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiOutput { + PanicOnDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinInterest { + Polled, + NotPolled, +} +#[allow(clippy::enum_variant_names)] // we aren't using glob imports +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinHandle { + DropImmediately = 1, + DropFirstPoll = 2, + DropAfterNoConsume = 3, + DropAfterConsume = 4, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiAbort { + NotAborted = 0, + AbortedImmediately = 1, + AbortedFirstPoll = 2, + AbortedAfterFinish = 3, + AbortedAfterConsumeOutput = 4, +} + +#[test] +fn test_combinations() { + let rt = [ + CombiRuntime::CurrentThread, + CombiRuntime::Multi1, + CombiRuntime::Multi2, + ]; + let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; + let task = [ + CombiTask::NoPanic, + CombiTask::PanicOnRun, + CombiTask::PanicOnDrop, + CombiTask::PanicOnRunAndDrop, + ]; + let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop]; + let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled]; + let jh = [ + CombiJoinHandle::DropImmediately, + CombiJoinHandle::DropFirstPoll, + CombiJoinHandle::DropAfterNoConsume, + CombiJoinHandle::DropAfterConsume, + ]; + let abort = [ + CombiAbort::NotAborted, + CombiAbort::AbortedImmediately, + CombiAbort::AbortedFirstPoll, + CombiAbort::AbortedAfterFinish, + CombiAbort::AbortedAfterConsumeOutput, + ]; + + for rt in rt.iter().copied() { + for ls in ls.iter().copied() { + for task in task.iter().copied() { + for output in output.iter().copied() { + for ji in ji.iter().copied() { + for jh in jh.iter().copied() { + for abort in abort.iter().copied() { + loom::model(move || { + test_combination(rt, ls, task, output, ji, jh, abort); + }); + } + } + } + } + } + } + } +} + +fn test_combination( + rt: CombiRuntime, + ls: CombiLocalSet, + task: CombiTask, + output: CombiOutput, + ji: CombiJoinInterest, + jh: CombiJoinHandle, + abort: CombiAbort, +) { + if (jh as usize) < (abort as usize) { + // drop before abort not possible + return; + } + if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { + // this causes double panic + return; + } + if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { + // this causes double panic + return; + } + + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + + // A runtime optionally with a LocalSet + struct Rt { + rt: crate::runtime::Runtime, + ls: Option, + } + impl Rt { + fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self { + let rt = match rt { + CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(), + CombiRuntime::Multi1 => Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(), + CombiRuntime::Multi2 => Builder::new_multi_thread() + .worker_threads(2) + .build() + .unwrap(), + }; + + let ls = match ls { + CombiLocalSet::Yes => Some(crate::task::LocalSet::new()), + CombiLocalSet::No => None, + }; + + Self { rt, ls } + } + fn block_on(&self, task: T) -> T::Output + where + T: Future, + { + match &self.ls { + Some(ls) => ls.block_on(&self.rt, task), + None => self.rt.block_on(task), + } + } + fn spawn(&self, task: T) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + match &self.ls { + Some(ls) => ls.spawn_local(task), + None => self.rt.spawn(task), + } + } + } + + // The type used for the output of the future + struct Output { + panic_on_drop: bool, + on_drop: Option>, + } + impl Output { + fn disarm(&mut self) { + self.panic_on_drop = false; + } + } + impl Drop for Output { + fn drop(&mut self) { + let _ = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking"); + } + } + } + + // A wrapper around the future that is spawned + struct FutWrapper { + inner: F, + on_drop: Option>, + panic_on_drop: bool, + } + impl Future for FutWrapper { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + let me = Pin::into_inner_unchecked(self); + let inner = Pin::new_unchecked(&mut me.inner); + inner.poll(cx) + } + } + } + impl Drop for FutWrapper { + fn drop(&mut self) { + let _ = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking"); + } + } + } + + // The channels passed to the task + struct Signals { + on_first_poll: Option>, + wait_complete: Option>, + on_output_drop: Option>, + } + + // The task we will spawn + async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output { + // Signal that we have been polled once + let _ = signal.on_first_poll.take().unwrap().send(()); + + // Wait for a signal, then complete the future + let _ = signal.wait_complete.take().unwrap().await; + + // If the task gets past wait_complete without yielding, then aborts + // may not be caught without this yield_now. + crate::task::yield_now().await; + + if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { + panic!("Panicking"); + } + + Output { + panic_on_drop: out == CombiOutput::PanicOnDrop, + on_drop: signal.on_output_drop.take(), + } + } + + let rt = Rt::new(rt, ls); + + let (on_first_poll, wait_first_poll) = oneshot::channel(); + let (on_complete, wait_complete) = oneshot::channel(); + let (on_future_drop, wait_future_drop) = oneshot::channel(); + let (on_output_drop, wait_output_drop) = oneshot::channel(); + let signal = Signals { + on_first_poll: Some(on_first_poll), + wait_complete: Some(wait_complete), + on_output_drop: Some(on_output_drop), + }; + + // === Spawn task === + let mut handle = Some(rt.spawn(FutWrapper { + inner: my_task(signal, task, output), + on_drop: Some(on_future_drop), + panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop, + })); + + // Keep track of whether the task has been killed with an abort + let mut aborted = false; + + // If we want to poll the JoinHandle, do it now + if ji == CombiJoinInterest::Polled { + assert!( + handle.as_mut().unwrap().now_or_never().is_none(), + "Polling handle succeeded" + ); + } + + if abort == CombiAbort::AbortedImmediately { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropImmediately { + drop(handle.take().unwrap()); + } + + // === Wait for first poll === + let got_polled = rt.block_on(wait_first_poll).is_ok(); + if !got_polled { + // it's possible that we are aborted but still got polled + assert!( + aborted, + "Task completed without ever being polled but was not aborted." + ); + } + + if abort == CombiAbort::AbortedFirstPoll { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropFirstPoll { + drop(handle.take().unwrap()); + } + + // Signal the future that it can return now + let _ = on_complete.send(()); + // === Wait for future to be dropped === + assert!( + rt.block_on(wait_future_drop).is_ok(), + "The future should always be dropped." + ); + + if abort == CombiAbort::AbortedAfterFinish { + // Don't set aborted to true here as the task already finished + handle.as_mut().unwrap().abort(); + } + if jh == CombiJoinHandle::DropAfterNoConsume { + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the JoinHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping JoinHandle shouldn't panic here" + ); + } + } + + // Check whether we drop after consuming the output + if jh == CombiJoinHandle::DropAfterConsume { + // Using as_mut() to not immediately drop the handle + let result = rt.block_on(handle.as_mut().unwrap()); + + match result { + Ok(mut output) => { + // Don't panic here. + output.disarm(); + assert!(!aborted, "Task was aborted but returned output"); + } + Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"), + Err(err) if err.is_panic() => { + assert!( + (task == CombiTask::PanicOnRun) + || (task == CombiTask::PanicOnDrop) + || (task == CombiTask::PanicOnRunAndDrop) + || (output == CombiOutput::PanicOnDrop), + "Panic but nothing should panic" + ); + } + _ => unreachable!(), + } + + let handle = handle.take().unwrap(); + if abort == CombiAbort::AbortedAfterConsumeOutput { + handle.abort(); + } + drop(handle); + } + + // The output should have been dropped now. Check whether the output + // object was created at all. + let output_created = rt.block_on(wait_output_drop).is_ok(); + assert_eq!( + output_created, + (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, + "Creation of output object" + ); +} diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 596e47dfd00..22031d7e546 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -26,6 +26,7 @@ cfg_loom! { mod loom_pool; mod loom_queue; mod loom_shutdown_join; + mod loom_task; } cfg_not_loom! { From 3765a17743387ef703c9beca6aeb63ab2fa22d8b Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 17:08:35 +0200 Subject: [PATCH 06/15] Fix memory leak --- tokio/src/runtime/task/harness.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index d35647c2d94..55b20cf199a 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -112,6 +112,8 @@ where } pub(super) fn drop_join_handle_slow(self) { + let mut maybe_panic = None; + // Try to unset `JOIN_INTEREST`. This must be done as a first step in // case the task concurrently completed. if self.header().state.unset_join_interested().is_err() { @@ -120,11 +122,20 @@ where // the scheduler or `JoinHandle`. i.e. if the output remains in the // task structure until the task is deallocated, it may be dropped // by a Waker on any arbitrary thread. - self.core().stage.drop_future_or_output(); + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().stage.drop_future_or_output(); + })); + if let Err(panic) = panic { + maybe_panic = Some(panic); + } } // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); + + if let Some(panic) = maybe_panic { + panic::resume_unwind(panic); + } } // ===== waker behavior ===== From 9088a486e3c189011202496d4a8317b2e710e9cd Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 17:11:09 +0200 Subject: [PATCH 07/15] Delete old test file --- tokio/src/runtime/tests/loom_task.rs | 3 +- tokio/tests/task_combinations.rs | 378 --------------------------- 2 files changed, 1 insertion(+), 380 deletions(-) delete mode 100644 tokio/tests/task_combinations.rs diff --git a/tokio/src/runtime/tests/loom_task.rs b/tokio/src/runtime/tests/loom_task.rs index d306859e48c..db1a1949adc 100644 --- a/tokio/src/runtime/tests/loom_task.rs +++ b/tokio/src/runtime/tests/loom_task.rs @@ -93,6 +93,7 @@ fn test_combinations() { for ji in ji.iter().copied() { for jh in jh.iter().copied() { for abort in abort.iter().copied() { + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); loom::model(move || { test_combination(rt, ls, task, output, ji, jh, abort); }); @@ -127,8 +128,6 @@ fn test_combination( return; } - println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); - // A runtime optionally with a LocalSet struct Rt { rt: crate::runtime::Runtime, diff --git a/tokio/tests/task_combinations.rs b/tokio/tests/task_combinations.rs deleted file mode 100644 index 89269ee1ed5..00000000000 --- a/tokio/tests/task_combinations.rs +++ /dev/null @@ -1,378 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(feature = "full")] - -use std::future::Future; -use std::panic; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use tokio::runtime::Builder; -use tokio::sync::oneshot; -use tokio::task::JoinHandle; - -use futures::future::FutureExt; - -// Enums for each option in the combinations being tested - -#[derive(Copy, Clone, Debug)] -enum CombiRuntime { - CurrentThread, - Multi1, - Multi2, -} -#[derive(Copy, Clone, Debug)] -enum CombiLocalSet { - Yes, - No, -} -#[derive(Copy, Clone, Debug, PartialEq)] -enum CombiTask { - PanicOnRun, - PanicOnDrop, - PanicOnRunAndDrop, - NoPanic, -} -#[derive(Copy, Clone, Debug, PartialEq)] -enum CombiOutput { - PanicOnDrop, - NoPanic, -} -#[derive(Copy, Clone, Debug, PartialEq)] -enum CombiJoinInterest { - Polled, - NotPolled, -} -#[allow(clippy::enum_variant_names)] // we aren't using glob imports -#[derive(Copy, Clone, Debug, PartialEq)] -enum CombiJoinHandle { - DropImmediately = 1, - DropFirstPoll = 2, - DropAfterNoConsume = 3, - DropAfterConsume = 4, -} -#[derive(Copy, Clone, Debug, PartialEq)] -enum CombiAbort { - NotAborted = 0, - AbortedImmediately = 1, - AbortedFirstPoll = 2, - AbortedAfterFinish = 3, - AbortedAfterConsumeOutput = 4, -} - -#[test] -fn test_combinations() { - let rt = [ - CombiRuntime::CurrentThread, - CombiRuntime::Multi1, - CombiRuntime::Multi2, - ]; - let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; - let task = [ - CombiTask::NoPanic, - CombiTask::PanicOnRun, - CombiTask::PanicOnDrop, - CombiTask::PanicOnRunAndDrop, - ]; - let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop]; - let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled]; - let jh = [ - CombiJoinHandle::DropImmediately, - CombiJoinHandle::DropFirstPoll, - CombiJoinHandle::DropAfterNoConsume, - CombiJoinHandle::DropAfterConsume, - ]; - let abort = [ - CombiAbort::NotAborted, - CombiAbort::AbortedImmediately, - CombiAbort::AbortedFirstPoll, - CombiAbort::AbortedAfterFinish, - CombiAbort::AbortedAfterConsumeOutput, - ]; - - for rt in rt.iter().copied() { - for ls in ls.iter().copied() { - for task in task.iter().copied() { - for output in output.iter().copied() { - for ji in ji.iter().copied() { - for jh in jh.iter().copied() { - for abort in abort.iter().copied() { - test_combination(rt, ls, task, output, ji, jh, abort); - } - } - } - } - } - } - } -} - -fn test_combination( - rt: CombiRuntime, - ls: CombiLocalSet, - task: CombiTask, - output: CombiOutput, - ji: CombiJoinInterest, - jh: CombiJoinHandle, - abort: CombiAbort, -) { - if (jh as usize) < (abort as usize) { - // drop before abort not possible - return; - } - if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { - // this causes double panic - return; - } - if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { - // this causes double panic - return; - } - - println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); - - // A runtime optionally with a LocalSet - struct Rt { - rt: tokio::runtime::Runtime, - ls: Option, - } - impl Rt { - fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self { - let rt = match rt { - CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(), - CombiRuntime::Multi1 => Builder::new_multi_thread() - .worker_threads(1) - .build() - .unwrap(), - CombiRuntime::Multi2 => Builder::new_multi_thread() - .worker_threads(2) - .build() - .unwrap(), - }; - - let ls = match ls { - CombiLocalSet::Yes => Some(tokio::task::LocalSet::new()), - CombiLocalSet::No => None, - }; - - Self { rt, ls } - } - fn block_on(&self, task: T) -> T::Output - where - T: Future, - { - match &self.ls { - Some(ls) => ls.block_on(&self.rt, task), - None => self.rt.block_on(task), - } - } - fn spawn(&self, task: T) -> JoinHandle - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - match &self.ls { - Some(ls) => ls.spawn_local(task), - None => self.rt.spawn(task), - } - } - } - - // The type used for the output of the future - struct Output { - panic_on_drop: bool, - on_drop: Option>, - } - impl Output { - fn disarm(&mut self) { - self.panic_on_drop = false; - } - } - impl Drop for Output { - fn drop(&mut self) { - let _ = self.on_drop.take().unwrap().send(()); - if self.panic_on_drop { - panic!("Panicking"); - } - } - } - - // A wrapper around the future that is spawned - struct FutWrapper { - inner: F, - on_drop: Option>, - panic_on_drop: bool, - } - impl Future for FutWrapper { - type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - unsafe { - let me = Pin::into_inner_unchecked(self); - let inner = Pin::new_unchecked(&mut me.inner); - inner.poll(cx) - } - } - } - impl Drop for FutWrapper { - fn drop(&mut self) { - let _ = self.on_drop.take().unwrap().send(()); - if self.panic_on_drop { - panic!("Panicking"); - } - } - } - - // The channels passed to the task - struct Signals { - on_first_poll: Option>, - wait_complete: Option>, - on_output_drop: Option>, - } - - // The task we will spawn - async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output { - // Signal that we have been polled once - let _ = signal.on_first_poll.take().unwrap().send(()); - - // Wait for a signal, then complete the future - let _ = signal.wait_complete.take().unwrap().await; - - // If the task gets past wait_complete without yielding, then aborts - // may not be caught without this yield_now. - tokio::task::yield_now().await; - - if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { - panic!("Panicking"); - } - - Output { - panic_on_drop: out == CombiOutput::PanicOnDrop, - on_drop: signal.on_output_drop.take(), - } - } - - let rt = Rt::new(rt, ls); - - let (on_first_poll, wait_first_poll) = oneshot::channel(); - let (on_complete, wait_complete) = oneshot::channel(); - let (on_future_drop, wait_future_drop) = oneshot::channel(); - let (on_output_drop, wait_output_drop) = oneshot::channel(); - let signal = Signals { - on_first_poll: Some(on_first_poll), - wait_complete: Some(wait_complete), - on_output_drop: Some(on_output_drop), - }; - - // === Spawn task === - let mut handle = Some(rt.spawn(FutWrapper { - inner: my_task(signal, task, output), - on_drop: Some(on_future_drop), - panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop, - })); - - // Keep track of whether the task has been killed with an abort - let mut aborted = false; - - // If we want to poll the JoinHandle, do it now - if ji == CombiJoinInterest::Polled { - assert!( - handle.as_mut().unwrap().now_or_never().is_none(), - "Polling handle succeeded" - ); - } - - if abort == CombiAbort::AbortedImmediately { - handle.as_mut().unwrap().abort(); - aborted = true; - } - if jh == CombiJoinHandle::DropImmediately { - drop(handle.take().unwrap()); - } - - // === Wait for first poll === - let got_polled = rt.block_on(wait_first_poll).is_ok(); - if !got_polled { - // it's possible that we are aborted but still got polled - assert!( - aborted, - "Task completed without ever being polled but was not aborted." - ); - } - - if abort == CombiAbort::AbortedFirstPoll { - handle.as_mut().unwrap().abort(); - aborted = true; - } - if jh == CombiJoinHandle::DropFirstPoll { - drop(handle.take().unwrap()); - } - - // Signal the future that it can return now - let _ = on_complete.send(()); - // === Wait for future to be dropped === - assert!( - rt.block_on(wait_future_drop).is_ok(), - "The future should always be dropped." - ); - - if abort == CombiAbort::AbortedAfterFinish { - // Don't set aborted to true here as the task already finished - handle.as_mut().unwrap().abort(); - } - if jh == CombiJoinHandle::DropAfterNoConsume { - // The runtime will usually have dropped every ref-count at this point, - // in which case dropping the JoinHandle drops the output. - // - // (But it might race and still hold a ref-count) - let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { - drop(handle.take().unwrap()); - })); - if panic.is_err() { - assert!( - (output == CombiOutput::PanicOnDrop) - && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) - && !aborted, - "Dropping JoinHandle shouldn't panic here" - ); - } - } - - // Check whether we drop after consuming the output - if jh == CombiJoinHandle::DropAfterConsume { - // Using as_mut() to not immediately drop the handle - let result = rt.block_on(handle.as_mut().unwrap()); - - match result { - Ok(mut output) => { - // Don't panic here. - output.disarm(); - assert!(!aborted, "Task was aborted but returned output"); - } - Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"), - Err(err) if err.is_panic() => { - assert!( - (task == CombiTask::PanicOnRun) - || (task == CombiTask::PanicOnDrop) - || (task == CombiTask::PanicOnRunAndDrop) - || (output == CombiOutput::PanicOnDrop), - "Panic but nothing should panic" - ); - } - _ => unreachable!(), - } - - let handle = handle.take().unwrap(); - if abort == CombiAbort::AbortedAfterConsumeOutput { - handle.abort(); - } - drop(handle); - } - - // The output should have been dropped now. Check whether the output - // object was created at all. - let output_created = rt.block_on(wait_output_drop).is_ok(); - assert_eq!( - output_created, - (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, - "Creation of output object" - ); -} From 513ee71ea2c08c5e6ff302fb124ceb6944a12d69 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 19 Jul 2021 17:53:33 +0200 Subject: [PATCH 08/15] Panics can also happen in transition_to_complete --- tokio/src/runtime/task/harness.rs | 32 +++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 55b20cf199a..8cd649dc7f5 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -194,21 +194,25 @@ where // ====== internal ====== fn complete(self, output: super::Result, is_join_interested: bool) { - if is_join_interested { - // Store the output. The future has already been dropped - // - // Safety: Mutual exclusion is obtained by having transitioned the task - // state -> Running - let stage = &self.core().stage; - stage.store_output(output); - - // Transition to `Complete`, notifying the `JoinHandle` if necessary. - transition_to_complete(self.header(), stage, &self.trailer()); - } else { - let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + // We catch panics here because dropping the output may panic. + // + // Dropping the output can also happen in the first branch inside + // transition_to_complete. + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + if is_join_interested { + // Store the output. The future has already been dropped + // + // Safety: Mutual exclusion is obtained by having transitioned the task + // state -> Running + let stage = &self.core().stage; + stage.store_output(output); + + // Transition to `Complete`, notifying the `JoinHandle` if necessary. + transition_to_complete(self.header(), stage, &self.trailer()); + } else { drop(output); - })); - } + } + })); // The task has completed execution and will no longer be scheduled. // From 18ec2ca19228d26bcf88f4fca2953784fcc4d2f1 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 20 Jul 2021 12:06:27 +0200 Subject: [PATCH 09/15] Change to normal and miri test --- .github/workflows/ci.yml | 3 +++ tokio/src/runtime/tests/mod.rs | 4 ++- .../{loom_task.rs => task_combinations.rs} | 26 +++++++++++-------- 3 files changed, 21 insertions(+), 12 deletions(-) rename tokio/src/runtime/tests/{loom_task.rs => task_combinations.rs} (95%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd38a52c277..fde3ef4e6c9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -146,6 +146,9 @@ jobs: - name: miri run: cargo miri test --features rt,rt-multi-thread,sync task working-directory: tokio + env: + MIRIFLAGS: -Zmiri-disable-isolation + san: name: san runs-on: ubuntu-latest diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 22031d7e546..6eacdd833e7 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -26,12 +26,14 @@ cfg_loom! { mod loom_pool; mod loom_queue; mod loom_shutdown_join; - mod loom_task; } cfg_not_loom! { mod queue; + // Runs both with and without miri + mod task_combinations; + #[cfg(miri)] mod task; } diff --git a/tokio/src/runtime/tests/loom_task.rs b/tokio/src/runtime/tests/task_combinations.rs similarity index 95% rename from tokio/src/runtime/tests/loom_task.rs rename to tokio/src/runtime/tests/task_combinations.rs index db1a1949adc..7c5bb26e420 100644 --- a/tokio/src/runtime/tests/loom_task.rs +++ b/tokio/src/runtime/tests/task_combinations.rs @@ -11,13 +11,13 @@ use futures::future::FutureExt; // Enums for each option in the combinations being tested -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq)] enum CombiRuntime { CurrentThread, Multi1, Multi2, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq)] enum CombiLocalSet { Yes, No, @@ -58,11 +58,16 @@ enum CombiAbort { #[test] fn test_combinations() { - let rt = [ + let mut rt = &[ CombiRuntime::CurrentThread, CombiRuntime::Multi1, CombiRuntime::Multi2, - ]; + ][..]; + + if cfg!(miri) { + rt = &[CombiRuntime::CurrentThread]; + } + let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; let task = [ CombiTask::NoPanic, @@ -94,9 +99,7 @@ fn test_combinations() { for jh in jh.iter().copied() { for abort in abort.iter().copied() { println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); - loom::model(move || { - test_combination(rt, ls, task, output, ji, jh, abort); - }); + test_combination(rt, ls, task, output, ji, jh, abort); } } } @@ -189,7 +192,7 @@ fn test_combination( fn drop(&mut self) { let _ = self.on_drop.take().unwrap().send(()); if self.panic_on_drop { - panic!("Panicking"); + panic!("Panicking in Output"); } } } @@ -212,9 +215,9 @@ fn test_combination( } impl Drop for FutWrapper { fn drop(&mut self) { - let _ = self.on_drop.take().unwrap().send(()); + let _: Result<(), ()> = self.on_drop.take().unwrap().send(()); if self.panic_on_drop { - panic!("Panicking"); + panic!("Panicking in FutWrapper"); } } } @@ -239,7 +242,7 @@ fn test_combination( crate::task::yield_now().await; if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { - panic!("Panicking"); + panic!("Panicking in my_task on {:?}", std::thread::current().id()); } Output { @@ -373,4 +376,5 @@ fn test_combination( (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, "Creation of output object" ); + } From c65bcb1e6e9e6be3b3f8aea2c6074f255a0c3390 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 20 Jul 2021 12:11:16 +0200 Subject: [PATCH 10/15] rustfmt --- tokio/src/runtime/tests/task_combinations.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/runtime/tests/task_combinations.rs b/tokio/src/runtime/tests/task_combinations.rs index 7c5bb26e420..a7d23e4c805 100644 --- a/tokio/src/runtime/tests/task_combinations.rs +++ b/tokio/src/runtime/tests/task_combinations.rs @@ -376,5 +376,4 @@ fn test_combination( (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, "Creation of output object" ); - } From 0712dcb86412517b110f4c8c40e0d5e7eb7adffc Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 20 Jul 2021 17:11:12 +0200 Subject: [PATCH 11/15] Temporarily run miri with nocapture --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fde3ef4e6c9..2d14b0d7a78 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -144,7 +144,7 @@ jobs: rm -rf tokio/tests - name: miri - run: cargo miri test --features rt,rt-multi-thread,sync task + run: cargo miri test --features rt,rt-multi-thread,sync task -- --nocapture working-directory: tokio env: MIRIFLAGS: -Zmiri-disable-isolation From 8cced74dd4146bfda7b6a314e42e9c4114e881fb Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 21 Jul 2021 10:30:25 +0200 Subject: [PATCH 12/15] Simplify double abort cases --- tokio/src/runtime/tests/task_combinations.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/tests/task_combinations.rs b/tokio/src/runtime/tests/task_combinations.rs index a7d23e4c805..d38f53d6a82 100644 --- a/tokio/src/runtime/tests/task_combinations.rs +++ b/tokio/src/runtime/tests/task_combinations.rs @@ -98,7 +98,6 @@ fn test_combinations() { for ji in ji.iter().copied() { for jh in jh.iter().copied() { for abort in abort.iter().copied() { - println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); test_combination(rt, ls, task, output, ji, jh, abort); } } @@ -126,11 +125,13 @@ fn test_combination( // this causes double panic return; } - if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { + if task == CombiTask::PanicOnRunAndDrop { // this causes double panic return; } + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + // A runtime optionally with a LocalSet struct Rt { rt: crate::runtime::Runtime, From 28955cf1acde708cdad792c293793a842e67ae9f Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 21 Jul 2021 10:58:23 +0200 Subject: [PATCH 13/15] Run without nocapture --- .github/workflows/ci.yml | 2 +- tokio/src/runtime/tests/task_combinations.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2d14b0d7a78..fde3ef4e6c9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -144,7 +144,7 @@ jobs: rm -rf tokio/tests - name: miri - run: cargo miri test --features rt,rt-multi-thread,sync task -- --nocapture + run: cargo miri test --features rt,rt-multi-thread,sync task working-directory: tokio env: MIRIFLAGS: -Zmiri-disable-isolation diff --git a/tokio/src/runtime/tests/task_combinations.rs b/tokio/src/runtime/tests/task_combinations.rs index d38f53d6a82..76ce2330c2c 100644 --- a/tokio/src/runtime/tests/task_combinations.rs +++ b/tokio/src/runtime/tests/task_combinations.rs @@ -125,7 +125,7 @@ fn test_combination( // this causes double panic return; } - if task == CombiTask::PanicOnRunAndDrop { + if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { // this causes double panic return; } From e9178fc50dbf0e119aef6ea9b7bd283bece9f642 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 21 Jul 2021 11:26:53 +0200 Subject: [PATCH 14/15] Disable miri test --- tokio/src/runtime/tests/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 78f6ae00ff9..e0379a68c89 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -40,7 +40,7 @@ cfg_loom! { cfg_not_loom! { mod queue; - // Runs both with and without miri + #[cfg(not(miri))] mod task_combinations; #[cfg(miri)] From 4bcad575489905ea6c26a44fa5f245f102d5ce5a Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Wed, 21 Jul 2021 12:00:22 +0200 Subject: [PATCH 15/15] Remove CI changes --- .github/workflows/ci.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fde3ef4e6c9..dd38a52c277 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -146,9 +146,6 @@ jobs: - name: miri run: cargo miri test --features rt,rt-multi-thread,sync task working-directory: tokio - env: - MIRIFLAGS: -Zmiri-disable-isolation - san: name: san runs-on: ubuntu-latest