Skip to content

Commit

Permalink
task: add LocalSet API for running !Send futures (#1733)
Browse files Browse the repository at this point in the history
## Motivation

In earlier versions of `tokio`, the `current_thread::Runtime` type could
be used to run `!Send` futures. However, PR #1716 merged the
current-thread and threadpool runtimes into a single type, which can no
longer run `!Send` futures. There is still a need in some cases to
support futures that don't implement `Send`, and the `tokio-compat`
crate requires this in order to provide APIs that existed in `tokio`
0.1.

## Solution

This branch implements the API described by @carllerche in
#1716 (comment). It
adds a new `LocalSet` type and `spawn_local` function to `tokio::task`.
The `LocalSet` type is used to group together a set of tasks which must
run on the same thread and don't implement `Send`. These are available
when a new "rt-util" feature flag is enabled.

Currently, the local task set is run by passing it a reference to a
`Runtime` and a future to `block_on`. In the future, we may also want
to investigate allowing spawned futures to construct their own local
task sets, which would be executed on the worker that the future is
executing on. 

In order to implement the new API, I've made some internal changes to
the `task` module and `Schedule` trait to support scheduling both `Send`
and `!Send` futures.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Nov 27, 2019
1 parent 8e83a9f commit 38e602f
Show file tree
Hide file tree
Showing 12 changed files with 886 additions and 14 deletions.
2 changes: 2 additions & 0 deletions tokio/Cargo.toml
Expand Up @@ -39,6 +39,7 @@ full = [
"net",
"process",
"rt-core",
"rt-util",
"rt-threaded",
"signal",
"stream",
Expand Down Expand Up @@ -67,6 +68,7 @@ process = [
]
# Includes basic task execution capabilities
rt-core = []
rt-util = []
rt-threaded = [
"num_cpus",
"rt-core",
Expand Down
25 changes: 25 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -52,6 +52,21 @@ macro_rules! cfg_not_blocking_impl {
}
}

/// Enable internal `AtomicWaker` impl
macro_rules! cfg_atomic_waker_impl {
($($item:item)*) => {
$(
#[cfg(any(
feature = "io-driver",
feature = "time",
all(feature = "rt-core", feature = "rt-util")
))]
#[cfg(not(loom))]
$item
)*
}
}

macro_rules! cfg_dns {
($($item:item)*) => {
$(
Expand Down Expand Up @@ -220,6 +235,16 @@ macro_rules! cfg_rt_threaded {
}
}

macro_rules! cfg_rt_util {
($($item:item)*) => {
$(
#[cfg(feature = "rt-util")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-util")))]
$item
)*
}
}

macro_rules! cfg_not_rt_threaded {
($($item:item)*) => {
$( #[cfg(not(feature = "rt-threaded"))] $item )*
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/runtime/basic_scheduler.rs
@@ -1,5 +1,5 @@
use crate::park::{Park, Unpark};
use crate::task::{self, JoinHandle, Schedule, Task};
use crate::task::{self, JoinHandle, Schedule, ScheduleSendOnly, Task};

use std::cell::UnsafeCell;
use std::collections::VecDeque;
Expand Down Expand Up @@ -304,6 +304,8 @@ impl Schedule for SchedulerPriv {
}
}

impl ScheduleSendOnly for SchedulerPriv {}

impl<P> Drop for BasicScheduler<P>
where
P: Park,
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/runtime/blocking/schedule.rs
@@ -1,4 +1,4 @@
use crate::task::{Schedule, Task};
use crate::task::{Schedule, ScheduleSendOnly, Task};

/// `task::Schedule` implementation that does nothing. This is unique to the
/// blocking scheduler as tasks scheduled are not really futures but blocking
Expand All @@ -16,3 +16,5 @@ impl Schedule for NoopSchedule {
unreachable!();
}
}

impl ScheduleSendOnly for NoopSchedule {}
6 changes: 4 additions & 2 deletions tokio/src/runtime/thread_pool/shared.rs
@@ -1,7 +1,7 @@
use crate::park::Unpark;
use crate::runtime::Unparker;
use crate::runtime::thread_pool::slice;
use crate::task::{self, Schedule, Task};
use crate::runtime::Unparker;
use crate::task::{self, Schedule, ScheduleSendOnly, Task};

use std::ptr;

Expand Down Expand Up @@ -90,3 +90,5 @@ impl Schedule for Shared {
Self::schedule(self, task);
}
}

impl ScheduleSendOnly for Shared {}
2 changes: 1 addition & 1 deletion tokio/src/sync/mod.rs
Expand Up @@ -35,7 +35,7 @@ cfg_sync! {
}

cfg_not_sync! {
cfg_resource_drivers! {
cfg_atomic_waker_impl! {
mod task;
pub(crate) use task::AtomicWaker;
}
Expand Down

0 comments on commit 38e602f

Please sign in to comment.