Skip to content

Commit

Permalink
rt: Allow concurrent block_on's with basic_scheduler
Browse files Browse the repository at this point in the history
This allows us to concurrently call `Runtime::block_on` with the basic_scheduler
and allowing other threads to steal the dedicated parker.
  • Loading branch information
LucioFranco committed Aug 31, 2020
1 parent 8270774 commit db92967
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 57 deletions.
12 changes: 8 additions & 4 deletions tokio/src/park/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub(crate) struct UnparkThread {
#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
mutex: Arc<Mutex<()>>,
condvar: Arc<Condvar>,
}

const EMPTY: usize = 0;
Expand All @@ -37,11 +37,15 @@ thread_local! {

impl ParkThread {
pub(crate) fn new() -> Self {
ParkThread::with_condvar(Arc::new(Condvar::new()), Arc::new(Mutex::new(())))
}

pub(crate) fn with_condvar(condvar: Arc<Condvar>, mutex: Arc<Mutex<()>>) -> Self {
Self {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
mutex,
condvar,
}),
}
}
Expand Down
172 changes: 140 additions & 32 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,35 @@
use crate::park::{Park, Unpark};
use crate::loom;
use crate::loom::sync::{Condvar, Mutex};
use crate::park::{Park, ParkThread, Unpark};
use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
use crate::util::linked_list::LinkedList;
use crate::util::{waker_ref, Wake};
use crate::util::{waker_ref, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::Poll::Ready;
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
pub(crate) struct BasicScheduler<P: Park> {
/// Inner with the dedicated parker P
inner: Mutex<Option<Inner<P>>>,

/// Sync items used to notify other threads that called
/// block_on concurrently that the dedicated driver is available
/// to steal
condvar: loom::sync::Arc<Condvar>,
mutex: loom::sync::Arc<Mutex<()>>,

/// Sendable task spawner
spawner: Spawner,
}

struct Inner<P: Park> {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
Expand Down Expand Up @@ -59,7 +72,7 @@ struct Shared {
unpark: Box<dyn Unpark>,
}

/// Thread-local context
/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
Expand All @@ -68,16 +81,16 @@ struct Context {
tasks: RefCell<Tasks>,
}

/// Initial queue capacity
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often ot check the remote queue first
/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

// Tracks the current BasicScheduler
// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

impl<P> BasicScheduler<P>
Expand All @@ -87,19 +100,28 @@ where
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());

BasicScheduler {
let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
};

let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
spawner: Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
},
spawner: spawner.clone(),
tick: 0,
park,
}));

BasicScheduler {
inner,
spawner,
condvar: loom::sync::Arc::new(Condvar::new()),
mutex: loom::sync::Arc::new(Mutex::new(())),
}
}

Expand All @@ -108,7 +130,6 @@ where
}

/// Spawns a future onto the thread pool
#[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
Expand All @@ -117,13 +138,60 @@ where
self.spawner.spawn(future)
}

pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
{
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
// If we can steal the dedicated parker than lets block_on that
// otherwise, lets block_on and attempt to steal it back if we can.
if let Some(mut inner) = self.take_inner() {
inner.block_on(future)
} else {
// TODO: should this be false or true? In the origina block_on for
// basic_scheduler we have false?
let enter = crate::runtime::enter(false);

let mut park = ParkThread::with_condvar(self.condvar.clone(), self.mutex.clone());
let waker = park.unpark().into_waker();
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);

loop {
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}

// Check if we can steal the driver
// TODO: Consider using an atomic load here intead of locking
// the mutex.
if let Some(mut inner) = self.take_inner() {
// We will enter again on in the inner implementation below
drop(enter);
return inner.block_on(future);
}

// Park this thread waiting for some external wake up from
// a waker or a notification that we can steal the driver again.
park.park().expect("failed to park");
}
}
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let mut lock = self.inner.lock().unwrap();
let inner = lock.take()?;

Some(InnerGuard {
inner: Some(inner),
scheduler: &self,
})
}
}

impl<P: Park> Inner<P> {
/// Block on the future provided and drive the runtime's driver.
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
let _enter = runtime::enter(false);
let waker = waker_ref(&scheduler.spawner.shared);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);
Expand Down Expand Up @@ -178,16 +246,16 @@ where

/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local
fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.`
struct Guard<'a, P: Park> {
context: Option<Context>,
scheduler: &'a mut BasicScheduler<P>,
scheduler: &'a mut Inner<P>,
}

impl<P: Park> Drop for Guard<'_, P> {
Expand All @@ -214,12 +282,15 @@ where
CURRENT.set(context, || f(scheduler, context))
}

impl<P> Drop for BasicScheduler<P>
where
P: Park,
{
impl<P: Park> Drop for BasicScheduler<P> {
fn drop(&mut self) {
enter(self, |scheduler, context| {
let mut inner = {
let mut lock = self.inner.lock().expect("BasicScheduler Inner lock");
lock.take()
.expect("Oh no! We never placed the Inner state back!")
};

enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
Expand Down Expand Up @@ -269,6 +340,10 @@ impl Spawner {
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().unwrap().pop_front()
}

fn waker_ref(&self) -> WakerRef<'_> {
waker_ref(&self.shared)
}
}

impl fmt::Debug for Spawner {
Expand Down Expand Up @@ -325,3 +400,36 @@ impl Wake for Shared {
arc_self.unpark.unpark();
}
}

// ===== InnerGuard =====

/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler` even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
inner: Option<Inner<P>>,
scheduler: &'a BasicScheduler<P>,
}

impl<P: Park> InnerGuard<'_, P> {
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
// The only time inner gets set to `None` is if we have dropped
// already so this unwrap is safe.
self.inner.as_mut().unwrap().block_on(future)
}
}

impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let mut lock = self.scheduler.inner.lock().unwrap();
lock.replace(inner);

// Wake up possible other threads
// notifying them that they might need
// to steal the driver.
drop(self.scheduler.mutex.lock().unwrap());
self.scheduler.condvar.notify_one();
}
}
}
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
kind: Kind::Basic(scheduler),
handle: Handle {
spawner,
io_handle,
Expand Down
23 changes: 3 additions & 20 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ enum Kind {

/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Basic(Mutex<Option<BasicScheduler<time::Driver>>>),
Basic(BasicScheduler<time::Driver>),

/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
Expand Down Expand Up @@ -398,7 +398,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::Basic(_exec) => self.handle.spawner.spawn(future),
Kind::Basic(exec) => exec.spawn(future),
}
}

Expand Down Expand Up @@ -458,24 +458,7 @@ impl Runtime {
}
}
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => {
// TODO(lucio): clean this up and move this impl into
// `basic_scheduler.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};

if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future),
})
Expand Down
4 changes: 4 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ mod rand;
mod wake;
pub(crate) use wake::{waker_ref, Wake};

cfg_rt_core! {
pub(crate) use wake::WakerRef;
}

cfg_rt_threaded! {
pub(crate) use rand::FastRand;

Expand Down

0 comments on commit db92967

Please sign in to comment.