From 84394949228d11d1f68925e26f36c435946b9d11 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 6 Jul 2021 20:00:49 +0200 Subject: [PATCH 1/4] runtime: remotely abort tasks on JoinHandle::abort --- tokio/src/runtime/task/harness.rs | 11 +++++++ tokio/src/runtime/task/join.rs | 2 +- tokio/src/runtime/task/raw.rs | 14 +++++++++ tokio/src/runtime/task/state.rs | 9 ++++++ tokio/tests/task_abort.rs | 49 +++++++++++++++++++++++++++++++ 5 files changed, 84 insertions(+), 1 deletion(-) diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 7d596e36e1a..c9c99c76949 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -164,6 +164,17 @@ where self.complete(Err(err), true) } + /// Remotely abort the task + /// + /// This is similar to `shutdown` except that it asks the runtime to perform + /// the shutdown. This is necessary to avoid the shutdown happening in the + /// wrong thread for non-Send tasks. + pub(super) fn remote_abort(self) { + if self.header().state.transition_to_notified_and_cancel() { + self.core().scheduler.schedule(Notified(self.to_task())); + } + } + // ====== internal ====== fn complete(self, output: super::Result, is_join_interested: bool) { diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index dedfb387949..2fe40a72195 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -192,7 +192,7 @@ impl JoinHandle { /// ``` pub fn abort(&self) { if let Some(raw) = self.raw { - raw.shutdown(); + raw.remote_abort(); } } } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index cae56d037da..39336cee904 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -22,6 +22,9 @@ pub(super) struct Vtable { /// The join handle has been dropped pub(super) drop_join_handle_slow: unsafe fn(NonNull
), + /// The task is remotely aborted + pub(super) remote_abort: unsafe fn(NonNull
), + /// Scheduler is being shutdown pub(super) shutdown: unsafe fn(NonNull
), } @@ -33,6 +36,7 @@ pub(super) fn vtable() -> &'static Vtable { dealloc: dealloc::, try_read_output: try_read_output::, drop_join_handle_slow: drop_join_handle_slow::, + remote_abort: remote_abort::, shutdown: shutdown::, } } @@ -89,6 +93,11 @@ impl RawTask { let vtable = self.header().vtable; unsafe { (vtable.shutdown)(self.ptr) } } + + pub(super) fn remote_abort(self) { + let vtable = self.header().vtable; + unsafe { (vtable.remote_abort)(self.ptr) } + } } impl Clone for RawTask { @@ -125,6 +134,11 @@ unsafe fn drop_join_handle_slow(ptr: NonNull
) { harness.drop_join_handle_slow() } +unsafe fn remote_abort(ptr: NonNull
) { + let harness = Harness::::from_raw(ptr); + harness.remote_abort() +} + unsafe fn shutdown(ptr: NonNull
) { let harness = Harness::::from_raw(ptr); harness.shutdown() diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 21e90430db2..da0c567d1a8 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -177,6 +177,15 @@ impl State { prev.will_need_queueing() } + /// Set the cancelled bit and transition the state to `NOTIFIED`. + /// + /// Returns `true` if the task needs to be submitted to the pool for + /// execution + pub(super) fn transition_to_notified_and_cancel(&self) -> bool { + let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel)); + prev.will_need_queueing() + } + /// Set the `CANCELLED` bit and attempt to transition to `Running`. /// /// Returns `true` if the transition to `Running` succeeded. diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 1d72ac3a25c..adfb2b8875f 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::thread::sleep; +use std::time::Duration; + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] @@ -91,3 +94,49 @@ fn test_abort_without_panic_3662() { i.await.unwrap(); }); } + +/// Checks that a suspended LocalSet task can be aborted from a remote thread +/// without panicking and without running the tasks destructor on the wrong thread. +/// +#[test] +fn remote_abort_local_set_3929() { + struct DropCheck { + created_on: std::thread::ThreadId, + not_send: std::marker::PhantomData<*const ()>, + } + + impl DropCheck { + fn new() -> Self { + Self { + created_on: std::thread::current().id(), + not_send: std::marker::PhantomData, + } + } + } + impl Drop for DropCheck { + fn drop(&mut self) { + if std::thread::current().id() != self.created_on { + panic!("non-Send value dropped in another thread!"); + } + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + + let check = DropCheck::new(); + let jh = local.spawn_local(async move { + futures::future::pending::<()>().await; + drop(check); + }); + + let jh2 = std::thread::spawn(move || { + sleep(Duration::from_millis(50)); + jh.abort(); + }); + + rt.block_on(local); + jh2.join().unwrap(); +} From 9f8dafd47d3e3e96db84c080df25e2ad3b7535de Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 6 Jul 2021 20:23:55 +0200 Subject: [PATCH 2/4] fix CI --- tokio/tests/macros_select.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index ea06d515b57..07c2b81f5fa 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -359,9 +359,6 @@ async fn join_with_select() { async fn use_future_in_if_condition() { use tokio::time::{self, Duration}; - let sleep = time::sleep(Duration::from_millis(50)); - tokio::pin!(sleep); - tokio::select! { _ = time::sleep(Duration::from_millis(50)), if false => { panic!("if condition ignored") From 670fd04b2dd5ed7be681a0daddc3d94f8c2c3354 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 6 Jul 2021 20:33:45 +0200 Subject: [PATCH 3/4] Move drop_flag check --- tokio/tests/task_abort.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index adfb2b8875f..59b346a929b 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -65,7 +65,7 @@ fn test_abort_without_panic_3662() { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be - // dropped (in this thread no less). + // dropped (but not in this thread). assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); // TODO: is this guaranteed at this point? @@ -75,8 +75,8 @@ fn test_abort_without_panic_3662() { .join() .unwrap(); - assert!(drop_flag.load(Ordering::SeqCst)); let result = task.await; + assert!(drop_flag.load(Ordering::SeqCst)); assert!(result.unwrap_err().is_cancelled()); // Note: We do the following to trigger a deferred task cleanup. From 6d6b9a0d321858c7f4fdf58162f64e270d6e720c Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 6 Jul 2021 21:21:46 +0200 Subject: [PATCH 4/4] chore: prepare Tokio v1.5.1 --- tokio/CHANGELOG.md | 8 ++++++++ tokio/Cargo.toml | 2 +- tokio/tests/task_abort.rs | 2 -- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index 080892064de..0a24a762b86 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,11 @@ +# 1.5.1 (July 6, 2021) + +### Fixed + +- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934]) + +[#3934]: https://github.com/tokio-rs/tokio/pull/3934 + # 1.5.0 (April 12, 2021) ### Added diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 5e53c3f5d66..248d10d0286 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -7,7 +7,7 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.5.0" +version = "1.5.1" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 59b346a929b..e96dcf0c5b4 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -68,8 +68,6 @@ fn test_abort_without_panic_3662() { // dropped (but not in this thread). assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); - // TODO: is this guaranteed at this point? - // assert!(drop_flag2.load(Ordering::SeqCst)); j }) .join()