Skip to content

Commit

Permalink
chore: stabilize JoinSet and AbortHandle
Browse files Browse the repository at this point in the history
Closes #4535.

This leaves the ID-related APIs unstable.
  • Loading branch information
Noah-Kennedy committed Aug 17, 2022
1 parent 4b1c480 commit de9bd6f
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 32 deletions.
9 changes: 1 addition & 8 deletions tokio/src/runtime/task/abort.rs
Expand Up @@ -10,15 +10,8 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
/// The task may be aborted by calling the [`AbortHandle::abort`] method.
/// Dropping an `AbortHandle` releases the permission to terminate the task
/// --- it does *not* abort the task.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
/// [`JoinHandle`]: crate::task::JoinHandle
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub struct AbortHandle {
raw: Option<RawTask>,
id: Id,
Expand Down
1 change: 0 additions & 1 deletion tokio/src/runtime/task/join.rs
Expand Up @@ -260,7 +260,6 @@ impl<T> JoinHandle<T> {
}

/// Returns a new `AbortHandle` that can be used to remotely abort this task.
#[cfg(any(tokio_unstable, test))]
pub(crate) fn abort_handle(&self) -> super::AbortHandle {
let raw = self.raw.map(|raw| {
raw.ref_inc();
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -155,11 +155,11 @@ cfg_rt_multi_thread! {
pub(super) use self::inject::Inject;
}

#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
#[cfg(feature = "rt")]
mod abort;
mod join;

#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
#[cfg(feature = "rt")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::abort::AbortHandle;

Expand Down
82 changes: 69 additions & 13 deletions tokio/src/task/join_set.rs
Expand Up @@ -10,7 +10,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use crate::runtime::Handle;
use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet};
#[cfg(unstable)]
use crate::task::Id;
use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet};
use crate::util::IdleNotifiedSet;

/// A collection of tasks spawned on a Tokio runtime.
Expand Down Expand Up @@ -53,9 +55,7 @@ use crate::util::IdleNotifiedSet;
/// }
/// }
/// ```
///
/// [unstable]: crate#unstable-features
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub struct JoinSet<T> {
inner: IdleNotifiedSet<JoinHandle<T>>,
}
Expand Down Expand Up @@ -216,7 +216,7 @@ impl<T: 'static> JoinSet<T> {
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_next(cx))
.await
.map(|opt| opt.map(|(_, res)| res))
.map(|opt| opt)
}

/// Waits until one of the tasks in the set completes and returns its
Expand All @@ -236,14 +236,10 @@ impl<T: 'static> JoinSet<T> {
/// [task ID]: crate::task::Id
/// [`JoinError::id`]: fn@crate::task::JoinError::id
#[doc(alias = "join_one_with_id")]
#[cfg(unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
}

#[doc(hidden)]
#[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next_with_id`")]
pub async fn join_one_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
self.join_next_with_id().await
crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
}

/// Aborts all tasks and waits for them to finish shutting down.
Expand Down Expand Up @@ -277,6 +273,62 @@ impl<T: 'static> JoinSet<T> {
self.inner.drain(drop);
}

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
/// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup.
///
/// # Returns
///
/// This function returns:
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
/// The `value` is the return value of one of the tasks that completed.
/// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Some(entry) => entry,
None => {
if self.is_empty() {
return Poll::Ready(None);
} else {
// The waker was set by `pop_notified`.
return Poll::Pending;
}
}
};

let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));

if let Poll::Ready(res) = res {
let _entry = entry.remove();
// If the task succeeded, add the task ID to the output. Otherwise, the
// `JoinError` will already have the task's ID.
Poll::Ready(Some(res))
} else {
// A JoinHandle generally won't emit a wakeup without being ready unless
// the coop limit has been reached. We yield to the executor in this
// case.
cx.waker().wake_by_ref();
Poll::Pending
}
}

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
Expand Down Expand Up @@ -304,7 +356,11 @@ impl<T: 'static> JoinSet<T> {
///
/// [coop budget]: crate::task#cooperative-scheduling
/// [task ID]: crate::task::Id
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(Id, T), JoinError>>> {
#[cfg(unstable)]
fn poll_join_next_with_id(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<(Id, T), JoinError>>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Expand Down
10 changes: 6 additions & 4 deletions tokio/src/task/mod.rs
Expand Up @@ -307,11 +307,13 @@ cfg_rt! {
mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};

pub mod join_set;
#[doc(inline)]
pub use join_set::JoinSet;
pub use crate::runtime::task::AbortHandle;

cfg_unstable! {
pub mod join_set;
#[doc(inline)]
pub use join_set::JoinSet;
pub use crate::runtime::task::{Id, AbortHandle};
pub use crate::runtime::task::Id;
}

cfg_trace! {
Expand Down
6 changes: 2 additions & 4 deletions tokio/src/util/mod.rs
Expand Up @@ -44,10 +44,8 @@ pub(crate) mod linked_list;
mod rand;

cfg_rt! {
cfg_unstable! {
mod idle_notified_set;
pub(crate) use idle_notified_set::IdleNotifiedSet;
}
mod idle_notified_set;
pub(crate) use idle_notified_set::IdleNotifiedSet;

mod wake;
pub(crate) use wake::WakerRef;
Expand Down

0 comments on commit de9bd6f

Please sign in to comment.