Skip to content

Commit

Permalink
rt: Allow concurrent Shell:block_on calls (#2868)
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Sep 24, 2020
1 parent c29f13b commit 4dfbdbf
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 44 deletions.
3 changes: 1 addition & 2 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::loom::sync::Mutex;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner};
Expand Down Expand Up @@ -377,7 +376,7 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))),
kind: Kind::Shell(Shell::new(driver)),
handle: Handle {
spawner,
io_handle: resources.io_handle,
Expand Down
22 changes: 2 additions & 20 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ cfg_rt_core! {
use crate::task::JoinHandle;
}

use crate::loom::sync::Mutex;
use std::future::Future;
use std::time::Duration;

Expand Down Expand Up @@ -292,7 +291,7 @@ pub struct Runtime {
enum Kind {
/// Not able to execute concurrent tasks. This variant is mostly used to get
/// access to the driver handles.
Shell(Mutex<Option<Shell>>),
Shell(Shell),

/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Expand Down Expand Up @@ -442,24 +441,7 @@ impl Runtime {
/// [handle]: fn@Handle::block_on
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.handle.enter(|| match &self.kind {
Kind::Shell(exec) => {
// TODO(lucio): clean this up and move this impl into
// `shell.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};

if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
Kind::Shell(exec) => exec.block_on(future),
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Expand Down
100 changes: 85 additions & 15 deletions tokio/src/runtime/shell.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
#![allow(clippy::redundant_clone)]

use crate::future::poll_fn;
use crate::park::{Park, Unpark};
use crate::runtime::driver::Driver;
use crate::runtime::enter;
use crate::sync::Notify;
use crate::util::{waker_ref, Wake};

use std::future::Future;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::Context;
use std::task::Poll::Ready;
use std::task::Poll::{Pending, Ready};
use std::{future::Future, sync::PoisonError};

#[derive(Debug)]
pub(super) struct Shell {
driver: Driver,
driver: Mutex<Option<Driver>>,

notify: Notify,

/// TODO: don't store this
unpark: Arc<Handle>,
Expand All @@ -25,28 +28,57 @@ impl Shell {
pub(super) fn new(driver: Driver) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));

Shell { driver, unpark }
Shell {
driver: Mutex::new(Some(driver)),
notify: Notify::new(),
unpark,
}
}

pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
pub(super) fn block_on<F>(&self, f: F) -> F::Output
where
F: Future,
{
let _e = enter(true);
let mut enter = crate::runtime::enter(true);

pin!(f);

let waker = waker_ref(&self.unpark);
let mut cx = Context::from_waker(&waker);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return v;
}
if let Some(driver) = &mut self.take_driver() {
return driver.block_on(f);
} else {
let notified = self.notify.notified();
pin!(notified);

if let Some(out) = enter
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}

if let Ready(out) = f.as_mut().poll(cx) {
return Ready(Some(out));
}

self.driver.park().unwrap();
Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}

fn take_driver(&self) -> Option<DriverGuard<'_>> {
let mut lock = self.driver.lock().unwrap();
let driver = lock.take()?;

Some(DriverGuard {
inner: Some(driver),
shell: &self,
})
}
}

impl Wake for Handle {
Expand All @@ -60,3 +92,41 @@ impl Wake for Handle {
arc_self.0.unpark();
}
}

struct DriverGuard<'a> {
inner: Option<Driver>,
shell: &'a Shell,
}

impl DriverGuard<'_> {
fn block_on<F: Future>(&mut self, f: F) -> F::Output {
let driver = self.inner.as_mut().unwrap();

pin!(f);

let waker = waker_ref(&self.shell.unpark);
let mut cx = Context::from_waker(&waker);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
return v;
}

driver.park().unwrap();
}
}
}

impl Drop for DriverGuard<'_> {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
self.shell
.driver
.lock()
.unwrap_or_else(PoisonError::into_inner)
.replace(inner);

self.shell.notify.notify_one();
}
}
}
8 changes: 2 additions & 6 deletions tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,9 @@ cfg_sync! {
}

cfg_not_sync! {
cfg_rt_core! {
mod notify;
pub(crate) use notify::Notify;
}
}
mod notify;
pub(crate) use notify::Notify;

cfg_not_sync! {
cfg_atomic_waker_impl! {
mod task;
pub(crate) use task::AtomicWaker;
Expand Down
1 change: 0 additions & 1 deletion tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ cfg_io_driver! {
pub(crate) mod slab;
}

#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))]
pub(crate) mod linked_list;

#[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]
Expand Down

0 comments on commit 4dfbdbf

Please sign in to comment.