Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remotely abort tasks on JoinHandle::abort #3934

Merged
merged 4 commits into from Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions tokio/src/runtime/task/harness.rs
Expand Up @@ -164,6 +164,17 @@ where
self.complete(Err(err), true)
}

/// Remotely abort the task
///
/// This is similar to `shutdown` except that it asks the runtime to perform
/// the shutdown. This is necessary to avoid the shutdown happening in the
/// wrong thread for non-Send tasks.
pub(super) fn remote_abort(self) {
if self.header().state.transition_to_notified_and_cancel() {
self.core().scheduler.schedule(Notified(self.to_task()));
}
}

// ====== internal ======

fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/join.rs
Expand Up @@ -192,7 +192,7 @@ impl<T> JoinHandle<T> {
/// ```
pub fn abort(&self) {
if let Some(raw) = self.raw {
raw.shutdown();
raw.remote_abort();
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions tokio/src/runtime/task/raw.rs
Expand Up @@ -22,6 +22,9 @@ pub(super) struct Vtable {
/// The join handle has been dropped
pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),

/// The task is remotely aborted
pub(super) remote_abort: unsafe fn(NonNull<Header>),

/// Scheduler is being shutdown
pub(super) shutdown: unsafe fn(NonNull<Header>),
}
Expand All @@ -33,6 +36,7 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
dealloc: dealloc::<T, S>,
try_read_output: try_read_output::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
remote_abort: remote_abort::<T, S>,
shutdown: shutdown::<T, S>,
}
}
Expand Down Expand Up @@ -89,6 +93,11 @@ impl RawTask {
let vtable = self.header().vtable;
unsafe { (vtable.shutdown)(self.ptr) }
}

pub(super) fn remote_abort(self) {
let vtable = self.header().vtable;
unsafe { (vtable.remote_abort)(self.ptr) }
}
}

impl Clone for RawTask {
Expand Down Expand Up @@ -125,6 +134,11 @@ unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
harness.drop_join_handle_slow()
}

unsafe fn remote_abort<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.remote_abort()
}

unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.shutdown()
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/task/state.rs
Expand Up @@ -177,6 +177,15 @@ impl State {
prev.will_need_queueing()
}

/// Set the cancelled bit and transition the state to `NOTIFIED`.
///
/// Returns `true` if the task needs to be submitted to the pool for
/// execution
pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel));
prev.will_need_queueing()
}

/// Set the `CANCELLED` bit and attempt to transition to `Running`.
///
/// Returns `true` if the transition to `Running` succeeded.
Expand Down
49 changes: 49 additions & 0 deletions tokio/tests/task_abort.rs
@@ -1,6 +1,9 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use std::thread::sleep;
use std::time::Duration;

/// Checks that a suspended task can be aborted without panicking as reported in
/// issue #3157: <https://github.com/tokio-rs/tokio/issues/3157>.
#[test]
Expand Down Expand Up @@ -91,3 +94,49 @@ fn test_abort_without_panic_3662() {
i.await.unwrap();
});
}

/// Checks that a suspended LocalSet task can be aborted from a remote thread
/// without panicking and without running the tasks destructor on the wrong thread.
/// <https://github.com/tokio-rs/tokio/issues/3929>
#[test]
fn remote_abort_local_set_3929() {
struct DropCheck {
created_on: std::thread::ThreadId,
not_send: std::marker::PhantomData<*const ()>,
}

impl DropCheck {
fn new() -> Self {
Self {
created_on: std::thread::current().id(),
not_send: std::marker::PhantomData,
}
}
}
impl Drop for DropCheck {
fn drop(&mut self) {
if std::thread::current().id() != self.created_on {
panic!("non-Send value dropped in another thread!");
}
}
}

let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let local = tokio::task::LocalSet::new();

let check = DropCheck::new();
let jh = local.spawn_local(async move {
futures::future::pending::<()>().await;
drop(check);
});

let jh2 = std::thread::spawn(move || {
sleep(Duration::from_millis(50));
jh.abort();
});

rt.block_on(local);
jh2.join().unwrap();
}