Skip to content

Commit

Permalink
chore: stabilize JoinSet and AbortHandle (#4920)
Browse files Browse the repository at this point in the history
Closes #4535.

This leaves the ID-related APIs unstable.
  • Loading branch information
Noah-Kennedy committed Aug 19, 2022
1 parent de81985 commit b67b8c1
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 56 deletions.
7 changes: 0 additions & 7 deletions tokio-util/src/task/join_map.rs
Expand Up @@ -416,7 +416,6 @@ where
/// * `None` if the `JoinMap` is empty.
///
/// [`tokio::select!`]: tokio::select
#[doc(alias = "join_one")]
pub async fn join_next(&mut self) -> Option<(K, Result<V, JoinError>)> {
let (res, id) = match self.tasks.join_next_with_id().await {
Some(Ok((id, output))) => (Ok(output), id),
Expand All @@ -430,12 +429,6 @@ where
Some((key, res))
}

#[doc(hidden)]
#[deprecated(since = "0.7.4", note = "renamed to `JoinMap::join_next`.")]
pub async fn join_one(&mut self) -> Option<(K, Result<V, JoinError>)> {
self.join_next().await
}

/// Aborts all tasks and waits for them to finish shutting down.
///
/// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in
Expand Down
1 change: 0 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -347,7 +347,6 @@
//!
//! Likewise, some parts of the API are only available with the same flag:
//!
//! - [`task::JoinSet`]
//! - [`task::Builder`]
//!
//! This flag enables **unstable** features. The public API of these features
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -406,6 +406,16 @@ macro_rules! cfg_unstable {
};
}

macro_rules! cfg_not_unstable {
($($item:item)*) => {
$(
#[cfg(not(tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(not(tokio_unstable))))]
$item
)*
};
}

macro_rules! cfg_not_trace {
($($item:item)*) => {
$(
Expand Down
12 changes: 1 addition & 11 deletions tokio/src/runtime/task/abort.rs
Expand Up @@ -11,14 +11,8 @@ use std::panic::{RefUnwindSafe, UnwindSafe};
/// Dropping an `AbortHandle` releases the permission to terminate the task
/// --- it does *not* abort the task.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// [unstable]: crate#unstable-features
/// [`JoinHandle`]: crate::task::JoinHandle
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub struct AbortHandle {
raw: Option<RawTask>,
id: Id,
Expand All @@ -40,9 +34,6 @@ impl AbortHandle {
///
/// [cancelled]: method@super::error::JoinError::is_cancelled
/// [`JoinHandle::abort`]: method@super::JoinHandle::abort
// the `AbortHandle` type is only publicly exposed when `tokio_unstable` is
// enabled, but it is still defined for testing purposes.
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
pub fn abort(&self) {
if let Some(ref raw) = self.raw {
raw.remote_abort();
Expand All @@ -55,7 +46,6 @@ impl AbortHandle {
/// called on the task. This is because the cancellation process may take
/// some time, and this method does not return `true` until it has
/// completed.
#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))]
pub fn is_finished(&self) -> bool {
if let Some(raw) = self.raw {
let state = raw.header().state.load();
Expand Down
1 change: 0 additions & 1 deletion tokio/src/runtime/task/join.rs
Expand Up @@ -260,7 +260,6 @@ impl<T> JoinHandle<T> {
}

/// Returns a new `AbortHandle` that can be used to remotely abort this task.
#[cfg(any(tokio_unstable, test))]
pub(crate) fn abort_handle(&self) -> super::AbortHandle {
let raw = self.raw.map(|raw| {
raw.ref_inc();
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/task/mod.rs
Expand Up @@ -155,11 +155,11 @@ cfg_rt_multi_thread! {
pub(super) use self::inject::Inject;
}

#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
#[cfg(feature = "rt")]
mod abort;
mod join;

#[cfg(all(feature = "rt", any(tokio_unstable, test)))]
#[cfg(feature = "rt")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::abort::AbortHandle;

Expand Down
94 changes: 67 additions & 27 deletions tokio/src/task/join_set.rs
Expand Up @@ -10,7 +10,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use crate::runtime::Handle;
use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet};
#[cfg(tokio_unstable)]
use crate::task::Id;
use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet};
use crate::util::IdleNotifiedSet;

/// A collection of tasks spawned on a Tokio runtime.
Expand All @@ -23,10 +25,6 @@ use crate::util::IdleNotifiedSet;
///
/// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
/// may break in 1.x releases. See [the documentation on unstable
/// features][unstable] for details.
///
/// # Examples
///
/// Spawn multiple tasks and wait for them.
Expand All @@ -53,9 +51,7 @@ use crate::util::IdleNotifiedSet;
/// }
/// }
/// ```
///
/// [unstable]: crate#unstable-features
#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))]
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub struct JoinSet<T> {
inner: IdleNotifiedSet<JoinHandle<T>>,
}
Expand Down Expand Up @@ -197,12 +193,6 @@ impl<T: 'static> JoinSet<T> {
abort
}

#[doc(hidden)]
#[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next`.")]
pub async fn join_one(&mut self) -> Option<Result<T, JoinError>> {
self.join_next().await
}

/// Waits until one of the tasks in the set completes and returns its output.
///
/// Returns `None` if the set is empty.
Expand All @@ -212,11 +202,8 @@ impl<T: 'static> JoinSet<T> {
/// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
#[doc(alias = "join_one")]
pub async fn join_next(&mut self) -> Option<Result<T, JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_next(cx))
.await
.map(|opt| opt.map(|(_, res)| res))
crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
}

/// Waits until one of the tasks in the set completes and returns its
Expand All @@ -235,15 +222,10 @@ impl<T: 'static> JoinSet<T> {
///
/// [task ID]: crate::task::Id
/// [`JoinError::id`]: fn@crate::task::JoinError::id
#[doc(alias = "join_one_with_id")]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub async fn join_next_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
crate::future::poll_fn(|cx| self.poll_join_next(cx)).await
}

#[doc(hidden)]
#[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next_with_id`")]
pub async fn join_one_with_id(&mut self) -> Option<Result<(Id, T), JoinError>> {
self.join_next_with_id().await
crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await
}

/// Aborts all tasks and waits for them to finish shutting down.
Expand Down Expand Up @@ -277,6 +259,60 @@ impl<T: 'static> JoinSet<T> {
self.inner.drain(drop);
}

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
///
/// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled
/// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to
/// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is
/// scheduled to receive a wakeup.
///
/// # Returns
///
/// This function returns:
///
/// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is
/// available right now.
/// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed.
/// The `value` is the return value of one of the tasks that completed.
/// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been
/// aborted. The `err` is the `JoinError` from the panicked/aborted task.
/// * `Poll::Ready(None)` if the `JoinSet` is empty.
///
/// Note that this method may return `Poll::Pending` even if one of the tasks has completed.
/// This can happen if the [coop budget] is reached.
///
/// [coop budget]: crate::task#cooperative-scheduling
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<T, JoinError>>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Some(entry) => entry,
None => {
if self.is_empty() {
return Poll::Ready(None);
} else {
// The waker was set by `pop_notified`.
return Poll::Pending;
}
}
};

let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx));

if let Poll::Ready(res) = res {
let _entry = entry.remove();
Poll::Ready(Some(res))
} else {
// A JoinHandle generally won't emit a wakeup without being ready unless
// the coop limit has been reached. We yield to the executor in this
// case.
cx.waker().wake_by_ref();
Poll::Pending
}
}

/// Polls for one of the tasks in the set to complete.
///
/// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set.
Expand Down Expand Up @@ -304,7 +340,11 @@ impl<T: 'static> JoinSet<T> {
///
/// [coop budget]: crate::task#cooperative-scheduling
/// [task ID]: crate::task::Id
fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(Id, T), JoinError>>> {
#[cfg(tokio_unstable)]
fn poll_join_next_with_id(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<(Id, T), JoinError>>> {
// The call to `pop_notified` moves the entry to the `idle` list. It is moved back to
// the `notified` list if the waker is notified in the `poll` call below.
let mut entry = match self.inner.pop_notified(cx.waker()) {
Expand Down
13 changes: 10 additions & 3 deletions tokio/src/task/mod.rs
Expand Up @@ -307,11 +307,18 @@ cfg_rt! {
mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};

#[doc(inline)]
pub use join_set::JoinSet;
pub use crate::runtime::task::AbortHandle;

cfg_not_unstable! {
mod join_set;
}

cfg_unstable! {
pub use crate::runtime::task::Id;

pub mod join_set;
#[doc(inline)]
pub use join_set::JoinSet;
pub use crate::runtime::task::{Id, AbortHandle};
}

cfg_trace! {
Expand Down
6 changes: 2 additions & 4 deletions tokio/src/util/mod.rs
Expand Up @@ -44,10 +44,8 @@ pub(crate) mod linked_list;
mod rand;

cfg_rt! {
cfg_unstable! {
mod idle_notified_set;
pub(crate) use idle_notified_set::IdleNotifiedSet;
}
mod idle_notified_set;
pub(crate) use idle_notified_set::IdleNotifiedSet;

mod wake;
pub(crate) use wake::WakerRef;
Expand Down

0 comments on commit b67b8c1

Please sign in to comment.