
rt: Allow concurrent block_on's with basic_scheduler #2804

Merged 12 commits on Sep 23, 2020
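For context, here is a minimal sketch (not part of the PR) of the behavior this change enables: two threads calling `block_on` on the same basic-scheduler runtime at once, with one stealing the dedicated driver and the other parking until it is handed back. Builder method names vary across tokio versions, so `new_current_thread()` below is an assumption.

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    // Assumes a tokio version where `block_on` takes `&self`; the builder
    // method name (`new_current_thread`) is illustrative and differs in 0.2.
    let rt = Arc::new(
        tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("failed to build runtime"),
    );

    // Both threads call `block_on` concurrently. One steals the dedicated
    // driver and runs it; the other parks on the shared condvar until the
    // driver is returned, then tries to steal it in turn.
    let handles: Vec<_> = (0..2)
        .map(|i| {
            let rt = Arc::clone(&rt);
            thread::spawn(move || rt.block_on(async move { i * 2 }))
        })
        .collect();

    for handle in handles {
        println!("{}", handle.join().unwrap());
    }
}
```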
12 changes: 8 additions & 4 deletions tokio/src/park/thread.rs
@@ -21,8 +21,8 @@ pub(crate) struct UnparkThread {
#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
mutex: Arc<Mutex<()>>,
condvar: Arc<Condvar>,
}

const EMPTY: usize = 0;
@@ -37,11 +37,15 @@ thread_local! {

impl ParkThread {
pub(crate) fn new() -> Self {
ParkThread::with_condvar(Arc::new(Condvar::new()), Arc::new(Mutex::new(())))
}

pub(crate) fn with_condvar(condvar: Arc<Condvar>, mutex: Arc<Mutex<()>>) -> Self {
Self {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
mutex: Mutex::new(()),
condvar: Condvar::new(),
mutex,
condvar,
}),
}
}
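The point of threading an `Arc<Condvar>` and `Arc<Mutex<()>>` into `ParkThread` is that the scheduler can notify a thread parked on that condvar once the dedicated driver is placed back. Below is a rough standalone model of that handoff using plain std types, with a boolean flag added to keep the example race-free; it is not the actual tokio internals.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // `false` means the dedicated driver is currently taken by another
    // `block_on` caller; `true` means it was placed back.
    let state = Arc::new((Mutex::new(false), Condvar::new()));

    let waiter = {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let (lock, condvar) = &*state;
            let mut driver_available = lock.lock().unwrap();
            // Park until notified that the driver was returned, mirroring
            // the fallback branch of `block_on` in the diff below.
            while !*driver_available {
                driver_available = condvar.wait(driver_available).unwrap();
            }
            println!("waiter: driver is back, try to steal it");
        })
    };

    // Mirrors `InnerGuard::drop`: put the driver back while holding the
    // lock, then notify one parked waiter.
    {
        let (lock, condvar) = &*state;
        *lock.lock().unwrap() = true;
        condvar.notify_one();
    }

    waiter.join().unwrap();
}
```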
172 changes: 140 additions & 32 deletions tokio/src/runtime/basic_scheduler.rs
@@ -1,22 +1,35 @@
use crate::park::{Park, Unpark};
use crate::loom;
use crate::loom::sync::{Condvar, Mutex};
use crate::park::{Park, ParkThread, Unpark};
use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
use crate::util::linked_list::LinkedList;
use crate::util::{waker_ref, Wake};
use crate::util::{waker_ref, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::Poll::Ready;
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
pub(crate) struct BasicScheduler<P: Park> {
/// Inner with the dedicated parker P
inner: Mutex<Option<Inner<P>>>,

/// Sync items used to notify other threads that called
/// block_on concurrently that the dedicated driver is available
/// to steal
condvar: loom::sync::Arc<Condvar>,
mutex: loom::sync::Arc<Mutex<()>>,

/// Sendable task spawner
spawner: Spawner,
}

struct Inner<P: Park> {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
@@ -59,7 +72,7 @@ struct Shared {
unpark: Box<dyn Unpark>,
}

/// Thread-local context
/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
@@ -68,16 +81,16 @@ struct Context {
tasks: RefCell<Tasks>,
}

/// Initial queue capacity
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often ot check the remote queue first
/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

// Tracks the current BasicScheduler
// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

impl<P> BasicScheduler<P>
@@ -87,19 +100,28 @@ where
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());

BasicScheduler {
let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
};

let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
spawner: Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
},
spawner: spawner.clone(),
tick: 0,
park,
}));

BasicScheduler {
inner,
spawner,
condvar: loom::sync::Arc::new(Condvar::new()),
mutex: loom::sync::Arc::new(Mutex::new(())),
}
}

@@ -108,7 +130,6 @@ where
}

/// Spawns a future onto the thread pool
#[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
@@ -117,13 +138,60 @@ where
self.spawner.spawn(future)
}

pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
{
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
// If we can steal the dedicated parker then let's block_on with that;
// otherwise, let's block_on here and attempt to steal it back when we can.
if let Some(mut inner) = self.take_inner() {
inner.block_on(future)
} else {
// TODO: should this be false or true? In the original block_on for
// basic_scheduler we have false?
let enter = crate::runtime::enter(false);

let mut park = ParkThread::with_condvar(self.condvar.clone(), self.mutex.clone());
let waker = park.unpark().into_waker();
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);

loop {
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}

// Check if we can steal the driver
// TODO: Consider using an atomic load here instead of locking
// the mutex.
Member: I think this shouldn't be too tricky if we just ensure that the state is only changed while inside a lock.

Member (Author): I would like to punt this to the next PR.

Member: Sure, that's fine, I was just mentioning this for whenever you get to it.

Member: Not for this PR, but when we do this, we probably want to define some "cell" type in src/util.
if let Some(mut inner) = self.take_inner() {
// We will enter again in the inner implementation below
drop(enter);
return inner.block_on(future);
}

// Park this thread waiting for some external wake up from
// a waker or a notification that we can steal the driver again.
park.park().expect("failed to park");
}
}
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let mut lock = self.inner.lock().unwrap();
let inner = lock.take()?;

Some(InnerGuard {
inner: Some(inner),
scheduler: &self,
})
}
}

impl<P: Park> Inner<P> {
/// Block on the future provided and drive the runtime's driver.
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
let _enter = runtime::enter(false);
let waker = waker_ref(&scheduler.spawner.shared);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);
@@ -178,16 +246,16 @@

/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local
fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.`
struct Guard<'a, P: Park> {
context: Option<Context>,
scheduler: &'a mut BasicScheduler<P>,
scheduler: &'a mut Inner<P>,
}

impl<P: Park> Drop for Guard<'_, P> {
@@ -214,12 +282,15 @@ where
CURRENT.set(context, || f(scheduler, context))
}

impl<P> Drop for BasicScheduler<P>
where
P: Park,
{
impl<P: Park> Drop for BasicScheduler<P> {
fn drop(&mut self) {
enter(self, |scheduler, context| {
let mut inner = {
let mut lock = self.inner.lock().expect("BasicScheduler Inner lock");
lock.take()
.expect("Oh no! We never placed the Inner state back!")
};

enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
@@ -269,6 +340,10 @@ impl Spawner {
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().unwrap().pop_front()
}

fn waker_ref(&self) -> WakerRef<'_> {
waker_ref(&self.shared)
}
}

impl fmt::Debug for Spawner {
@@ -325,3 +400,36 @@ impl Wake for Shared {
arc_self.unpark.unpark();
}
}

// ===== InnerGuard =====

/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler` even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
inner: Option<Inner<P>>,
scheduler: &'a BasicScheduler<P>,
}

impl<P: Park> InnerGuard<'_, P> {
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
// The only time inner gets set to `None` is if we have dropped
// already so this unwrap is safe.
self.inner.as_mut().unwrap().block_on(future)
}
}

impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let mut lock = self.scheduler.inner.lock().unwrap();
lock.replace(inner);

// Wake up possible other threads
// notifying them that they might need
// to steal the driver.
drop(self.scheduler.mutex.lock().unwrap());
self.scheduler.condvar.notify_one();
}
}
}
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
@@ -463,7 +463,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
kind: Kind::Basic(scheduler),
handle: Handle {
spawner,
io_handle,
23 changes: 3 additions & 20 deletions tokio/src/runtime/mod.rs
@@ -293,7 +293,7 @@ enum Kind {

/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Basic(Mutex<Option<BasicScheduler<time::Driver>>>),
Basic(BasicScheduler<time::Driver>),

/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
@@ -398,7 +398,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::Basic(_exec) => self.handle.spawner.spawn(future),
Kind::Basic(exec) => exec.spawn(future),
}
}

@@ -458,24 +458,7 @@
}
}
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => {
// TODO(lucio): clean this up and move this impl into
// `basic_scheduler.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};

if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future),
})
4 changes: 4 additions & 0 deletions tokio/src/util/mod.rs
@@ -12,6 +12,10 @@ mod rand;
mod wake;
pub(crate) use wake::{waker_ref, Wake};

cfg_rt_core! {
pub(crate) use wake::WakerRef;
}

cfg_rt_threaded! {
pub(crate) use rand::FastRand;
