Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt, chore: rename internal scheduler types #4945

Merged
merged 3 commits into from Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio/src/lib.rs
Expand Up @@ -153,7 +153,7 @@
//! provide the functionality you need.
//!
//! Using the runtime requires the "rt" or "rt-multi-thread" feature flags, to
//! enable the basic [single-threaded scheduler][rt] and the [thread-pool
//! enable the current-thread [single-threaded scheduler][rt] and the [multi-thread
//! scheduler][rt-multi-thread], respectively. See the [`runtime` module
//! documentation][rt-features] for details. In addition, the "macros" feature
//! flag enables the `#[tokio::main]` and `#[tokio::test]` attributes.
Expand Down Expand Up @@ -310,7 +310,7 @@
//! need.
//!
//! - `full`: Enables all features listed below except `test-util` and `tracing`.
//! - `rt`: Enables `tokio::spawn`, the basic (current thread) scheduler,
//! - `rt`: Enables `tokio::spawn`, the current-thread scheduler,
//! and non-scheduler utilities.
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
Expand Down
18 changes: 9 additions & 9 deletions tokio/src/runtime/builder.rs
Expand Up @@ -625,7 +625,7 @@ impl Builder {
/// ```
pub fn build(&mut self) -> io::Result<Runtime> {
match &self.kind {
Kind::CurrentThread => self.build_basic_runtime(),
Kind::CurrentThread => self.build_current_thread_runtime(),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::MultiThread => self.build_threaded_runtime(),
}
Expand Down Expand Up @@ -831,8 +831,8 @@ impl Builder {
}
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Config, HandleInner, Kind};
fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Config, CurrentThread, HandleInner, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

Expand All @@ -852,7 +852,7 @@ impl Builder {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(
let scheduler = CurrentThread::new(
driver,
handle_inner,
Config {
Expand All @@ -865,7 +865,7 @@ impl Builder {
disable_lifo_slot: self.disable_lifo_slot,
},
);
let spawner = Spawner::Basic(scheduler.spawner().clone());
let spawner = Spawner::CurrentThread(scheduler.spawner().clone());

Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
Expand Down Expand Up @@ -951,7 +951,7 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, HandleInner, Kind, ThreadPool};
use crate::runtime::{Config, HandleInner, Kind, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

Expand All @@ -970,7 +970,7 @@ cfg_rt_multi_thread! {
blocking_spawner,
};

let (scheduler, launch) = ThreadPool::new(
let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
handle_inner,
Expand All @@ -984,7 +984,7 @@ cfg_rt_multi_thread! {
disable_lifo_slot: self.disable_lifo_slot,
},
);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());
let spawner = Spawner::MultiThread(scheduler.spawner().clone());

// Create the runtime handle
let handle = Handle { spawner };
Expand All @@ -994,7 +994,7 @@ cfg_rt_multi_thread! {
launch.launch();

Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
kind: Kind::MultiThread(scheduler),
handle,
blocking_pool,
})
Expand Down
29 changes: 13 additions & 16 deletions tokio/src/runtime/mod.rs
Expand Up @@ -137,7 +137,7 @@
//! use tokio::runtime;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let basic_rt = runtime::Builder::new_current_thread()
//! let rt = runtime::Builder::new_current_thread()
//! .build()?;
//! # Ok(()) }
//! ```
Expand Down Expand Up @@ -166,7 +166,6 @@
//! [`tokio::main`]: ../attr.main.html
//! [runtime builder]: crate::runtime::Builder
//! [`Runtime::new`]: crate::runtime::Runtime::new
//! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
//! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
//! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
//! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
Expand All @@ -187,8 +186,8 @@ cfg_rt! {

pub(crate) mod task;

mod basic_scheduler;
use basic_scheduler::BasicScheduler;
pub(crate) mod scheduler;
use scheduler::CurrentThread;

mod config;
use config::Config;
Expand Down Expand Up @@ -244,8 +243,7 @@ cfg_rt! {
cfg_rt_multi_thread! {
use driver::Driver;

pub(crate) mod thread_pool;
use self::thread_pool::ThreadPool;
use scheduler::MultiThread;
}

cfg_rt! {
Expand Down Expand Up @@ -304,15 +302,15 @@ cfg_rt! {
blocking_pool: BlockingPool,
}

/// The runtime executor is either a thread-pool or a current-thread executor.
/// The runtime executor is either a multi-thread or a current-thread executor.
#[derive(Debug)]
enum Kind {
/// Execute all tasks on the current-thread.
CurrentThread(BasicScheduler),
CurrentThread(CurrentThread),

/// Execute tasks across multiple threads.
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
ThreadPool(ThreadPool),
MultiThread(MultiThread),
}

/// After thread starts / before thread stops
Expand Down Expand Up @@ -347,7 +345,6 @@ cfg_rt! {
/// [mod]: index.html
/// [main]: ../attr.main.html
/// [threaded scheduler]: index.html#threaded-scheduler
/// [basic scheduler]: index.html#basic-scheduler
/// [runtime builder]: crate::runtime::Builder
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
Expand Down Expand Up @@ -492,7 +489,7 @@ cfg_rt! {
match &self.kind {
Kind::CurrentThread(exec) => exec.block_on(future),
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::ThreadPool(exec) => exec.block_on(future),
Kind::MultiThread(exec) => exec.block_on(future),
}
}

Expand Down Expand Up @@ -608,11 +605,11 @@ cfg_rt! {
impl Drop for Runtime {
fn drop(&mut self) {
match &mut self.kind {
Kind::CurrentThread(basic) => {
// This ensures that tasks spawned on the basic runtime are dropped inside the
// runtime's context.
Kind::CurrentThread(current_thread) => {
// This ensures that tasks spawned on the current-thread
// runtime are dropped inside the runtime's context.
match self::context::try_enter(self.handle.clone()) {
Some(guard) => basic.set_context_guard(guard),
Some(guard) => current_thread.set_context_guard(guard),
None => {
// The context thread-local has already been destroyed.
//
Expand All @@ -622,7 +619,7 @@ cfg_rt! {
}
},
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Kind::ThreadPool(_) => {
Kind::MultiThread(_) => {
// The threaded scheduler drops its tasks on its worker threads, which is
// already in the runtime's context.
},
Expand Down
Expand Up @@ -20,7 +20,7 @@ use std::task::Poll::{Pending, Ready};
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct BasicScheduler {
pub(crate) struct CurrentThread {
/// Core scheduler data is acquired by a thread entering `block_on`.
core: AtomicCell<Core>,

Expand All @@ -31,10 +31,10 @@ pub(crate) struct BasicScheduler {
/// Sendable task spawner
spawner: Spawner,

/// This is usually None, but right before dropping the BasicScheduler, it
/// is changed to `Some` with the context being the runtime's own context.
/// This ensures that any tasks dropped in the `BasicScheduler`s destructor
/// run in that runtime's context.
/// This is usually None, but right before dropping the CurrentThread
/// scheduler, it is changed to `Some` with the context being the runtime's
/// own context. This ensures that any tasks dropped in the `CurrentThread`s
carllerche marked this conversation as resolved.
Show resolved Hide resolved
/// destructor run in that runtime's context.
context_guard: Option<EnterGuard>,
}

Expand Down Expand Up @@ -108,11 +108,11 @@ struct Context {
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

// Tracks the current BasicScheduler.
// Tracks the current CurrentThread.
scoped_thread_local!(static CURRENT: Context);

impl BasicScheduler {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> BasicScheduler {
impl CurrentThread {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> CurrentThread {
let unpark = driver.unpark();

let spawner = Spawner {
Expand All @@ -137,7 +137,7 @@ impl BasicScheduler {
unhandled_panic: false,
})));

BasicScheduler {
CurrentThread {
core,
notify: Notify::new(),
spawner,
Expand Down Expand Up @@ -193,16 +193,16 @@ impl BasicScheduler {
spawner: self.spawner.clone(),
core: RefCell::new(Some(core)),
},
basic_scheduler: self,
scheduler: self,
})
}

pub(super) fn set_context_guard(&mut self, guard: EnterGuard) {
pub(crate) fn set_context_guard(&mut self, guard: EnterGuard) {
self.context_guard = Some(guard);
}
}

impl Drop for BasicScheduler {
impl Drop for CurrentThread {
fn drop(&mut self) {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.
Expand Down Expand Up @@ -246,9 +246,9 @@ impl Drop for BasicScheduler {
}
}

impl fmt::Debug for BasicScheduler {
impl fmt::Debug for CurrentThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BasicScheduler").finish()
fmt.debug_struct("CurrentThread").finish()
}
}

Expand Down Expand Up @@ -357,8 +357,8 @@ impl Context {
// ===== impl Spawner =====

impl Spawner {
/// Spawns a future onto the basic scheduler
pub(crate) fn spawn<F>(&self, future: F, id: super::task::Id) -> JoinHandle<F::Output>
/// Spawns a future onto the `CurrentThread` scheduler
pub(crate) fn spawn<F>(&self, future: F, id: crate::runtime::task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
Expand Down Expand Up @@ -503,10 +503,10 @@ impl Wake for Shared {
// ===== CoreGuard =====

/// Used to ensure we always place the `Core` value back into its slot in
/// `BasicScheduler`, even if the future panics.
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
context: Context,
basic_scheduler: &'a BasicScheduler,
scheduler: &'a CurrentThread,
}

impl CoreGuard<'_> {
Expand Down Expand Up @@ -605,10 +605,10 @@ impl Drop for CoreGuard<'_> {
if let Some(core) = self.context.core.borrow_mut().take() {
// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
self.basic_scheduler.core.set(core);
self.scheduler.core.set(core);

// Wake up other possible threads that could steal the driver.
self.basic_scheduler.notify.notify_one()
self.scheduler.notify.notify_one()
}
}
}
7 changes: 7 additions & 0 deletions tokio/src/runtime/scheduler/mod.rs
@@ -0,0 +1,7 @@
pub(crate) mod current_thread;
pub(crate) use current_thread::CurrentThread;

cfg_rt_multi_thread! {
pub(crate) mod multi_thread;
pub(crate) use multi_thread::MultiThread;
}
@@ -1,12 +1,12 @@
//! Threadpool
//! Multi-threaded runtime

mod idle;
use self::idle::Idle;

mod park;
pub(crate) use park::{Parker, Unparker};

pub(super) mod queue;
pub(crate) mod queue;

mod worker;
pub(crate) use worker::Launch;
Expand All @@ -21,7 +21,7 @@ use std::fmt;
use std::future::Future;

/// Work-stealing based thread pool for executing futures.
pub(crate) struct ThreadPool {
pub(crate) struct MultiThread {
spawner: Spawner,
}

Expand All @@ -34,29 +34,29 @@ pub(crate) struct ThreadPool {
/// impact the lifecycle of the thread pool in any way. The thread pool may
/// shut down while there are outstanding `Spawner` instances.
///
/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`].
/// `Spawner` instances are obtained by calling [`MultiThread::spawner`].
///
/// [`ThreadPool::spawner`]: method@ThreadPool::spawner
/// [`MultiThread::spawner`]: method@MultiThread::spawner
#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<worker::Shared>,
}

// ===== impl ThreadPool =====
// ===== impl MultiThread =====

impl ThreadPool {
impl MultiThread {
pub(crate) fn new(
size: usize,
driver: Driver,
handle_inner: HandleInner,
config: Config,
) -> (ThreadPool, Launch) {
) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, handle_inner, config);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };
let multi_thread = MultiThread { spawner };

(thread_pool, launch)
(multi_thread, launch)
}

/// Returns reference to `Spawner`.
Expand All @@ -80,13 +80,13 @@ impl ThreadPool {
}
}

impl fmt::Debug for ThreadPool {
impl fmt::Debug for MultiThread {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("ThreadPool").finish()
fmt.debug_struct("MultiThread").finish()
}
}

impl Drop for ThreadPool {
impl Drop for MultiThread {
fn drop(&mut self) {
self.spawner.shutdown();
}
Expand Down