Skip to content

Commit

Permalink
task: add Builder::{spawn_on, spawn_local_on, spawn_blocking_on} (#…
Browse files Browse the repository at this point in the history
…4683)

## Motivation

`task::JoinSet` currently has both `spawn`/`spawn_local` methods,
and `spawn_on`/`spawn_local_on` variants of these methods that take a
reference to a runtime `Handle` or to a `LocalSet`, and spawn tasks on
the provided runtime/`LocalSet`, rather than the current one. The
`task::Builder` type is _also_ an API type that can spawn tasks, but it
doesn't have `spawn_on` variants of its methods. It occurred to me that
it would be nice to have similar APIs on `task::Builder`.

## Solution

This branch adds `task::Builder::spawn_on`,
`task::Builder::spawn_local_on`, and `task::Builder::spawn_blocking_on`
methods, similar to those on `JoinSet`. In addition, I did some
refactoring of the internal spawning APIs --- there was a bit of
duplicated code that this PR reduces.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed May 14, 2022
1 parent ce0e115 commit b1557ea
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 34 deletions.
17 changes: 13 additions & 4 deletions tokio/src/runtime/handle.rs
Expand Up @@ -175,10 +175,7 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", None, id.as_u64());
self.spawner.spawn(future, id)
self.spawn_named(future, None)
}

/// Runs the provided function on an executor dedicated to blocking.
Expand Down Expand Up @@ -301,6 +298,18 @@ impl Handle {
.expect("failed to park thread")
}

#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.spawner.spawn(future, id)
}

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
}
Expand Down
85 changes: 78 additions & 7 deletions tokio/src/task/builder.rs
@@ -1,5 +1,8 @@
#![allow(unreachable_pub)]
use crate::{runtime::context, task::JoinHandle};
use crate::{
runtime::{context, Handle},
task::{JoinHandle, LocalSet},
};
use std::future::Future;

/// Factory which is used to configure the properties of a new task.
Expand Down Expand Up @@ -71,7 +74,11 @@ impl<'a> Builder<'a> {
Self { name: Some(name) }
}

/// Spawns a task on the executor.
/// Spawns a task with this builder's settings on the current runtime.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// See [`task::spawn`](crate::task::spawn) for
/// more details.
Expand All @@ -84,10 +91,36 @@ impl<'a> Builder<'a> {
super::spawn::spawn_inner(future, self.name)
}

/// Spawns a task on the current thread.
/// Spawn a task with this builder's settings on the provided [runtime
/// handle].
///
/// See [`task::spawn_local`](crate::task::spawn_local)
/// for more details.
/// See [`Handle::spawn`] for more details.
///
/// [runtime handle]: crate::runtime::Handle
/// [`Handle::spawn`]: crate::runtime::Handle::spawn
#[track_caller]
pub fn spawn_on<Fut>(&mut self, future: Fut, handle: &Handle) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
handle.spawn_named(future, self.name)
}

/// Spawns `!Send` a task on the current [`LocalSet`] with this builder's
/// settings.
///
/// The spawned future will be run on the same thread that called `spawn_local.`
/// This may only be called from the context of a [local task set][`LocalSet`].
///
/// # Panics
///
/// This function panics if called outside of a [local task set][`LocalSet`].
///
/// See [`task::spawn_local`] for more details.
///
/// [`task::spawn_local`]: crate::task::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
where
Expand All @@ -97,23 +130,61 @@ impl<'a> Builder<'a> {
super::local::spawn_local_inner(future, self.name)
}

/// Spawns `!Send` a task on the provided [`LocalSet`] with this builder's
/// settings.
///
/// See [`LocalSet::spawn_local`] for more details.
///
/// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local_on<Fut>(self, future: Fut, local_set: &LocalSet) -> JoinHandle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
local_set.spawn_named(future, self.name)
}

/// Spawns blocking code on the blocking threadpool.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// See [`task::spawn_blocking`](crate::task::spawn_blocking)
/// for more details.
#[track_caller]
pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
self.spawn_blocking_on(function, &context::current())
}

/// Spawns blocking code on the provided [runtime handle]'s blocking threadpool.
///
/// See [`Handle::spawn_blocking`] for more details.
///
/// [runtime handle]: crate::runtime::Handle
/// [`Handle::spawn_blocking`]: crate::runtime::Handle::spawn_blocking
#[track_caller]
pub fn spawn_blocking_on<Function, Output>(
self,
function: Function,
handle: &Handle,
) -> JoinHandle<Output>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
use crate::runtime::Mandatory;
let handle = context::current();
let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner(
function,
Mandatory::NonMandatory,
self.name,
&handle,
handle,
);
join_handle
}
Expand Down
62 changes: 39 additions & 23 deletions tokio/src/task/local.rs
Expand Up @@ -301,19 +301,11 @@ cfg_rt! {
where F: Future + 'static,
F::Output: 'static
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", name, id.as_u64());
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");

let (handle, notified) = cx.owned.bind(future, cx.shared.clone(), id);

if let Some(notified) = notified {
cx.shared.schedule(notified);
}

handle
cx.spawn(future, name)
})
}
}
Expand Down Expand Up @@ -386,20 +378,7 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", None, id.as_u64());

let (handle, notified) = self
.context
.owned
.bind(future, self.context.shared.clone(), id);

if let Some(notified) = notified {
self.context.shared.schedule(notified);
}

self.context.shared.waker.wake();
handle
self.spawn_named(future, None)
}

/// Runs a future to completion on the provided runtime, driving any local
Expand Down Expand Up @@ -512,6 +491,21 @@ impl LocalSet {
run_until.await
}

pub(in crate::task) fn spawn_named<F>(
&self,
future: F,
name: Option<&str>,
) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let handle = self.context.spawn(future, name);

self.context.shared.waker.wake();
handle
}

/// Ticks the scheduler, returning whether the local future needs to be
/// notified again.
fn tick(&self) -> bool {
Expand Down Expand Up @@ -628,6 +622,28 @@ impl Drop for LocalSet {
}
}

// === impl Context ===

impl Context {
#[track_caller]
fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", name, id.as_u64());

let (handle, notified) = self.owned.bind(future, self.shared.clone(), id);

if let Some(notified) = notified {
self.shared.schedule(notified);
}

handle
}
}

// === impl LocalFuture ===

impl<T: Future> Future for RunUntil<'_, T> {
Expand Down

0 comments on commit b1557ea

Please sign in to comment.