diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index 5d4441b928f..d5b41e437f5 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -416,7 +416,6 @@ where /// * `None` if the `JoinMap` is empty. /// /// [`tokio::select!`]: tokio::select - #[doc(alias = "join_one")] pub async fn join_next(&mut self) -> Option<(K, Result)> { let (res, id) = match self.tasks.join_next_with_id().await { Some(Ok((id, output))) => (Ok(output), id), @@ -430,12 +429,6 @@ where Some((key, res)) } - #[doc(hidden)] - #[deprecated(since = "0.7.4", note = "renamed to `JoinMap::join_next`.")] - pub async fn join_one(&mut self) -> Option<(K, Result)> { - self.join_next().await - } - /// Aborts all tasks and waits for them to finish shutting down. /// /// Calling this method is equivalent to calling [`abort_all`] and then calling [`join_next`] in diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 3897de5340a..30f5abe06dd 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -347,7 +347,6 @@ //! //! Likewise, some parts of the API are only available with the same flag: //! -//! - [`task::JoinSet`] //! - [`task::Builder`] //! //! This flag enables **unstable** features. The public API of these features diff --git a/tokio/src/runtime/task/abort.rs b/tokio/src/runtime/task/abort.rs index 3188394a8ea..c34e2bb9a02 100644 --- a/tokio/src/runtime/task/abort.rs +++ b/tokio/src/runtime/task/abort.rs @@ -11,14 +11,8 @@ use std::panic::{RefUnwindSafe, UnwindSafe}; /// Dropping an `AbortHandle` releases the permission to terminate the task /// --- it does *not* abort the task. /// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [unstable]: crate#unstable-features /// [`JoinHandle`]: crate::task::JoinHandle -#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] -#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub struct AbortHandle { raw: Option, id: Id, @@ -40,9 +34,6 @@ impl AbortHandle { /// /// [cancelled]: method@super::error::JoinError::is_cancelled /// [`JoinHandle::abort`]: method@super::JoinHandle::abort - // the `AbortHandle` type is only publicly exposed when `tokio_unstable` is - // enabled, but it is still defined for testing purposes. - #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] pub fn abort(&self) { if let Some(ref raw) = self.raw { raw.remote_abort(); @@ -55,7 +46,6 @@ impl AbortHandle { /// called on the task. This is because the cancellation process may take /// some time, and this method does not return `true` until it has /// completed. - #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] pub fn is_finished(&self) -> bool { if let Some(raw) = self.raw { let state = raw.header().state.load(); diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index 018e1898dc8..927be1acf3e 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -260,7 +260,6 @@ impl JoinHandle { } /// Returns a new `AbortHandle` that can be used to remotely abort this task. - #[cfg(any(tokio_unstable, test))] pub(crate) fn abort_handle(&self) -> super::AbortHandle { let raw = self.raw.map(|raw| { raw.ref_inc(); diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 350b6442ad0..145f3f2cec5 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -155,11 +155,11 @@ cfg_rt_multi_thread! { pub(super) use self::inject::Inject; } -#[cfg(all(feature = "rt", any(tokio_unstable, test)))] +#[cfg(feature = "rt")] mod abort; mod join; -#[cfg(all(feature = "rt", any(tokio_unstable, test)))] +#[cfg(feature = "rt")] #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::abort::AbortHandle; diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index c3767c99d86..66e53a129d1 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -10,7 +10,9 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::runtime::Handle; -use crate::task::{AbortHandle, Id, JoinError, JoinHandle, LocalSet}; +#[cfg(tokio_unstable)] +use crate::task::Id; +use crate::task::{AbortHandle, JoinError, JoinHandle, LocalSet}; use crate::util::IdleNotifiedSet; /// A collection of tasks spawned on a Tokio runtime. @@ -23,10 +25,6 @@ use crate::util::IdleNotifiedSet; /// /// When the `JoinSet` is dropped, all tasks in the `JoinSet` are immediately aborted. /// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// /// # Examples /// /// Spawn multiple tasks and wait for them. @@ -53,9 +51,7 @@ use crate::util::IdleNotifiedSet; /// } /// } /// ``` -/// -/// [unstable]: crate#unstable-features -#[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] pub struct JoinSet { inner: IdleNotifiedSet>, } @@ -197,12 +193,6 @@ impl JoinSet { abort } - #[doc(hidden)] - #[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next`.")] - pub async fn join_one(&mut self) -> Option> { - self.join_next().await - } - /// Waits until one of the tasks in the set completes and returns its output. /// /// Returns `None` if the set is empty. @@ -212,11 +202,8 @@ impl JoinSet { /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` /// statement and some other branch completes first, it is guaranteed that no tasks were /// removed from this `JoinSet`. - #[doc(alias = "join_one")] pub async fn join_next(&mut self) -> Option> { - crate::future::poll_fn(|cx| self.poll_join_next(cx)) - .await - .map(|opt| opt.map(|(_, res)| res)) + crate::future::poll_fn(|cx| self.poll_join_next(cx)).await } /// Waits until one of the tasks in the set completes and returns its @@ -235,15 +222,9 @@ impl JoinSet { /// /// [task ID]: crate::task::Id /// [`JoinError::id`]: fn@crate::task::JoinError::id - #[doc(alias = "join_one_with_id")] + #[cfg(tokio_unstable)] pub async fn join_next_with_id(&mut self) -> Option> { - crate::future::poll_fn(|cx| self.poll_join_next(cx)).await - } - - #[doc(hidden)] - #[deprecated(since = "1.20.0", note = "renamed to `JoinSet::join_next_with_id`")] - pub async fn join_one_with_id(&mut self) -> Option> { - self.join_next_with_id().await + crate::future::poll_fn(|cx| self.poll_join_next_with_id(cx)).await } /// Aborts all tasks and waits for them to finish shutting down. @@ -277,6 +258,62 @@ impl JoinSet { self.inner.drain(drop); } + /// Polls for one of the tasks in the set to complete. + /// + /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. + /// + /// When the method returns `Poll::Pending`, the `Waker` in the provided `Context` is scheduled + /// to receive a wakeup when a task in the `JoinSet` completes. Note that on multiple calls to + /// `poll_join_next`, only the `Waker` from the `Context` passed to the most recent call is + /// scheduled to receive a wakeup. + /// + /// # Returns + /// + /// This function returns: + /// + /// * `Poll::Pending` if the `JoinSet` is not empty but there is no task whose output is + /// available right now. + /// * `Poll::Ready(Some(Ok(value)))` if one of the tasks in this `JoinSet` has completed. + /// The `value` is the return value of one of the tasks that completed. + /// * `Poll::Ready(Some(Err(err)))` if one of the tasks in this `JoinSet` has panicked or been + /// aborted. The `err` is the `JoinError` from the panicked/aborted task. + /// * `Poll::Ready(None)` if the `JoinSet` is empty. + /// + /// Note that this method may return `Poll::Pending` even if one of the tasks has completed. + /// This can happen if the [coop budget] is reached. + /// + /// [coop budget]: crate::task#cooperative-scheduling + fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to + // the `notified` list if the waker is notified in the `poll` call below. + let mut entry = match self.inner.pop_notified(cx.waker()) { + Some(entry) => entry, + None => { + if self.is_empty() { + return Poll::Ready(None); + } else { + // The waker was set by `pop_notified`. + return Poll::Pending; + } + } + }; + + let res = entry.with_value_and_context(|jh, ctx| Pin::new(jh).poll(ctx)); + + if let Poll::Ready(res) = res { + let _entry = entry.remove(); + // If the task succeeded, add the task ID to the output. Otherwise, the + // `JoinError` will already have the task's ID. + Poll::Ready(Some(res)) + } else { + // A JoinHandle generally won't emit a wakeup without being ready unless + // the coop limit has been reached. We yield to the executor in this + // case. + cx.waker().wake_by_ref(); + Poll::Pending + } + } + /// Polls for one of the tasks in the set to complete. /// /// If this returns `Poll::Ready(Some(_))`, then the task that completed is removed from the set. @@ -304,7 +341,11 @@ impl JoinSet { /// /// [coop budget]: crate::task#cooperative-scheduling /// [task ID]: crate::task::Id - fn poll_join_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + #[cfg(tokio_unstable)] + fn poll_join_next_with_id( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { // The call to `pop_notified` moves the entry to the `idle` list. It is moved back to // the `notified` list if the waker is notified in the `poll` call below. let mut entry = match self.inner.pop_notified(cx.waker()) { diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index d2b634de09d..c4eb0c56743 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -307,11 +307,13 @@ cfg_rt! { mod unconstrained; pub use unconstrained::{unconstrained, Unconstrained}; + pub mod join_set; + #[doc(inline)] + pub use join_set::JoinSet; + pub use crate::runtime::task::AbortHandle; + cfg_unstable! { - pub mod join_set; - #[doc(inline)] - pub use join_set::JoinSet; - pub use crate::runtime::task::{Id, AbortHandle}; + pub use crate::runtime::task::Id; } cfg_trace! { diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 618f5543802..41a3bce051f 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -44,10 +44,8 @@ pub(crate) mod linked_list; mod rand; cfg_rt! { - cfg_unstable! { - mod idle_notified_set; - pub(crate) use idle_notified_set::IdleNotifiedSet; - } + mod idle_notified_set; + pub(crate) use idle_notified_set::IdleNotifiedSet; mod wake; pub(crate) use wake::WakerRef;