rt: store driver handles next to scheduler handle (#5008)
In an earlier PR (#4629), driver handles were moved into the scheduler
handle (`Spawner`). This was done to let the multi-threaded scheduler
have direct access to the thread pool spawner.

However, we are now working on a greater decoupling of the runtime
internals. All drivers and schedulers will be peers, stored in a single
thread-local variable, and the scheduler will be passed the full
`runtime::Handle`. This will achieve the original goal of giving the
scheduler access to the thread pool while also (hopefully) simplifying
other aspects of the code.
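
To make the new shape concrete, here is a minimal, compilable Rust sketch of the layout this commit moves to. The `Spawner` and driver-handle types below are placeholder stand-ins for illustration only; the real definitions are in `tokio/src/runtime/handle.rs`, shown in the diff that follows.

```rust
use std::sync::Arc;

// Placeholder stand-ins for tokio's internal types (illustration only).
#[derive(Debug, Clone)]
struct Spawner;
#[derive(Debug)]
struct IoHandle;

// After this commit, one struct holds the scheduler's spawner *and* the
// driver handles as peers, instead of nesting the driver handles inside
// the scheduler's shared state.
#[derive(Debug)]
struct HandleInner {
    spawner: Spawner,
    io_handle: IoHandle,
    // ...plus time_handle, signal_handle, clock, and blocking_spawner
}

// The public handle is now a cheaply clonable Arc over that struct.
#[derive(Debug, Clone)]
struct Handle {
    inner: Arc<HandleInner>,
}

impl Handle {
    // Previously this reached through the scheduler:
    // `self.spawner.as_handle_inner()`. Now it is a direct field access.
    fn as_inner(&self) -> &HandleInner {
        &self.inner
    }
}

fn main() {
    let handle = Handle {
        inner: Arc::new(HandleInner {
            spawner: Spawner,
            io_handle: IoHandle,
        }),
    };
    let _clone = handle.clone(); // one Arc bump carries everything
    println!("{:?}", handle.as_inner());
}
```

Packing everything behind a single `Arc` means `Handle::clone` is one reference-count bump, and it sets up the follow-on work described above: the scheduler can be handed the full `runtime::Handle` rather than owning the driver handles itself.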
carllerche committed Sep 13, 2022
1 parent b891714 commit 56ffea0
Showing 9 changed files with 52 additions and 78 deletions.
42 changes: 22 additions & 20 deletions tokio/src/runtime/builder.rs
@@ -833,28 +833,20 @@ impl Builder {

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Config, CurrentThread, HandleInner, Kind};
use std::sync::Arc;

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor generate some new stimuli for the futures to continue
// their life.
let scheduler = CurrentThread::new(
driver,
handle_inner,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
@@ -867,9 +859,18 @@ impl Builder {
);
let spawner = Spawner::CurrentThread(scheduler.spawner().clone());

let inner = Arc::new(HandleInner {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
});

Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
handle: Handle { spawner },
handle: Handle { inner },
blocking_pool,
})
}
@@ -952,6 +953,7 @@ cfg_rt_multi_thread! {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, HandleInner, Kind, MultiThread};
use std::sync::Arc;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

@@ -962,18 +964,9 @@
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
handle_inner,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
@@ -986,8 +979,17 @@
);
let spawner = Spawner::MultiThread(scheduler.spawner().clone());

let inner = Arc::new(HandleInner {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
});

// Create the runtime handle
let handle = Handle { spawner };
let handle = Handle { inner };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
2 changes: 1 addition & 1 deletion tokio/src/runtime/context.rs
@@ -72,7 +72,7 @@ cfg_time! {

cfg_rt! {
pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.inner.spawner.clone())) {
Ok(spawner) => spawner,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
13 changes: 8 additions & 5 deletions tokio/src/runtime/handle.rs
@@ -4,6 +4,7 @@ use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};

use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::{error, fmt};

/// Handle to the runtime.
@@ -14,12 +15,14 @@ use std::{error, fmt};
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
pub struct Handle {
pub(super) spawner: Spawner,
pub(super) inner: Arc<HandleInner>,
}

/// All internal runtime handles, including the scheduler's spawner.
#[derive(Debug)]
pub(crate) struct HandleInner {
pub(super) spawner: Spawner,

/// Handles to the I/O drivers
#[cfg_attr(
not(any(
@@ -206,7 +209,7 @@ impl Handle {
}

pub(crate) fn as_inner(&self) -> &HandleInner {
self.spawner.as_handle_inner()
&self.inner
}

/// Runs a future to completion on this `Handle`'s associated `Runtime`.
@@ -306,11 +309,11 @@ impl Handle {
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.spawner.spawn(future, id)
self.inner.spawner.spawn(future, id)
}

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
pub(crate) fn shutdown(&self) {
self.inner.spawner.shutdown();
}
}

14 changes: 11 additions & 3 deletions tokio/src/runtime/metrics/runtime.rs
@@ -39,7 +39,7 @@ impl RuntimeMetrics {
/// }
/// ```
pub fn num_workers(&self) -> usize {
self.handle.spawner.num_workers()
self.handle.inner.spawner.num_workers()
}

/// Returns the number of tasks scheduled from **outside** of the runtime.
@@ -68,6 +68,7 @@
/// ```
pub fn remote_schedule_count(&self) -> u64 {
self.handle
.inner
.spawner
.scheduler_metrics()
.remote_schedule_count
@@ -111,6 +112,7 @@
/// ```
pub fn worker_park_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.park_count
@@ -154,6 +156,7 @@
/// ```
pub fn worker_noop_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.noop_count
@@ -199,6 +202,7 @@
/// ```
pub fn worker_steal_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.steal_count
@@ -240,6 +244,7 @@
/// ```
pub fn worker_poll_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.poll_count
@@ -285,6 +290,7 @@
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.spawner
.worker_metrics(worker)
.busy_duration_total
@@ -331,6 +337,7 @@
/// ```
pub fn worker_local_schedule_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.local_schedule_count
@@ -377,6 +384,7 @@
/// ```
pub fn worker_overflow_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.overflow_count
@@ -406,7 +414,7 @@
/// }
/// ```
pub fn injection_queue_depth(&self) -> usize {
self.handle.spawner.injection_queue_depth()
self.handle.inner.spawner.injection_queue_depth()
}

/// Returns the number of tasks currently scheduled in the given worker's
@@ -444,7 +452,7 @@
/// }
/// ```
pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.handle.spawner.worker_local_queue_depth(worker)
self.handle.inner.spawner.worker_local_queue_depth(worker)
}
}

2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
@@ -570,7 +570,7 @@ cfg_rt! {
/// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.clone().shutdown();
self.handle.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

12 changes: 2 additions & 10 deletions tokio/src/runtime/scheduler/current_thread.rs
@@ -4,7 +4,7 @@ use crate::loom::sync::{Arc, Mutex};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::{Driver, Unpark};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{Config, HandleInner};
use crate::runtime::Config;
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
@@ -81,9 +81,6 @@ struct Shared {
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Handle to I/O driver, timer, blocking pool, ...
handle_inner: HandleInner,

/// Scheduler configuration options
config: Config,

@@ -111,7 +108,7 @@ const INITIAL_CAPACITY: usize = 64;
scoped_thread_local!(static CURRENT: Context);

impl CurrentThread {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> CurrentThread {
pub(crate) fn new(driver: Driver, config: Config) -> CurrentThread {
let unpark = driver.unpark();

let spawner = Spawner {
@@ -120,7 +117,6 @@ impl CurrentThread {
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
handle_inner,
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: WorkerMetrics::new(),
@@ -387,10 +383,6 @@ impl Spawner {
pub(crate) fn reset_woken(&self) -> bool {
self.shared.woken.swap(false, AcqRel)
}

pub(crate) fn as_handle_inner(&self) -> &HandleInner {
&self.shared.handle_inner
}
}

cfg_metrics! {
17 changes: 4 additions & 13 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
@@ -15,7 +15,7 @@ pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Config, Driver, HandleInner};
use crate::runtime::{Config, Driver};

use std::fmt;
use std::future::Future;
@@ -45,14 +45,9 @@ pub(crate) struct Spawner {
// ===== impl MultiThread =====

impl MultiThread {
pub(crate) fn new(
size: usize,
driver: Driver,
handle_inner: HandleInner,
config: Config,
) -> (MultiThread, Launch) {
pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, handle_inner, config);
let (shared, launch) = worker::create(size, parker, config);
let spawner = Spawner { shared };
let multi_thread = MultiThread { spawner };

@@ -104,13 +99,9 @@ impl Spawner {
worker::Shared::bind_new_task(&self.shared, future, id)
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&self) {
self.shared.close();
}

pub(crate) fn as_handle_inner(&self) -> &HandleInner {
self.shared.as_handle_inner()
}
}

cfg_metrics! {
17 changes: 2 additions & 15 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -64,7 +64,7 @@ use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::scheduler::multi_thread::{queue, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::{task, Config, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;

@@ -120,9 +120,6 @@ struct Core {

/// State shared across all workers
pub(super) struct Shared {
/// Handle to the I/O driver, timer, blocking spawner, ...
handle_inner: HandleInner,

/// Per-worker remote state. All other workers have access to this, and it
/// is how they communicate with each other.
remotes: Box<[Remote]>,
@@ -189,12 +186,7 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(
size: usize,
park: Parker,
handle_inner: HandleInner,
config: Config,
) -> (Arc<Shared>, Launch) {
pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
Expand Down Expand Up @@ -222,7 +214,6 @@ pub(super) fn create(
}

let shared = Arc::new(Shared {
handle_inner,
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
@@ -708,10 +699,6 @@ impl task::Schedule for Arc<Shared> {
}

impl Shared {
pub(crate) fn as_handle_inner(&self) -> &HandleInner {
&self.handle_inner
}

pub(super) fn bind_new_task<T>(
me: &Arc<Self>,
future: T,
