Skip to content

Commit

Permalink
task: add AbortHandle type for cancelling tasks in a JoinSet (#4530)
Browse files Browse the repository at this point in the history
## Motivation

Before we stabilize the `JoinSet` API, we intend to add a method for
individual tasks in the `JoinSet` to be aborted. Because the
`JoinHandle`s for the tasks spawned on a `JoinSet` are owned by the
`JoinSet`, the user can no longer use them to abort tasks on the
`JoinSet`. Therefore, we need another way to cause a remote abort of a
task on a `JoinSet` without holding its `JoinHandle`.

## Solution

This branch adds a new `AbortHandle` type in `tokio::task`, which
represents the owned permission to remotely cancel a task, but _not_ to
await its output. The `AbortHandle` type holds an additional reference
to the task cell.

A crate-private method is added to `JoinHandle` that returns an
`AbortHandle` for the same task, incrementing its ref count.
`AbortHandle` provides a single method, `AbortHandle::abort(self)`, that
remotely cancels the task. Dropping an `AbortHandle` decrements the
task's ref count but does not cancel it. The `AbortHandle` type is
currently marked as unstable.

The spawning methods on `JoinSet` are modified to return an
`AbortHandle` that can be used to cancel the spawned task.

## Future Work

- Currently, the `AbortHandle` type is _only_ available in the public
API through a `JoinSet`. We could also make the
`JoinHandle::abort_handle` method public, to allow users to use the
`AbortHandle` type in other contexts. I didn't do that in this PR,
because I wanted to make the API addition as minimal as possible, but we
could make this method public later.

- Currently, `AbortHandle` is not `Clone`. We could easily make it
`Clone` by incrementing the task's ref count. Since this adds more trait
impls to the API, we may want to be cautious about this, but I see no
obvious reason we would need to remove a `Clone` implementation if one
was added...

- There's been some discussion of adding a `JoinMap` type that allows
aborting tasks by key, and manages a hash map of keys to `AbortHandle`s,
and removes the tasks from the map when they complete. This would make
aborting by key much easier, since the user wouldn't have to worry about
keeping the state of the map of abort handles and the tasks actually
active on the `JoinSet` in sync. After thinking about it a bit, I
thought this is probably best as a `tokio-util` API --- it can currently
be implemented in `tokio-util` with the APIs added in `tokio` in this
PR.

- I noticed while working on this that `JoinSet::join_one` and
`JoinSet::poll_join_one` return a cancelled `JoinError` when a task is
cancelled. I'm not sure if I love this behavior --- it seems like it
would be nicer to just skip cancelled tasks and continue polling. But,
there are currently tests that expect a cancelled `JoinError` to be
returned for each cancelled task, so I didn't want to change it in
_this_ PR. I think this is worth revisiting before stabilizing the API,
though?

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Feb 24, 2022
1 parent dfac73d commit 8e0e56f
Show file tree
Hide file tree
Showing 9 changed files with 328 additions and 37 deletions.
69 changes: 69 additions & 0 deletions tokio/src/runtime/task/abort.rs
@@ -0,0 +1,69 @@
use crate::runtime::task::RawTask;
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};

/// An owned permission to abort a spawned task, without awaiting its completion.
///
/// Unlike a [`JoinHandle`], an `AbortHandle` does *not* represent the
/// permission to await the task's completion, only to terminate it.
///
/// The task may be aborted by calling the [`AbortHandle::abort`] method.
/// Dropping an `AbortHandle` releases the permission to terminate the task
/// --- it does *not* abort the task.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
/// [`JoinHandle`]: crate::task::JoinHandle
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
pub struct AbortHandle {
raw: Option<RawTask>,
}

impl AbortHandle {
pub(super) fn new(raw: Option<RawTask>) -> Self {
Self { raw }
}

/// Abort the task associated with the handle.
///
/// Awaiting a cancelled task might complete as usual if the task was
/// already completed at the time it was cancelled, but most likely it
/// will fail with a [cancelled] `JoinError`.
///
/// If the task was already cancelled, such as by [`JoinHandle::abort`],
/// this method will do nothing.
///
/// [cancelled]: method@super::error::JoinError::is_cancelled
// the `AbortHandle` type is only publicly exposed when `tokio_unstable` is
// enabled, but it is still defined for testing purposes.
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
pub fn abort(self) {
if let Some(raw) = self.raw {
raw.remote_abort();
}
}
}

unsafe impl Send for AbortHandle {}
unsafe impl Sync for AbortHandle {}

impl UnwindSafe for AbortHandle {}
impl RefUnwindSafe for AbortHandle {}

impl fmt::Debug for AbortHandle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("AbortHandle").finish()
}
}

impl Drop for AbortHandle {
fn drop(&mut self) {
if let Some(raw) = self.raw.take() {
raw.drop_abort_handle();
}
}
}
10 changes: 10 additions & 0 deletions tokio/src/runtime/task/join.rs
Expand Up @@ -210,6 +210,16 @@ impl<T> JoinHandle<T> {
}
}
}

/// Returns a new `AbortHandle` that can be used to remotely abort this task.
#[cfg(any(tokio_unstable, test))]
pub(crate) fn abort_handle(&self) -> super::AbortHandle {
let raw = self.raw.map(|raw| {
raw.ref_inc();
raw
});
super::AbortHandle::new(raw)
}
}

impl<T> Unpin for JoinHandle<T> {}
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -155,7 +155,14 @@ cfg_rt_multi_thread! {
pub(super) use self::inject::Inject;
}

#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
mod abort;
mod join;

#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::abort::AbortHandle;

#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::join::JoinHandle;

Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/task/raw.rs
Expand Up @@ -27,6 +27,9 @@ pub(super) struct Vtable {
/// The join handle has been dropped.
pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>),

/// An abort handle has been dropped.
pub(super) drop_abort_handle: unsafe fn(NonNull<Header>),

/// The task is remotely aborted.
pub(super) remote_abort: unsafe fn(NonNull<Header>),

Expand All @@ -42,6 +45,7 @@ pub(super) fn vtable<T: Future, S: Schedule>() -> &'static Vtable {
try_read_output: try_read_output::<T, S>,
try_set_join_waker: try_set_join_waker::<T, S>,
drop_join_handle_slow: drop_join_handle_slow::<T, S>,
drop_abort_handle: drop_abort_handle::<T, S>,
remote_abort: remote_abort::<T, S>,
shutdown: shutdown::<T, S>,
}
Expand Down Expand Up @@ -104,6 +108,11 @@ impl RawTask {
unsafe { (vtable.drop_join_handle_slow)(self.ptr) }
}

pub(super) fn drop_abort_handle(self) {
let vtable = self.header().vtable;
unsafe { (vtable.drop_abort_handle)(self.ptr) }
}

pub(super) fn shutdown(self) {
let vtable = self.header().vtable;
unsafe { (vtable.shutdown)(self.ptr) }
Expand All @@ -113,6 +122,13 @@ impl RawTask {
let vtable = self.header().vtable;
unsafe { (vtable.remote_abort)(self.ptr) }
}

/// Increment the task's reference count.
///
/// Currently, this is used only when creating an `AbortHandle`.
pub(super) fn ref_inc(self) {
self.header().state.ref_inc();
}
}

impl Clone for RawTask {
Expand Down Expand Up @@ -154,6 +170,11 @@ unsafe fn drop_join_handle_slow<T: Future, S: Schedule>(ptr: NonNull<Header>) {
harness.drop_join_handle_slow()
}

unsafe fn drop_abort_handle<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.drop_reference();
}

unsafe fn remote_abort<T: Future, S: Schedule>(ptr: NonNull<Header>) {
let harness = Harness::<T, S>::from_raw(ptr);
harness.remote_abort()
Expand Down
38 changes: 38 additions & 0 deletions tokio/src/runtime/tests/task.rs
Expand Up @@ -78,6 +78,44 @@ fn create_drop2() {
handle.assert_dropped();
}

#[test]
fn drop_abort_handle1() {
let (ad, handle) = AssertDrop::new();
let (notified, join) = unowned(
async {
drop(ad);
unreachable!()
},
NoopSchedule,
);
let abort = join.abort_handle();
drop(join);
handle.assert_not_dropped();
drop(notified);
handle.assert_not_dropped();
drop(abort);
handle.assert_dropped();
}

#[test]
fn drop_abort_handle2() {
let (ad, handle) = AssertDrop::new();
let (notified, join) = unowned(
async {
drop(ad);
unreachable!()
},
NoopSchedule,
);
let abort = join.abort_handle();
drop(notified);
handle.assert_not_dropped();
drop(abort);
handle.assert_not_dropped();
drop(join);
handle.assert_dropped();
}

// Shutting down through Notified works
#[test]
fn create_shutdown1() {
Expand Down

0 comments on commit 8e0e56f

Please sign in to comment.