diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 7bf8b44512c..0c0e95a6504 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,22 +1,35 @@ +use crate::future::poll_fn; +use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; -use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::sync::Notify; use crate::util::linked_list::{Link, LinkedList}; -use crate::util::{waker_ref, Wake}; +use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::{Arc, Mutex}; -use std::task::Poll::Ready; +use std::sync::{Arc, PoisonError}; +use std::task::Poll::{Pending, Ready}; use std::time::Duration; /// Executes tasks on the current thread -pub(crate) struct BasicScheduler

-where - P: Park, -{ +pub(crate) struct BasicScheduler { + /// Inner state guarded by a mutex that is shared + /// between all `block_on` calls. + inner: Mutex>>, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, + + /// Sendable task spawner + spawner: Spawner, +} + +/// The inner scheduler that owns the task queue and the main parker P. +struct Inner { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and @@ -59,7 +72,7 @@ struct Shared { unpark: Box, } -/// Thread-local context +/// Thread-local context. struct Context { /// Shared scheduler state shared: Arc, @@ -68,38 +81,43 @@ struct Context { tasks: RefCell, } -/// Initial queue capacity +/// Initial queue capacity. const INITIAL_CAPACITY: usize = 64; /// Max number of tasks to poll per tick. const MAX_TASKS_PER_TICK: usize = 61; -/// How often ot check the remote queue first +/// How often to check the remote queue first. const REMOTE_FIRST_INTERVAL: u8 = 31; -// Tracks the current BasicScheduler +// Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); -impl

BasicScheduler

-where - P: Park, -{ +impl BasicScheduler

{ pub(crate) fn new(park: P) -> BasicScheduler

{ let unpark = Box::new(park.unpark()); - BasicScheduler { + let spawner = Spawner { + shared: Arc::new(Shared { + queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + unpark: unpark as Box, + }), + }; + + let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), - spawner: Spawner { - shared: Arc::new(Shared { - queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), - unpark: unpark as Box, - }), - }, + spawner: spawner.clone(), tick: 0, park, + })); + + BasicScheduler { + inner, + notify: Notify::new(), + spawner, } } @@ -108,7 +126,6 @@ where } /// Spawns a future onto the thread pool - #[allow(dead_code)] pub(crate) fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, @@ -117,13 +134,57 @@ where self.spawner.spawn(future) } - pub(crate) fn block_on(&mut self, future: F) -> F::Output - where - F: Future, - { + pub(crate) fn block_on(&self, future: F) -> F::Output { + pin!(future); + + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + loop { + if let Some(inner) = &mut self.take_inner() { + return inner.block_on(future); + } else { + let mut enter = crate::runtime::enter(false); + + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } + + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } + + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; + } + } + } + } + + fn take_inner(&self) -> Option> { + let inner = self.inner.lock().unwrap().take()?; + + Some(InnerGuard { + inner: Some(inner), + basic_scheduler: &self, + }) + } +} + +impl Inner

{ + /// Block on the future provided and drive the runtime's driver. + fn block_on(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { - let _enter = runtime::enter(false); - let waker = waker_ref(&scheduler.spawner.shared); + let _enter = crate::runtime::enter(false); + let waker = scheduler.spawner.waker_ref(); let mut cx = std::task::Context::from_waker(&waker); pin!(future); @@ -178,16 +239,16 @@ where /// Enter the scheduler context. This sets the queue and other necessary /// scheduler state in the thread-local -fn enter(scheduler: &mut BasicScheduler

, f: F) -> R +fn enter(scheduler: &mut Inner

, f: F) -> R where - F: FnOnce(&mut BasicScheduler

, &Context) -> R, + F: FnOnce(&mut Inner

, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option, - scheduler: &'a mut BasicScheduler

, + scheduler: &'a mut Inner

, } impl Drop for Guard<'_, P> { @@ -214,12 +275,23 @@ where CURRENT.set(context, || f(scheduler, context)) } -impl

Drop for BasicScheduler

-where - P: Park, -{ +impl Drop for BasicScheduler

{ fn drop(&mut self) { - enter(self, |scheduler, context| { + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + + let mut inner = match self + .inner + .lock() + .unwrap_or_else(PoisonError::into_inner) + .take() + { + Some(inner) => inner, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), + }; + + enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations #[allow(clippy::while_let_loop)] loop { @@ -269,6 +341,10 @@ impl Spawner { fn pop(&self) -> Option>> { self.shared.queue.lock().unwrap().pop_front() } + + fn waker_ref(&self) -> WakerRef<'_> { + waker_ref(&self.shared) + } } impl fmt::Debug for Spawner { @@ -325,3 +401,43 @@ impl Wake for Shared { arc_self.unpark.unpark(); } } + +// ===== InnerGuard ===== + +/// Used to ensure we always place the Inner value +/// back into its slot in `BasicScheduler`, even if the +/// future panics. +struct InnerGuard<'a, P: Park> { + inner: Option>, + basic_scheduler: &'a BasicScheduler

, +} + +impl InnerGuard<'_, P> { + fn block_on(&mut self, future: F) -> F::Output { + // The only time inner gets set to `None` is if we have dropped + // already so this unwrap is safe. + self.inner.as_mut().unwrap().block_on(future) + } +} + +impl Drop for InnerGuard<'_, P> { + fn drop(&mut self) { + if let Some(scheduler) = self.inner.take() { + // We can ignore the poison error here since we are + // just replacing the state. + let mut lock = self + .basic_scheduler + .inner + .lock() + .unwrap_or_else(PoisonError::into_inner); + + // Replace old scheduler back into the state to allow + // other threads to pick it up and drive it. + lock.replace(scheduler); + + // Wake up other possible threads that could steal + // the dedicated parker P. + self.basic_scheduler.notify.notify_one() + } + } +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 4072b04e465..99b34eb3e4c 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -496,7 +496,7 @@ cfg_rt_core! { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Basic(Mutex::new(Some(scheduler))), + kind: Kind::Basic(scheduler), handle: Handle { spawner, io_handle: resources.io_handle, diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 884c2b46fa4..e4a1cf08cea 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -296,7 +296,7 @@ enum Kind { /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(Mutex>>), + Basic(BasicScheduler), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] @@ -401,7 +401,7 @@ impl Runtime { Kind::Shell(_) => panic!("task execution disabled"), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(_exec) => self.handle.spawner.spawn(future), + Kind::Basic(exec) => exec.spawn(future), } } @@ -461,24 +461,7 @@ impl Runtime { } } #[cfg(feature = "rt-core")] - Kind::Basic(exec) => { - // TODO(lucio): clean this up and move this impl into - // `basic_scheduler.rs`, this is hacky and bad but will work for - // now. - let exec_temp = { - let mut lock = exec.lock().unwrap(); - lock.take() - }; - - if let Some(mut exec_temp) = exec_temp { - let res = exec_temp.block_on(future); - exec.lock().unwrap().replace(exec_temp); - res - } else { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).unwrap() - } - } + Kind::Basic(exec) => exec.block_on(future), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.block_on(future), }) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 1a584383b83..2e674136f53 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -456,6 +456,13 @@ cfg_sync! { pub mod watch; } +cfg_not_sync! { + cfg_rt_core! { + mod notify; + pub(crate) use notify::Notify; + } +} + cfg_not_sync! { cfg_atomic_waker_impl! { mod task; diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 56bbc51bf28..d319e8aae8f 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -1,3 +1,10 @@ +// Allow `unreachable_pub` warnings when sync is not enabled +// due to the usage of `Notify` within the `rt-core` feature set. +// When this module is compiled with `sync` enabled we will warn on +// this lint. When `rt-core` is enabled we use `pub(crate)` which +// triggers this warning but it is safe to ignore in this case. +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] + use crate::loom::sync::atomic::AtomicU8; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index c5439f4878b..278d6343784 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -12,6 +12,10 @@ mod rand; mod wake; pub(crate) use wake::{waker_ref, Wake}; +cfg_rt_core! { + pub(crate) use wake::WakerRef; +} + cfg_rt_threaded! { pub(crate) use rand::FastRand; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 35e2ea81a02..a8968be1b96 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -574,6 +574,38 @@ rt_test! { }); } + #[test] + fn always_active_parker() { + // This test it to show that we will always have + // an active parker even if we call block_on concurrently + + let rt = rt(); + let rt2 = rt.clone(); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let jh1 = thread::spawn(move || { + rt.block_on(async move { + rx2.await.unwrap(); + time::delay_for(Duration::from_millis(5)).await; + tx1.send(()).unwrap(); + }); + }); + + let jh2 = thread::spawn(move || { + rt2.block_on(async move { + tx2.send(()).unwrap(); + time::delay_for(Duration::from_millis(5)).await; + rx1.await.unwrap(); + time::delay_for(Duration::from_millis(5)).await; + }); + }); + + jh1.join().unwrap(); + jh2.join().unwrap(); + } + #[test] // IOCP requires setting the "max thread" concurrency value. The sane, // default, is to set this to the number of cores. Threads that poll I/O