From b49793b095d634d702c6e6d273d8230970132d36 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 10:40:18 -0700 Subject: [PATCH 01/15] wip: sketch joinset builder API Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 74 +++++++++++++++++++++++++++++++++++++- tokio/src/task/mod.rs | 3 +- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 42f55a034cc..f0240f36a2a 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -53,6 +53,13 @@ pub struct JoinSet { inner: IdleNotifiedSet>, } +/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather +/// than on the current default runtime. +pub struct Builder<'a, T> { + joinset: &'a mut JoinSet, + builder: super::Builder<'a>, +} + impl JoinSet { /// Create a new `JoinSet`. pub fn new() -> Self { @@ -73,6 +80,15 @@ impl JoinSet { } impl JoinSet { + /// Returns a [`Builder`] that can be used to configure a task prior to + /// spawning it on this `JoinSet`. + pub fn build_task(&mut self) -> Builder<'_, T> { + Builder { + builder: super::Builder::new(), + joinset: self, + } + } + /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] /// that can be used to remotely cancel the task. /// @@ -107,7 +123,7 @@ impl JoinSet { } /// Spawn the provided task on the current [`LocalSet`] and store it in this - /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely + /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely /// cancel the task. /// /// # Panics @@ -289,3 +305,59 @@ impl Default for JoinSet { Self::new() } } + +// === impl Builder === +impl<'a, T: 'static> Builder<'a, T> { + + /// Assigns a name to the task which will be spawned. + pub fn name(self, name: &'a str) -> Self { + let builder = self.builder.name(name); + Self { + builder, ..self + } + } + + /// Spawn the provided task on the [`JoinSet`] with this builder's settings. + /// returning an [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn(self, future: F) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + self.joinset.insert(self.builder.spawn(future)) + } + + /// Spawn the provided task on the current [`LocalSet`] with this builder's + /// settings, and store it in the [`JoinSet`]. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// # Panics + /// + /// This method panics if it is called outside of a `LocalSet`. + /// + /// [`LocalSet`]: crate::task::LocalSet + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn_local(self, future: F) -> AbortHandle + where + F: Future, + F: 'static, + { + self.joinset.insert(self.builder.spawn_local(future)) + } +} \ No newline at end of file diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index cebc269bb40..320a8cad914 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -301,7 +301,8 @@ cfg_rt! { pub use unconstrained::{unconstrained, Unconstrained}; cfg_unstable! { - mod join_set; + pub mod join_set; + #[doc(inline)] pub use join_set::JoinSet; pub use crate::runtime::task::{Id, AbortHandle}; } From b999833574d45b96140d938074c3d72a9b2002c3 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:18:27 -0700 Subject: [PATCH 02/15] add spawn_on variants to local set builder Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 44 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index f0240f36a2a..370231984c4 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -317,8 +317,9 @@ impl<'a, T: 'static> Builder<'a, T> { } } - /// Spawn the provided task on the [`JoinSet`] with this builder's settings. - /// returning an [`AbortHandle`] that can be used to remotely cancel the task. + /// Spawn the provided task with this builder's settings and store it in the + /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely + /// cancel the task. /// /// # Returns /// @@ -339,6 +340,27 @@ impl<'a, T: 'static> Builder<'a, T> { self.joinset.insert(self.builder.spawn(future)) } + + /// Spawn the provided task on the provided [runtime handle] with this + /// builder's settings, and store it in the [`JoinSet`]. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// + /// [`AbortHandle`]: crate::task::AbortHandle + /// [runtime handle]: crate::runtime::Handle + #[track_caller] + pub fn spawn_on(self, future: F, handle: &Handle) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + self.joinset.insert(self.builder.spawn_on(future, handle)) + } + /// Spawn the provided task on the current [`LocalSet`] with this builder's /// settings, and store it in the [`JoinSet`]. /// @@ -360,4 +382,22 @@ impl<'a, T: 'static> Builder<'a, T> { { self.joinset.insert(self.builder.spawn_local(future)) } + + /// Spawn the provided task on the provided [`LocalSet`] with this builder's + /// settings, and store it in the [`JoinSet`]. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// [`LocalSet`]: crate::task::LocalSet + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn_local_on(self, future: F, local_set: &LocalSet) -> AbortHandle + where + F: Future, + F: 'static, + { + self.joinset.insert(self.builder.spawn_local_on(future, local_set)) + } } \ No newline at end of file From f918640a73fdcff6bf4d9f8b914ccacc73669244 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:20:37 -0700 Subject: [PATCH 03/15] add docs Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 370231984c4..ea91000e567 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -82,6 +82,22 @@ impl JoinSet { impl JoinSet { /// Returns a [`Builder`] that can be used to configure a task prior to /// spawning it on this `JoinSet`. + /// + /// # Examples + /// + /// ``` + /// use tokio::task::JoinSet; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut set = JoinSet::new(); + /// + /// // Use the builder to configure a task's name before spawning it. + /// set.build_task() + /// .name("my_task") + /// .spawn(async { /* ... */ }); + /// } + /// ``` pub fn build_task(&mut self) -> Builder<'_, T> { Builder { builder: super::Builder::new(), From 54dca628dc3f1fa2ba052ac9dc5d5908fec42001 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:22:04 -0700 Subject: [PATCH 04/15] builder also requires the tracing feature Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index ea91000e567..3ad27971669 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -55,6 +55,8 @@ pub struct JoinSet { /// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather /// than on the current default runtime. +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] pub struct Builder<'a, T> { joinset: &'a mut JoinSet, builder: super::Builder<'a>, @@ -98,6 +100,8 @@ impl JoinSet { /// .spawn(async { /* ... */ }); /// } /// ``` + #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] pub fn build_task(&mut self) -> Builder<'_, T> { Builder { builder: super::Builder::new(), @@ -323,6 +327,9 @@ impl Default for JoinSet { } // === impl Builder === + +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] impl<'a, T: 'static> Builder<'a, T> { /// Assigns a name to the task which will be spawned. From d2b304481ee1da65c7efdcbaf635ea303a91fd73 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 12 May 2022 11:22:29 -0700 Subject: [PATCH 05/15] add attributes Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 3ad27971669..6d87c104fee 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -57,6 +57,8 @@ pub struct JoinSet { /// than on the current default runtime. #[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +#[must_use = "builders do nothing unless used to spawn a task"] +#[derive(Debug)] pub struct Builder<'a, T> { joinset: &'a mut JoinSet, builder: super::Builder<'a>, From 5fcce1817e7547b85c06413b9988a246c09a2fb6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 13 May 2022 12:35:24 -0700 Subject: [PATCH 06/15] fix docs Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 6d87c104fee..8b4400e341f 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -55,6 +55,8 @@ pub struct JoinSet { /// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather /// than on the current default runtime. +/// +/// [`task::Builder`]: crate::task::Builder #[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] #[must_use = "builders do nothing unless used to spawn a task"] From dfca79a9f3a97421a5c3acf9f5857cdd150ab4ba Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 13 May 2022 12:42:57 -0700 Subject: [PATCH 07/15] add top-level docs Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 8b4400e341f..a0a41cd0b52 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -1,3 +1,9 @@ +//! A collection of tasks spawned on a Tokio runtime. +//! +//! This module provides the [`JoinSet`] type, a collection which stores a set +//! of spawned tasks and allows asynchronously awaiting the output of those +//! tasks as they complete. See the documentation for the [`JoinSet`] type for +//! details. use std::fmt; use std::future::Future; use std::pin::Pin; From 3523e012b3a3ae85ca0d9f558389a6cdda5c9718 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sat, 14 May 2022 10:06:26 -0700 Subject: [PATCH 08/15] placate freebsd ci for some reason Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index a0a41cd0b52..9dd2da7e5cf 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -385,7 +385,7 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`AbortHandle`]: crate::task::AbortHandle /// [runtime handle]: crate::runtime::Handle #[track_caller] - pub fn spawn_on(self, future: F, handle: &Handle) -> AbortHandle + pub fn spawn_on(mut self, future: F, handle: &Handle) -> AbortHandle where F: Future, F: Send + 'static, From 0369d3bc569a8c2f4da31492a1873d82cd6dcf1d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 17 May 2022 10:48:42 -0700 Subject: [PATCH 09/15] Update tokio/src/task/builder.rs Co-authored-by: Alice Ryhl --- tokio/src/task/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index ff3f5954b56..ddb5c430ef1 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -110,7 +110,7 @@ impl<'a> Builder<'a> { /// Spawns `!Send` a task on the current [`LocalSet`] with this builder's /// settings. /// - /// The spawned future will be run on the same thread that called `spawn_local.` + /// The spawned future will be run on the same thread that called `spawn_local`. /// This may only be called from the context of a [local task set][`LocalSet`]. /// /// # Panics From 8b10f3a7af51a51202d387c1fd0472c481137a8a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 17 May 2022 12:22:20 -0700 Subject: [PATCH 10/15] don't require `T: Debug` Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 9dd2da7e5cf..1e6d0aba337 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -66,7 +66,6 @@ pub struct JoinSet { #[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] #[must_use = "builders do nothing unless used to spawn a task"] -#[derive(Debug)] pub struct Builder<'a, T> { joinset: &'a mut JoinSet, builder: super::Builder<'a>, @@ -433,4 +432,15 @@ impl<'a, T: 'static> Builder<'a, T> { { self.joinset.insert(self.builder.spawn_local_on(future, local_set)) } +} + +// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is +// `Debug`. +impl<'a, T> fmt::Debug for Builder<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("join_set::Builder") + .field("joinset", &self.joinset) + .field("builder", &self.builder) + .finish() + } } \ No newline at end of file From a71f75462e9c349bc7383472c8f2fc087a6f4fb8 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 17 May 2022 12:24:50 -0700 Subject: [PATCH 11/15] document joinset waking see https://github.com/tokio-rs/tokio/pull/4687#discussion_r875098338 Signed-off-by: Eliza Weisman --- tokio/src/task/local.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index c3d874b6138..0757d3ec7d1 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -502,6 +502,12 @@ impl LocalSet { { let handle = self.context.spawn(future, name); + // Because a task was spawned from *outside* the `JoinSet`, wake the + // `JoinSet` future to execute the new task, if it hasn't been woken. + // + // Spawning via the free fn `spawn` does not require this, as it can + // only be called from *within* a future executing on the `JoinSet` — in + // that case, the `JoinSet` must already be awake. self.context.shared.waker.wake(); handle } From 713eb958adfe38e214d3c2963ee11465d399ebc6 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 17 May 2022 12:25:30 -0700 Subject: [PATCH 12/15] trailing newline oops Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 1e6d0aba337..9cdd3b31325 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -443,4 +443,4 @@ impl<'a, T> fmt::Debug for Builder<'a, T> { .field("builder", &self.builder) .finish() } -} \ No newline at end of file +} From 297da23f6d1fdce21cd8f6c893548a4df7140b8f Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 17 May 2022 12:44:45 -0700 Subject: [PATCH 13/15] s/joinset/localset Signed-off-by: Eliza Weisman --- tokio/src/task/local.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 0757d3ec7d1..8c59876d603 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -502,12 +502,12 @@ impl LocalSet { { let handle = self.context.spawn(future, name); - // Because a task was spawned from *outside* the `JoinSet`, wake the - // `JoinSet` future to execute the new task, if it hasn't been woken. + // Because a task was spawned from *outside* the `LocalSet`, wake the + // `LocalSet` future to execute the new task, if it hasn't been woken. // // Spawning via the free fn `spawn` does not require this, as it can - // only be called from *within* a future executing on the `JoinSet` — in - // that case, the `JoinSet` must already be awake. + // only be called from *within* a future executing on the `LocalSet` — + // in that case, the `LocalSet` must already be awake. self.context.shared.waker.wake(); handle } From c63fdfeebffe4e975cb280cc1a0bceab53c87f18 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 17 May 2022 12:45:33 -0700 Subject: [PATCH 14/15] rustfmt --- tokio/src/task/join_set.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 9cdd3b31325..4bb398e62cc 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -61,7 +61,7 @@ pub struct JoinSet { /// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather /// than on the current default runtime. -/// +/// /// [`task::Builder`]: crate::task::Builder #[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] @@ -340,13 +340,10 @@ impl Default for JoinSet { #[cfg(all(tokio_unstable, feature = "tracing"))] #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] impl<'a, T: 'static> Builder<'a, T> { - /// Assigns a name to the task which will be spawned. pub fn name(self, name: &'a str) -> Self { let builder = self.builder.name(name); - Self { - builder, ..self - } + Self { builder, ..self } } /// Spawn the provided task with this builder's settings and store it in the @@ -372,7 +369,6 @@ impl<'a, T: 'static> Builder<'a, T> { self.joinset.insert(self.builder.spawn(future)) } - /// Spawn the provided task on the provided [runtime handle] with this /// builder's settings, and store it in the [`JoinSet`]. /// @@ -430,7 +426,8 @@ impl<'a, T: 'static> Builder<'a, T> { F: Future, F: 'static, { - self.joinset.insert(self.builder.spawn_local_on(future, local_set)) + self.joinset + .insert(self.builder.spawn_local_on(future, local_set)) } } From 3a1955956d086d5f11ad1ce49068fedee9e860ae Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 18 May 2022 09:35:34 -0700 Subject: [PATCH 15/15] fix missing cfg flag on builder impls Signed-off-by: Eliza Weisman --- tokio/src/task/join_set.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 4bb398e62cc..2619e343f46 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -433,6 +433,8 @@ impl<'a, T: 'static> Builder<'a, T> { // Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is // `Debug`. +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] impl<'a, T> fmt::Debug for Builder<'a, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("join_set::Builder")