Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: store driver handles next to scheduler handle #5008

Merged
merged 1 commit into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 22 additions & 20 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,28 +833,20 @@ impl Builder {

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{Config, CurrentThread, HandleInner, Kind};
use std::sync::Arc;

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = CurrentThread::new(
driver,
handle_inner,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand All @@ -867,9 +859,18 @@ impl Builder {
);
let spawner = Spawner::CurrentThread(scheduler.spawner().clone());

let inner = Arc::new(HandleInner {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
});

Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
handle: Handle { spawner },
handle: Handle { inner },
blocking_pool,
})
}
Expand Down Expand Up @@ -952,6 +953,7 @@ cfg_rt_multi_thread! {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, HandleInner, Kind, MultiThread};
use std::sync::Arc;

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

Expand All @@ -962,18 +964,9 @@ cfg_rt_multi_thread! {
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
handle_inner,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand All @@ -986,8 +979,17 @@ cfg_rt_multi_thread! {
);
let spawner = Spawner::MultiThread(scheduler.spawner().clone());

let inner = Arc::new(HandleInner {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
});

// Create the runtime handle
let handle = Handle { spawner };
let handle = Handle { inner };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ cfg_time! {

cfg_rt! {
pub(crate) fn spawn_handle() -> Option<crate::runtime::Spawner> {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.inner.spawner.clone())) {
Ok(spawner) => spawner,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
Expand Down
13 changes: 8 additions & 5 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};

use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::{error, fmt};

/// Handle to the runtime.
Expand All @@ -14,12 +15,14 @@ use std::{error, fmt};
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
pub struct Handle {
pub(super) spawner: Spawner,
pub(super) inner: Arc<HandleInner>,
}

/// All internal handles that are *not* the scheduler's spawner.
#[derive(Debug)]
pub(crate) struct HandleInner {
pub(super) spawner: Spawner,

/// Handles to the I/O drivers
#[cfg_attr(
not(any(
Expand Down Expand Up @@ -206,7 +209,7 @@ impl Handle {
}

pub(crate) fn as_inner(&self) -> &HandleInner {
self.spawner.as_handle_inner()
&self.inner
}

/// Runs a future to completion on this `Handle`'s associated `Runtime`.
Expand Down Expand Up @@ -306,11 +309,11 @@ impl Handle {
let id = crate::runtime::task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.spawner.spawn(future, id)
self.inner.spawner.spawn(future, id)
}

pub(crate) fn shutdown(mut self) {
self.spawner.shutdown();
pub(crate) fn shutdown(&self) {
self.inner.spawner.shutdown();
}
}

Expand Down
14 changes: 11 additions & 3 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl RuntimeMetrics {
/// }
/// ```
pub fn num_workers(&self) -> usize {
self.handle.spawner.num_workers()
self.handle.inner.spawner.num_workers()
}

/// Returns the number of tasks scheduled from **outside** of the runtime.
Expand Down Expand Up @@ -68,6 +68,7 @@ impl RuntimeMetrics {
/// ```
pub fn remote_schedule_count(&self) -> u64 {
self.handle
.inner
.spawner
.scheduler_metrics()
.remote_schedule_count
Expand Down Expand Up @@ -111,6 +112,7 @@ impl RuntimeMetrics {
/// ```
pub fn worker_park_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.park_count
Expand Down Expand Up @@ -154,6 +156,7 @@ impl RuntimeMetrics {
/// ```
pub fn worker_noop_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.noop_count
Expand Down Expand Up @@ -199,6 +202,7 @@ impl RuntimeMetrics {
/// ```
pub fn worker_steal_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.steal_count
Expand Down Expand Up @@ -240,6 +244,7 @@ impl RuntimeMetrics {
/// ```
pub fn worker_poll_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.poll_count
Expand Down Expand Up @@ -285,6 +290,7 @@ impl RuntimeMetrics {
pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
let nanos = self
.handle
.inner
.spawner
.worker_metrics(worker)
.busy_duration_total
Expand Down Expand Up @@ -331,6 +337,7 @@ impl RuntimeMetrics {
/// ```
pub fn worker_local_schedule_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.local_schedule_count
Expand Down Expand Up @@ -377,6 +384,7 @@ impl RuntimeMetrics {
/// ```
pub fn worker_overflow_count(&self, worker: usize) -> u64 {
self.handle
.inner
.spawner
.worker_metrics(worker)
.overflow_count
Expand Down Expand Up @@ -406,7 +414,7 @@ impl RuntimeMetrics {
/// }
/// ```
pub fn injection_queue_depth(&self) -> usize {
self.handle.spawner.injection_queue_depth()
self.handle.inner.spawner.injection_queue_depth()
}

/// Returns the number of tasks currently scheduled in the given worker's
Expand Down Expand Up @@ -444,7 +452,7 @@ impl RuntimeMetrics {
/// }
/// ```
pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.handle.spawner.worker_local_queue_depth(worker)
self.handle.inner.spawner.worker_local_queue_depth(worker)
}
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ cfg_rt! {
/// ```
pub fn shutdown_timeout(mut self, duration: Duration) {
// Wakeup and shutdown all the worker threads
self.handle.clone().shutdown();
self.handle.shutdown();
self.blocking_pool.shutdown(Some(duration));
}

Expand Down
12 changes: 2 additions & 10 deletions tokio/src/runtime/scheduler/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::loom::sync::{Arc, Mutex};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::{Driver, Unpark};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{Config, HandleInner};
use crate::runtime::Config;
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
Expand Down Expand Up @@ -81,9 +81,6 @@ struct Shared {
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Handle to I/O driver, timer, blocking pool, ...
handle_inner: HandleInner,

/// Scheduler configuration options
config: Config,

Expand Down Expand Up @@ -111,7 +108,7 @@ const INITIAL_CAPACITY: usize = 64;
scoped_thread_local!(static CURRENT: Context);

impl CurrentThread {
pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> CurrentThread {
pub(crate) fn new(driver: Driver, config: Config) -> CurrentThread {
let unpark = driver.unpark();

let spawner = Spawner {
Expand All @@ -120,7 +117,6 @@ impl CurrentThread {
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
handle_inner,
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: WorkerMetrics::new(),
Expand Down Expand Up @@ -387,10 +383,6 @@ impl Spawner {
pub(crate) fn reset_woken(&self) -> bool {
self.shared.woken.swap(false, AcqRel)
}

pub(crate) fn as_handle_inner(&self) -> &HandleInner {
&self.shared.handle_inner
}
}

cfg_metrics! {
Expand Down
17 changes: 4 additions & 13 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Config, Driver, HandleInner};
use crate::runtime::{Config, Driver};

use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -45,14 +45,9 @@ pub(crate) struct Spawner {
// ===== impl MultiThread =====

impl MultiThread {
pub(crate) fn new(
size: usize,
driver: Driver,
handle_inner: HandleInner,
config: Config,
) -> (MultiThread, Launch) {
pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, handle_inner, config);
let (shared, launch) = worker::create(size, parker, config);
let spawner = Spawner { shared };
let multi_thread = MultiThread { spawner };

Expand Down Expand Up @@ -104,13 +99,9 @@ impl Spawner {
worker::Shared::bind_new_task(&self.shared, future, id)
}

pub(crate) fn shutdown(&mut self) {
pub(crate) fn shutdown(&self) {
self.shared.close();
}

pub(crate) fn as_handle_inner(&self) -> &HandleInner {
self.shared.as_handle_inner()
}
}

cfg_metrics! {
Expand Down
17 changes: 2 additions & 15 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::scheduler::multi_thread::{queue, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::{task, Config, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;

Expand Down Expand Up @@ -120,9 +120,6 @@ struct Core {

/// State shared across all workers
pub(super) struct Shared {
/// Handle to the I/O driver, timer, blocking spawner, ...
handle_inner: HandleInner,

/// Per-worker remote state. All other workers have access to this and is
/// how they communicate between each other.
remotes: Box<[Remote]>,
Expand Down Expand Up @@ -189,12 +186,7 @@ type Notified = task::Notified<Arc<Shared>>;
// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(
size: usize,
park: Parker,
handle_inner: HandleInner,
config: Config,
) -> (Arc<Shared>, Launch) {
pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
let mut worker_metrics = Vec::with_capacity(size);
Expand Down Expand Up @@ -222,7 +214,6 @@ pub(super) fn create(
}

let shared = Arc::new(Shared {
handle_inner,
remotes: remotes.into_boxed_slice(),
inject: Inject::new(),
idle: Idle::new(size),
Expand Down Expand Up @@ -708,10 +699,6 @@ impl task::Schedule for Arc<Shared> {
}

impl Shared {
pub(crate) fn as_handle_inner(&self) -> &HandleInner {
&self.handle_inner
}

pub(super) fn bind_new_task<T>(
me: &Arc<Self>,
future: T,
Expand Down