From de9bd6f69e7a8e0cfaf33617a7879bb15c31856d Mon Sep 17 00:00:00 2001 From: Noah Kennedy Date: Wed, 17 Aug 2022 14:10:45 -0500 Subject: [PATCH] chore: stabilize JoinSet and AbortHandle Closes #4535. This leaves the ID-related APIs unstable. --- tokio/src/runtime/task/abort.rs | 9 +--- tokio/src/runtime/task/join.rs | 1 - tokio/src/runtime/task/mod.rs | 4 +- tokio/src/task/join_set.rs | 82 +++++++++++++++++++++++++++------ tokio/src/task/mod.rs | 10 ++-- tokio/src/util/mod.rs | 6 +-- 6 files changed, 80 insertions(+), 32 deletions(-) diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index 3188394a8ea..b704bdafb45 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -10,15 +10,8 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; /// The task may be aborted by calling the [`AbortHandle::abort`] method. /// Dropping an `AbortHandle` releases the permission to terminate the task /// --- it does *not* abort the task. -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [unstable]: crate#unstable-features /// [`JoinHandle`]: crate::task::JoinHandle -#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub struct AbortHandle { raw: Option, id: Id, diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 018e1898dc8..927be1acf3e 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -260,7 +260,6 @@ impl JoinHandle { } /// Returns a new `AbortHandle` that can be used to remotely abort this task. - #[cfg(any(tokio_unstable, test))] pub(crate) fn abort_handle(&self) -> super::AbortHandle { let raw = self.raw.map(|raw| { raw.ref_inc(); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 350b6442ad0..145f3f2cec5 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -155,11 +155,11 @@ cfg_rt_multi_thread! { pub(super) use self::inject::Inject; } -#[cfg(all(feature = "rt", any(tokio_unstable, test)))] +#[cfg(feature = "rt")] mod abort; mod join; -#[cfg(all(feature = "rt", any(tokio_unstable, test)))] +#[cfg(feature = "rt")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::abort::AbortHandle; diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index c3767c99d86..7139059f030 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -10,7 +10,9 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::runtime::Handle; -use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet}; +#[cfg(unstable)] +use crate::task::Id; +use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet}; use crate::util::IdleNotifiedSet; /// A collection of tasks spawned on a Tokio runtime. @@ -53,9 +55,7 @@ use crate::util::IdleNotifiedSet; /// } /// } /// ``` -/// -/// [unstable]: crate#unstable-features -#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub struct JoinSet { inner: IdleNotifiedSet>, } @@ -216,7 +216,7 @@ impl JoinSet { pub async fn join_next(&mut self) -> Option> { crate::future::poll_fn(|cx| self.poll_join_next(cx)) .await - .map(|opt| opt.map(|(_, res)| res)) + .map(|opt| opt) } /// Waits until one of the tasks in the set completes and returns its @@ -236,14 +236,10 @@ impl JoinSet { /// [task ID]: crate::task::Id /// [`JoinError::id`]: fn@crate::task::JoinError::id #[doc(alias = "join_one_with_id")] + #[cfg(unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] pub async fn join_next_with_id(&mut self) -> Option> { - crate::future::poll_fn(|cx| self.poll_join_next(cx)).await - } - - #[doc(hidden)] - #[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next_with_id`")] - pub async fn join_one_with_id(&mut self) -> Option> { - self.join_next_with_id().await + crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await } /// Aborts all tasks and waits for them to finish shutting down. @@ -277,6 +273,62 @@ impl JoinSet { self.inner.drain(drop); } + /// Polls for one of the tasks in the set to complete. + /// + /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled + /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to + /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// # Returns + /// + /// This function returns: + /// + /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is + /// available right now. + /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. + /// The `value` is the return value of one of the tasks that completed. + /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been + /// aborted. The `err` is the `JoinError` from the panicked/aborted task. + /// * `Poll::Ready(None)` if the `JoinSet` is empty. + /// + /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. + /// This can happen if the [coop budget] is reached. + /// + /// [coop budget]: crate::task#cooperative-scheduling + fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to + // the `notified` list if the waker is notified in the `poll` call below. + let mut entry = match self.inner.pop_notified(cx.waker()) { + Some(entry) => entry, + None => { + if self.is_empty() { + return Poll::Ready(None); + } else { + // The waker was set by `pop_notified`. + return Poll::Pending; + } + } + }; + + let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); + + if let Poll::Ready(res) = res { + let _entry = entry.remove(); + // If the task succeeded, add the task ID to the output. Otherwise, the + // `JoinError` will already have the task's ID. + Poll::Ready(Some(res)) + } else { + // A JoinHandle generally won't emit a wakeup without being ready unless + // the coop limit has been reached. We yield to the executor in this + // case. + cx.waker().wake_by_ref(); + Poll::Pending + } + } + /// Polls for one of the tasks in the set to complete. /// /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. @@ -304,7 +356,11 @@ impl JoinSet { /// /// [coop budget]: crate::task#cooperative-scheduling /// [task ID]: crate::task::Id - fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + #[cfg(unstable)] + fn poll_join_next_with_id( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to // the `notified` list if the waker is notified in the `poll` call below. let mut entry = match self.inner.pop_notified(cx.waker()) { diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index d2b634de09d..c4eb0c56743 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -307,11 +307,13 @@ cfg_rt! { mod unconstrained; pub use unconstrained::{unconstrained, Unconstrained}; + pub mod join_set; + #[doc(inline)] + pub use join_set::JoinSet; + pub use crate::runtime::task::AbortHandle; + cfg_unstable! { - pub mod join_set; - #[doc(inline)] - pub use join_set::JoinSet; - pub use crate::runtime::task::{Id, AbortHandle}; + pub use crate::runtime::task::Id; } cfg_trace! { diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 618f5543802..41a3bce051f 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -44,10 +44,8 @@ pub(crate) mod linked_list; mod rand; cfg_rt! { - cfg_unstable! { - mod idle_notified_set; - pub(crate) use idle_notified_set::IdleNotifiedSet; - } + mod idle_notified_set; + pub(crate) use idle_notified_set::IdleNotifiedSet; mod wake; pub(crate) use wake::WakerRef;