diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index ff3f5954b56..ddb5c430ef1 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -110,7 +110,7 @@ impl<'a> Builder<'a> { /// Spawns `!Send` a task on the current [`LocalSet`] with this builder's /// settings. /// - /// The spawned future will be run on the same thread that called `spawn_local.` + /// The spawned future will be run on the same thread that called `spawn_local`. /// This may only be called from the context of a [local task set][`LocalSet`]. /// /// # Panics diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index 42f55a034cc..2619e343f46 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -1,3 +1,9 @@ +//! A collection of tasks spawned on a Tokio runtime. +//! +//! This module provides the [`JoinSet`] type, a collection which stores a set +//! of spawned tasks and allows asynchronously awaiting the output of those +//! tasks as they complete. See the documentation for the [`JoinSet`] type for +//! details. use std::fmt; use std::future::Future; use std::pin::Pin; @@ -53,6 +59,18 @@ pub struct JoinSet { inner: IdleNotifiedSet>, } +/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather +/// than on the current default runtime. +/// +/// [`task::Builder`]: crate::task::Builder +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +#[must_use = "builders do nothing unless used to spawn a task"] +pub struct Builder<'a, T> { + joinset: &'a mut JoinSet, + builder: super::Builder<'a>, +} + impl JoinSet { /// Create a new `JoinSet`. pub fn new() -> Self { @@ -73,6 +91,33 @@ impl JoinSet { } impl JoinSet { + /// Returns a [`Builder`] that can be used to configure a task prior to + /// spawning it on this `JoinSet`. + /// + /// # Examples + /// + /// ``` + /// use tokio::task::JoinSet; + /// + /// #[tokio::main] + /// async fn main() { + /// let mut set = JoinSet::new(); + /// + /// // Use the builder to configure a task's name before spawning it. + /// set.build_task() + /// .name("my_task") + /// .spawn(async { /* ... */ }); + /// } + /// ``` + #[cfg(all(tokio_unstable, feature = "tracing"))] + #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] + pub fn build_task(&mut self) -> Builder<'_, T> { + Builder { + builder: super::Builder::new(), + joinset: self, + } + } + /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] /// that can be used to remotely cancel the task. /// @@ -107,7 +152,7 @@ impl JoinSet { } /// Spawn the provided task on the current [`LocalSet`] and store it in this - /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely + /// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely /// cancel the task. /// /// # Panics @@ -289,3 +334,112 @@ impl Default for JoinSet { Self::new() } } + +// === impl Builder === + +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +impl<'a, T: 'static> Builder<'a, T> { + /// Assigns a name to the task which will be spawned. + pub fn name(self, name: &'a str) -> Self { + let builder = self.builder.name(name); + Self { builder, ..self } + } + + /// Spawn the provided task with this builder's settings and store it in the + /// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely + /// cancel the task. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn(self, future: F) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + self.joinset.insert(self.builder.spawn(future)) + } + + /// Spawn the provided task on the provided [runtime handle] with this + /// builder's settings, and store it in the [`JoinSet`]. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// + /// [`AbortHandle`]: crate::task::AbortHandle + /// [runtime handle]: crate::runtime::Handle + #[track_caller] + pub fn spawn_on(mut self, future: F, handle: &Handle) -> AbortHandle + where + F: Future, + F: Send + 'static, + T: Send, + { + self.joinset.insert(self.builder.spawn_on(future, handle)) + } + + /// Spawn the provided task on the current [`LocalSet`] with this builder's + /// settings, and store it in the [`JoinSet`]. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// # Panics + /// + /// This method panics if it is called outside of a `LocalSet`. + /// + /// [`LocalSet`]: crate::task::LocalSet + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn_local(self, future: F) -> AbortHandle + where + F: Future, + F: 'static, + { + self.joinset.insert(self.builder.spawn_local(future)) + } + + /// Spawn the provided task on the provided [`LocalSet`] with this builder's + /// settings, and store it in the [`JoinSet`]. + /// + /// # Returns + /// + /// An [`AbortHandle`] that can be used to remotely cancel the task. + /// + /// [`LocalSet`]: crate::task::LocalSet + /// [`AbortHandle`]: crate::task::AbortHandle + #[track_caller] + pub fn spawn_local_on(self, future: F, local_set: &LocalSet) -> AbortHandle + where + F: Future, + F: 'static, + { + self.joinset + .insert(self.builder.spawn_local_on(future, local_set)) + } +} + +// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is +// `Debug`. +#[cfg(all(tokio_unstable, feature = "tracing"))] +#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))] +impl<'a, T> fmt::Debug for Builder<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("join_set::Builder") + .field("joinset", &self.joinset) + .field("builder", &self.builder) + .finish() + } +} diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index c3d874b6138..8c59876d603 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -502,6 +502,12 @@ impl LocalSet { { let handle = self.context.spawn(future, name); + // Because a task was spawned from *outside* the `LocalSet`, wake the + // `LocalSet` future to execute the new task, if it hasn't been woken. + // + // Spawning via the free fn `spawn` does not require this, as it can + // only be called from *within* a future executing on the `LocalSet` — + // in that case, the `LocalSet` must already be awake. self.context.shared.waker.wake(); handle } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index cebc269bb40..320a8cad914 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -301,7 +301,8 @@ cfg_rt! { pub use unconstrained::{unconstrained, Unconstrained}; cfg_unstable! { - mod join_set; + pub mod join_set; + #[doc(inline)] pub use join_set::JoinSet; pub use crate::runtime::task::{Id, AbortHandle}; }