Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: Allow concurrent Shell:block_on calls #2868

Merged
merged 3 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::loom::sync::Mutex;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
Expand Down Expand Up @@ -377,7 +376,7 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
kind: Kind::Shell(Shell::new(driver)),
handle: Handle {
spawner,
io_handle: resources.io_handle,
Expand Down
22 changes: 2 additions & 20 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ cfg_rt_core! {
use crate::task::JoinHandle;
}

use crate::loom::sync::Mutex;
use std::future::Future;
use std::time::Duration;

Expand Down Expand Up @@ -292,7 +291,7 @@ pub struct Runtime {
enum Kind {
/// Not able to execute concurrent tasks. This variant is mostly used to get
/// access to the driver handles.
Shell(Mutex<Option<Shell>>),
Shell(Shell),

/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Expand Down Expand Up @@ -442,24 +441,7 @@ impl Runtime {
/// [handle]: fn@Handle::block_on
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.enter(|| match &self.kind {
Kind::Shell(exec) => {
// TODO(lucio): clean this up and move this impl into
// `shell.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};

if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
Kind::Shell(exec) => exec.block_on(future),
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Expand Down
100 changes: 85 additions & 15 deletions tokio/src/runtime/shell.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
#![allow(clippy::redundant_clone)]

use crate::future::poll_fn;
use crate::park::{Park, Unpark};
use crate::runtime::driver::Driver;
use crate::runtime::enter;
use crate::sync::Notify;
use crate::util::{waker_ref, Wake};

use std::future::Future;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::task::Poll::Ready;
use std::task::Poll::{Pending, Ready};
use std::{future::Future, sync::PoisonError};

#[derive(Debug)]
pub(super) struct Shell {
driver: Driver,
driver: Mutex<Option<Driver>>,

notify: Notify,

/// TODO: don't store this
unpark: Arc<Handle>,
Expand All @@ -25,28 +28,57 @@ impl Shell {
pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));

Shell { driver, unpark }
Shell {
driver: Mutex::new(Some(driver)),
notify: Notify::new(),
unpark,
}
}

pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
pub(super) fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
let _e = enter(true);
let mut enter = crate::runtime::enter(true);

pin!(f);

let waker = waker_ref(&self.unpark);
let mut cx = Context::from_waker(&waker);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return v;
}
if let Some(driver) = &mut self.take_driver() {
return driver.block_on(f);
} else {
let notified = self.notify.notified();
pin!(notified);

if let Some(out) = enter
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}

if let Ready(out) = f.as_mut().poll(cx) {
return Ready(Some(out));
}

self.driver.park().unwrap();
Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}

fn take_driver(&self) -> Option<DriverGuard<'_>> {
let mut lock = self.driver.lock().unwrap();
let driver = lock.take()?;

Some(DriverGuard {
inner: Some(driver),
shell: &self,
})
}
}

impl Wake for Handle {
Expand All @@ -60,3 +92,41 @@ impl Wake for Handle {
arc_self.0.unpark();
}
}

struct DriverGuard<'a> {
inner: Option<Driver>,
shell: &'a Shell,
}

impl DriverGuard<'_> {
fn block_on<F: Future>(&mut self, f: F) -> F::Output {
let driver = self.inner.as_mut().unwrap();

pin!(f);

let waker = waker_ref(&self.shell.unpark);
let mut cx = Context::from_waker(&waker);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return v;
}

driver.park().unwrap();
}
}
}

impl Drop for DriverGuard<'_> {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
self.shell
.driver
.lock()
.unwrap_or_else(PoisonError::into_inner)
.replace(inner);

self.shell.notify.notify_one();
}
}
}
8 changes: 2 additions & 6 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,9 @@ cfg_sync! {
}

cfg_not_sync! {
cfg_rt_core! {
mod notify;
pub(crate) use notify::Notify;
}
}
mod notify;
pub(crate) use notify::Notify;

cfg_not_sync! {
cfg_atomic_waker_impl! {
mod task;
pub(crate) use task::AtomicWaker;
Expand Down
1 change: 0 additions & 1 deletion tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ cfg_io_driver! {
pub(crate) mod slab;
}

#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))]
pub(crate) mod linked_list;

#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
Expand Down