Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: add join_set::Builder for configuring JoinSet tasks #4687

Merged
merged 15 commits into from May 30, 2022
2 changes: 1 addition & 1 deletion tokio/src/task/builder.rs
Expand Up @@ -110,7 +110,7 @@ impl<'a> Builder<'a> {
/// Spawns `!Send` a task on the current [`LocalSet`] with this builder's
/// settings.
///
/// The spawned future will be run on the same thread that called `spawn_local.`
/// The spawned future will be run on the same thread that called `spawn_local`.
/// This may only be called from the context of a [local task set][`LocalSet`].
///
/// # Panics
Expand Down
157 changes: 156 additions & 1 deletion tokio/src/task/join_set.rs
@@ -1,3 +1,9 @@
//! A collection of tasks spawned on a Tokio runtime.
//!
//! This module provides the [`JoinSet`] type, a collection which stores a set
//! of spawned tasks and allows asynchronously awaiting the output of those
//! tasks as they complete. See the documentation for the [`JoinSet`] type for
//! details.
use std::fmt;
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -53,6 +59,18 @@ pub struct JoinSet<T> {
inner: IdleNotifiedSet<JoinHandle<T>>,
}

/// A variant of [`task::Builder`] that spawns tasks on a [`JoinSet`] rather
/// than on the current default runtime.
///
/// [`task::Builder`]: crate::task::Builder
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
#[must_use = "builders do nothing unless used to spawn a task"]
pub struct Builder<'a, T> {
joinset: &'a mut JoinSet<T>,
builder: super::Builder<'a>,
}

impl<T> JoinSet<T> {
/// Create a new `JoinSet`.
pub fn new() -> Self {
Expand All @@ -73,6 +91,33 @@ impl<T> JoinSet<T> {
}

impl<T: 'static> JoinSet<T> {
/// Returns a [`Builder`] that can be used to configure a task prior to
/// spawning it on this `JoinSet`.
///
/// # Examples
///
/// ```
/// use tokio::task::JoinSet;
///
/// #[tokio::main]
/// async fn main() {
/// let mut set = JoinSet::new();
///
/// // Use the builder to configure a task's name before spawning it.
/// set.build_task()
/// .name("my_task")
/// .spawn(async { /* ... */ });
/// }
/// ```
#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
pub fn build_task(&mut self) -> Builder<'_, T> {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
Builder {
builder: super::Builder::new(),
joinset: self,
}
}

/// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
/// that can be used to remotely cancel the task.
///
Expand Down Expand Up @@ -107,7 +152,7 @@ impl<T: 'static> JoinSet<T> {
}

/// Spawn the provided task on the current [`LocalSet`] and store it in this
/// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
/// `JoinSet`, returning an [`AbortHandle`] that can be used to remotely
/// cancel the task.
///
/// # Panics
Expand Down Expand Up @@ -289,3 +334,113 @@ impl<T> Default for JoinSet<T> {
Self::new()
}
}

// === impl Builder ===

#[cfg(all(tokio_unstable, feature = "tracing"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "tracing"))))]
impl<'a, T: 'static> Builder<'a, T> {

/// Assigns a name to the task which will be spawned.
pub fn name(self, name: &'a str) -> Self {
let builder = self.builder.name(name);
Self {
builder, ..self
}
}

/// Spawn the provided task with this builder's settings and store it in the
/// [`JoinSet`], returning an [`AbortHandle`] that can be used to remotely
/// cancel the task.
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn<F>(self, future: F) -> AbortHandle
where
F: Future<Output = T>,
F: Send + 'static,
T: Send,
{
self.joinset.insert(self.builder.spawn(future))
}


/// Spawn the provided task on the provided [runtime handle] with this
/// builder's settings, and store it in the [`JoinSet`].
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
///
/// [`AbortHandle`]: crate::task::AbortHandle
/// [runtime handle]: crate::runtime::Handle
#[track_caller]
pub fn spawn_on<F>(mut self, future: F, handle: &Handle) -> AbortHandle
where
F: Future<Output = T>,
F: Send + 'static,
T: Send,
{
self.joinset.insert(self.builder.spawn_on(future, handle))
}

/// Spawn the provided task on the current [`LocalSet`] with this builder's
/// settings, and store it in the [`JoinSet`].
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
/// # Panics
///
/// This method panics if it is called outside of a `LocalSet`.
///
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn_local<F>(self, future: F) -> AbortHandle
where
F: Future<Output = T>,
F: 'static,
{
self.joinset.insert(self.builder.spawn_local(future))
}

/// Spawn the provided task on the provided [`LocalSet`] with this builder's
/// settings, and store it in the [`JoinSet`].
///
/// # Returns
///
/// An [`AbortHandle`] that can be used to remotely cancel the task.
///
/// [`LocalSet`]: crate::task::LocalSet
/// [`AbortHandle`]: crate::task::AbortHandle
#[track_caller]
pub fn spawn_local_on<F>(self, future: F, local_set: &LocalSet) -> AbortHandle
where
F: Future<Output = T>,
F: 'static,
{
self.joinset.insert(self.builder.spawn_local_on(future, local_set))
}
}

// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `T` is
// `Debug`.
impl<'a, T> fmt::Debug for Builder<'a, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("join_set::Builder")
.field("joinset", &self.joinset)
.field("builder", &self.builder)
.finish()
}
}
6 changes: 6 additions & 0 deletions tokio/src/task/local.rs
Expand Up @@ -502,6 +502,12 @@ impl LocalSet {
{
let handle = self.context.spawn(future, name);

// Because a task was spawned from *outside* the `JoinSet`, wake the
// `JoinSet` future to execute the new task, if it hasn't been woken.
//
// Spawning via the free fn `spawn` does not require this, as it can
// only be called from *within* a future executing on the `JoinSet` — in
// that case, the `JoinSet` must already be awake.
self.context.shared.waker.wake();
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
handle
}
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/task/mod.rs
Expand Up @@ -301,7 +301,8 @@ cfg_rt! {
pub use unconstrained::{unconstrained, Unconstrained};

cfg_unstable! {
mod join_set;
pub mod join_set;
#[doc(inline)]
pub use join_set::JoinSet;
pub use crate::runtime::task::{Id, AbortHandle};
}
Expand Down