diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 208d48c4d33..7044845d4a3 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -285,6 +285,17 @@ where self.cancel_task(); } + /// Remotely abort the task + /// + /// This is similar to `shutdown` except that it asks the runtime to perform + /// the shutdown. This is necessary to avoid the shutdown happening in the + /// wrong thread for non-Send tasks. + pub(super) fn remote_abort(self) { + if self.header().state.transition_to_notified_and_cancel() { + self.core().scheduler.schedule(Notified(self.to_task())); + } + } + // ====== internal ====== fn cancel_task(self) { diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index dedfb387949..2fe40a72195 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -192,7 +192,7 @@ impl JoinHandle { /// ``` pub fn abort(&self) { if let Some(raw) = self.raw { - raw.shutdown(); + raw.remote_abort(); } } } diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index cae56d037da..39336cee904 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -22,6 +22,9 @@ pub(super) struct Vtable { /// The join handle has been dropped pub(super) drop_join_handle_slow: unsafe fn(NonNull
), + /// The task is remotely aborted + pub(super) remote_abort: unsafe fn(NonNull
), + /// Scheduler is being shutdown pub(super) shutdown: unsafe fn(NonNull
), } @@ -33,6 +36,7 @@ pub(super) fn vtable() -> &'static Vtable { dealloc: dealloc::, try_read_output: try_read_output::, drop_join_handle_slow: drop_join_handle_slow::, + remote_abort: remote_abort::, shutdown: shutdown::, } } @@ -89,6 +93,11 @@ impl RawTask { let vtable = self.header().vtable; unsafe { (vtable.shutdown)(self.ptr) } } + + pub(super) fn remote_abort(self) { + let vtable = self.header().vtable; + unsafe { (vtable.remote_abort)(self.ptr) } + } } impl Clone for RawTask { @@ -125,6 +134,11 @@ unsafe fn drop_join_handle_slow(ptr: NonNull
) { harness.drop_join_handle_slow() } +unsafe fn remote_abort(ptr: NonNull
) { + let harness = Harness::::from_raw(ptr); + harness.remote_abort() +} + unsafe fn shutdown(ptr: NonNull
) { let harness = Harness::::from_raw(ptr); harness.shutdown() diff --git a/tokio/src/runtime/task/state.rs b/tokio/src/runtime/task/state.rs index 21e90430db2..da0c567d1a8 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -177,6 +177,15 @@ impl State { prev.will_need_queueing() } + /// Set the cancelled bit and transition the state to `NOTIFIED`. + /// + /// Returns `true` if the task needs to be submitted to the pool for + /// execution + pub(super) fn transition_to_notified_and_cancel(&self) -> bool { + let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel)); + prev.will_need_queueing() + } + /// Set the `CANCELLED` bit and attempt to transition to `Running`. /// /// Returns `true` if the transition to `Running` succeeded. diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index e84f19c3d03..b3838e2b07f 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::thread::sleep; +use std::time::Duration; + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] @@ -24,3 +27,49 @@ fn test_abort_without_panic_3157() { let _ = handle.await; }); } + +/// Checks that a suspended LocalSet task can be aborted from a remote thread +/// without panicking and without running the tasks destructor on the wrong thread. +/// +#[test] +fn remote_abort_local_set_3929() { + struct DropCheck { + created_on: std::thread::ThreadId, + not_send: std::marker::PhantomData<*const ()>, + } + + impl DropCheck { + fn new() -> Self { + Self { + created_on: std::thread::current().id(), + not_send: std::marker::PhantomData, + } + } + } + impl Drop for DropCheck { + fn drop(&mut self) { + if std::thread::current().id() != self.created_on { + panic!("non-Send value dropped in another thread!"); + } + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + + let check = DropCheck::new(); + let jh = local.spawn_local(async move { + futures::future::pending::<()>().await; + drop(check); + }); + + let jh2 = std::thread::spawn(move || { + sleep(Duration::from_millis(50)); + jh.abort(); + }); + + rt.block_on(local); + jh2.join().unwrap(); +}