Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: add Builder::{spawn_on, spawn_local_on, spawn_blocking_on} #4683

Merged
merged 4 commits into from May 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 13 additions & 4 deletions tokio/src/runtime/handle.rs
Expand Up @@ -175,10 +175,7 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", None, id.as_u64());
self.spawner.spawn(future, id)
self.spawn_named(future, None)
}

/// Runs the provided function on an executor dedicated to blocking.
Expand Down Expand Up @@ -301,6 +298,18 @@ impl Handle {
.expect("failed to park thread")
}

#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.spawner.spawn(future, id)
}

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
}
Expand Down
85 changes: 78 additions & 7 deletions tokio/src/task/builder.rs
@@ -1,5 +1,8 @@
#![allow(unreachable_pub)]
use crate::{runtime::context, task::JoinHandle};
use crate::{
runtime::{context, Handle},
task::{JoinHandle, LocalSet},
};
use std::future::Future;

/// Factory which is used to configure the properties of a new task.
Expand Down Expand Up @@ -71,7 +74,11 @@ impl<'a> Builder<'a> {
Self { name: Some(name) }
}

/// Spawns a task on the executor.
/// Spawns a task with this builder's settings on the current runtime.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// See [`task::spawn`](crate::task::spawn) for
/// more details.
Expand All @@ -84,10 +91,36 @@ impl<'a> Builder<'a> {
super::spawn::spawn_inner(future, self.name)
}

/// Spawns a task on the current thread.
/// Spawn a task with this builder's settings on the provided [runtime
/// handle].
///
/// See [`task::spawn_local`](crate::task::spawn_local)
/// for more details.
/// See [`Handle::spawn`] for more details.
///
/// [runtime handle]: crate::runtime::Handle
/// [`Handle::spawn`]: crate::runtime::Handle::spawn
#[track_caller]
pub fn spawn_on<Fut>(&mut self, future: Fut, handle: &Handle) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
handle.spawn_named(future, self.name)
}

/// Spawns `!Send` a task on the current [`LocalSet`] with this builder's
/// settings.
///
/// The spawned future will be run on the same thread that called `spawn_local.`
/// This may only be called from the context of a [local task set][`LocalSet`].
///
/// # Panics
///
/// This function panics if called outside of a [local task set][`LocalSet`].
///
/// See [`task::spawn_local`] for more details.
///
/// [`task::spawn_local`]: crate::task::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
where
Expand All @@ -97,23 +130,61 @@ impl<'a> Builder<'a> {
super::local::spawn_local_inner(future, self.name)
}

/// Spawns `!Send` a task on the provided [`LocalSet`] with this builder's
/// settings.
///
/// See [`LocalSet::spawn_local`] for more details.
///
/// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local
/// [`LocalSet`]: crate::task::LocalSet
#[track_caller]
pub fn spawn_local_on<Fut>(self, future: Fut, local_set: &LocalSet) -> JoinHandle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
local_set.spawn_named(future, self.name)
}

/// Spawns blocking code on the blocking threadpool.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// See [`task::spawn_blocking`](crate::task::spawn_blocking)
/// for more details.
#[track_caller]
pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
self.spawn_blocking_on(function, &context::current())
}

/// Spawns blocking code on the provided [runtime handle]'s blocking threadpool.
///
/// See [`Handle::spawn_blocking`] for more details.
///
/// [runtime handle]: crate::runtime::Handle
/// [`Handle::spawn_blocking`]: crate::runtime::Handle::spawn_blocking
#[track_caller]
pub fn spawn_blocking_on<Function, Output>(
self,
function: Function,
handle: &Handle,
) -> JoinHandle<Output>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
use crate::runtime::Mandatory;
let handle = context::current();
let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner(
function,
Mandatory::NonMandatory,
self.name,
&handle,
handle,
);
join_handle
}
Expand Down
62 changes: 39 additions & 23 deletions tokio/src/task/local.rs
Expand Up @@ -301,19 +301,11 @@ cfg_rt! {
where F: Future + 'static,
F::Output: 'static
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", name, id.as_u64());
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");

let (handle, notified) = cx.owned.bind(future, cx.shared.clone(), id);

if let Some(notified) = notified {
cx.shared.schedule(notified);
}

handle
cx.spawn(future, name)
})
}
}
Expand Down Expand Up @@ -386,20 +378,7 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", None, id.as_u64());

let (handle, notified) = self
.context
.owned
.bind(future, self.context.shared.clone(), id);

if let Some(notified) = notified {
self.context.shared.schedule(notified);
}

self.context.shared.waker.wake();
handle
self.spawn_named(future, None)
}

/// Runs a future to completion on the provided runtime, driving any local
Expand Down Expand Up @@ -512,6 +491,21 @@ impl LocalSet {
run_until.await
}

pub(in crate::task) fn spawn_named<F>(
&self,
future: F,
name: Option<&str>,
) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let handle = self.context.spawn(future, name);

self.context.shared.waker.wake();
handle
}

/// Ticks the scheduler, returning whether the local future needs to be
/// notified again.
fn tick(&self) -> bool {
Expand Down Expand Up @@ -628,6 +622,28 @@ impl Drop for LocalSet {
}
}

// === impl Context ===

impl Context {
#[track_caller]
fn spawn<F>(&self, future: F, name: Option<&str>) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
let future = crate::util::trace::task(future, "local", name, id.as_u64());

let (handle, notified) = self.owned.bind(future, self.shared.clone(), id);

if let Some(notified) = notified {
self.shared.schedule(notified);
}

handle
}
}

// === impl LocalFuture ===

impl<T: Future> Future for RunUntil<'_, T> {
Expand Down