Skip to content

Commit

Permalink
rt, chore: rename internal scheduler types (#4945)
Browse files Browse the repository at this point in the history
For historical reasons, the current-thread runtime's scheduler was named
BasicScheduler and the multi-thread runtime's scheduler were named
ThreadPool. This patch renames the schedulers to mirror the runtime
names to increase consistency. This patch also moves the scheduler
implementations into a new `runtime::scheduler` module.
  • Loading branch information
carllerche committed Aug 26, 2022
1 parent 218f262 commit a3411a4
Show file tree
Hide file tree
Showing 25 changed files with 125 additions and 120 deletions.
4 changes: 2 additions & 2 deletions tokio/src/lib.rs
Expand Up @@ -153,7 +153,7 @@
//! provide the functionality you need.
//!
//! Using the runtime requires the "rt" or "rt-multi-thread" feature flags, to
//! enable the basic [single-threaded scheduler][rt] and the [thread-pool
//! enable the current-thread [single-threaded scheduler][rt] and the [multi-thread
//! scheduler][rt-multi-thread], respectively. See the [`runtime` module
//! documentation][rt-features] for details. In addition, the "macros" feature
//! flag enables the `#[tokio::main]` and `#[tokio::test]` attributes.
Expand Down Expand Up @@ -310,7 +310,7 @@
//! need.
//!
//! - `full`: Enables all features listed below except `test-util` and `tracing`.
//! - `rt`: Enables `tokio::spawn`, the basic (current thread) scheduler,
//! - `rt`: Enables `tokio::spawn`, the current-thread scheduler,
//! and non-scheduler utilities.
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
Expand Down
18 changes: 9 additions & 9 deletions tokio/src/runtime/builder.rs
Expand Up @@ -625,7 +625,7 @@ impl Builder {
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
match &self.kind {
Kind::CurrentThread => self.build_basic_runtime(),
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
}
Expand Down Expand Up @@ -831,8 +831,8 @@ impl Builder {
}
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Config, HandleInner, Kind};
fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Config, CurrentThread, HandleInner, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

Expand All @@ -852,7 +852,7 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(
let scheduler = CurrentThread::new(
driver,
handle_inner,
Config {
Expand All @@ -865,7 +865,7 @@ impl Builder {
disable_lifo_slot: self.disable_lifo_slot,
},
);
let spawner = Spawner::Basic(scheduler.spawner().clone());
let spawner = Spawner::CurrentThread(scheduler.spawner().clone());

Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
Expand Down Expand Up @@ -951,7 +951,7 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, HandleInner, Kind, ThreadPool};
use crate::runtime::{Config, HandleInner, Kind, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

Expand All @@ -970,7 +970,7 @@ cfg_rt_multi_thread! {
blocking_spawner,
};

let (scheduler, launch) = ThreadPool::new(
let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
handle_inner,
Expand All @@ -984,7 +984,7 @@ cfg_rt_multi_thread! {
disable_lifo_slot: self.disable_lifo_slot,
},
);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
let spawner = Spawner::MultiThread(scheduler.spawner().clone());

// Create the runtime handle
let handle = Handle { spawner };
Expand All @@ -994,7 +994,7 @@ cfg_rt_multi_thread! {
launch.launch();

Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
kind: Kind::MultiThread(scheduler),
handle,
blocking_pool,
})
Expand Down
29 changes: 13 additions & 16 deletions tokio/src/runtime/mod.rs
Expand Up @@ -137,7 +137,7 @@
//! use tokio::runtime;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let basic_rt = runtime::Builder::new_current_thread()
//! let rt = runtime::Builder::new_current_thread()
//! .build()?;
//! # Ok(()) }
//! ```
Expand Down Expand Up @@ -166,7 +166,6 @@
//! [`tokio::main`]: ../attr.main.html
//! [runtime builder]: crate::runtime::Builder
//! [`Runtime::new`]: crate::runtime::Runtime::new
//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
Expand All @@ -187,8 +186,8 @@ cfg_rt! {

pub(crate) mod task;

mod basic_scheduler;
use basic_scheduler::BasicScheduler;
pub(crate) mod scheduler;
use scheduler::CurrentThread;

mod config;
use config::Config;
Expand Down Expand Up @@ -244,8 +243,7 @@ cfg_rt! {
cfg_rt_multi_thread! {
use driver::Driver;

pub(crate) mod thread_pool;
use self::thread_pool::ThreadPool;
use scheduler::MultiThread;
}

cfg_rt! {
Expand Down Expand Up @@ -304,15 +302,15 @@ cfg_rt! {
blocking_pool: BlockingPool,
}

/// The runtime executor is either a thread-pool or a current-thread executor.
/// The runtime executor is either a multi-thread or a current-thread executor.
#[derive(Debug)]
enum Kind {
/// Execute all tasks on the current-thread.
CurrentThread(BasicScheduler),
CurrentThread(CurrentThread),

/// Execute tasks across multiple threads.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
ThreadPool(ThreadPool),
MultiThread(MultiThread),
}

/// After thread starts / before thread stops
Expand Down Expand Up @@ -347,7 +345,6 @@ cfg_rt! {
/// [mod]: index.html
/// [main]: ../attr.main.html
/// [threaded scheduler]: index.html#threaded-scheduler
/// [basic scheduler]: index.html#basic-scheduler
/// [runtime builder]: crate::runtime::Builder
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
Expand Down Expand Up @@ -492,7 +489,7 @@ cfg_rt! {
match &self.kind {
Kind::CurrentThread(exec) => exec.block_on(future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::ThreadPool(exec) => exec.block_on(future),
Kind::MultiThread(exec) => exec.block_on(future),
}
}

Expand Down Expand Up @@ -608,11 +605,11 @@ cfg_rt! {
impl Drop for Runtime {
fn drop(&mut self) {
match &mut self.kind {
Kind::CurrentThread(basic) => {
// This ensures that tasks spawned on the basic runtime are dropped inside the
// runtime's context.
Kind::CurrentThread(current_thread) => {
// This ensures that tasks spawned on the current-thread
// runtime are dropped inside the runtime's context.
match self::context::try_enter(self.handle.clone()) {
Some(guard) => basic.set_context_guard(guard),
Some(guard) => current_thread.set_context_guard(guard),
None => {
// The context thread-local has already been destroyed.
//
Expand All @@ -622,7 +619,7 @@ cfg_rt! {
}
},
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::ThreadPool(_) => {
Kind::MultiThread(_) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
},
Expand Down
Expand Up @@ -20,7 +20,7 @@ use std::task::Poll::{Pending, Ready};
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct BasicScheduler {
pub(crate) struct CurrentThread {
/// Core scheduler data is acquired by a thread entering `block_on`.
core: AtomicCell<Core>,

Expand All @@ -31,10 +31,10 @@ pub(crate) struct BasicScheduler {
/// Sendable task spawner
spawner: Spawner,

/// This is usually None, but right before dropping the BasicScheduler, it
/// is changed to `Some` with the context being the runtime's own context.
/// This ensures that any tasks dropped in the `BasicScheduler`s destructor
/// run in that runtime's context.
/// This is usually None, but right before dropping the CurrentThread
/// scheduler, it is changed to `Some` with the context being the runtime's
/// own context. This ensures that any tasks dropped in the `CurrentThread`'s
/// destructor run in that runtime's context.
context_guard: Option<EnterGuard>,
}

Expand Down Expand Up @@ -108,11 +108,11 @@ struct Context {
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

// Tracks the current BasicScheduler.
// Tracks the current CurrentThread.
scoped_thread_local!(static CURRENT: Context);

impl BasicScheduler {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> BasicScheduler {
impl CurrentThread {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> CurrentThread {
let unpark = driver.unpark();

let spawner = Spawner {
Expand All @@ -137,7 +137,7 @@ impl BasicScheduler {
unhandled_panic: false,
})));

BasicScheduler {
CurrentThread {
core,
notify: Notify::new(),
spawner,
Expand Down Expand Up @@ -193,16 +193,16 @@ impl BasicScheduler {
spawner: self.spawner.clone(),
core: RefCell::new(Some(core)),
},
basic_scheduler: self,
scheduler: self,
})
}

pub(super) fn set_context_guard(&mut self, guard: EnterGuard) {
pub(crate) fn set_context_guard(&mut self, guard: EnterGuard) {
self.context_guard = Some(guard);
}
}

impl Drop for BasicScheduler {
impl Drop for CurrentThread {
fn drop(&mut self) {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.
Expand Down Expand Up @@ -246,9 +246,9 @@ impl Drop for BasicScheduler {
}
}

impl fmt::Debug for BasicScheduler {
impl fmt::Debug for CurrentThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BasicScheduler").finish()
fmt.debug_struct("CurrentThread").finish()
}
}

Expand Down Expand Up @@ -357,8 +357,8 @@ impl Context {
// ===== impl Spawner =====

impl Spawner {
/// Spawns a future onto the basic scheduler
pub(crate) fn spawn<F>(&self, future: F, id: super::task::Id) -> JoinHandle<F::Output>
/// Spawns a future onto the `CurrentThread` scheduler
pub(crate) fn spawn<F>(&self, future: F, id: crate::runtime::task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
Expand Down Expand Up @@ -503,10 +503,10 @@ impl Wake for Shared {
// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `BasicScheduler`, even if the future panics.
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
context: Context,
basic_scheduler: &'a BasicScheduler,
scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
Expand Down Expand Up @@ -605,10 +605,10 @@ impl Drop for CoreGuard<'_> {
if let Some(core) = self.context.core.borrow_mut().take() {
// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
self.basic_scheduler.core.set(core);
self.scheduler.core.set(core);

// Wake up other possible threads that could steal the driver.
self.basic_scheduler.notify.notify_one()
self.scheduler.notify.notify_one()
}
}
}
7 changes: 7 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
@@ -0,0 +1,7 @@
pub(crate) mod current_thread;
pub(crate) use current_thread::CurrentThread;

cfg_rt_multi_thread! {
pub(crate) mod multi_thread;
pub(crate) use multi_thread::MultiThread;
}
File renamed without changes.
@@ -1,12 +1,12 @@
//! Threadpool
//! Multi-threaded runtime

mod idle;
use self::idle::Idle;

mod park;
pub(crate) use park::{Parker, Unparker};

pub(super) mod queue;
pub(crate) mod queue;

mod worker;
pub(crate) use worker::Launch;
Expand All @@ -21,7 +21,7 @@ use std::fmt;
use std::future::Future;

/// Work-stealing based thread pool for executing futures.
pub(crate) struct ThreadPool {
pub(crate) struct MultiThread {
spawner: Spawner,
}

Expand All @@ -34,29 +34,29 @@ pub(crate) struct ThreadPool {
/// impact the lifecycle of the thread pool in any way. The thread pool may
/// shut down while there are outstanding `Spawner` instances.
///
/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`].
/// `Spawner` instances are obtained by calling [`MultiThread::spawner`].
///
/// [`ThreadPool::spawner`]: method@ThreadPool::spawner
/// [`MultiThread::spawner`]: method@MultiThread::spawner
#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<worker::Shared>,
}

// ===== impl ThreadPool =====
// ===== impl MultiThread =====

impl ThreadPool {
impl MultiThread {
pub(crate) fn new(
size: usize,
driver: Driver,
handle_inner: HandleInner,
config: Config,
) -> (ThreadPool, Launch) {
) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, handle_inner, config);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };
let multi_thread = MultiThread { spawner };

(thread_pool, launch)
(multi_thread, launch)
}

/// Returns reference to `Spawner`.
Expand All @@ -80,13 +80,13 @@ impl ThreadPool {
}
}

impl fmt::Debug for ThreadPool {
impl fmt::Debug for MultiThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("ThreadPool").finish()
fmt.debug_struct("MultiThread").finish()
}
}

impl Drop for ThreadPool {
impl Drop for MultiThread {
fn drop(&mut self) {
self.spawner.shutdown();
}
Expand Down
File renamed without changes.
File renamed without changes.

0 comments on commit a3411a4

Please sign in to comment.