diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index b06fca2ddae..00588786eea 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -833,6 +833,7 @@ impl Builder { fn build_current_thread_runtime(&mut self) -> io::Result { use crate::runtime::{Config, CurrentThread, HandleInner, Kind}; + use std::sync::Arc; let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -840,21 +841,12 @@ impl Builder { let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); let blocking_spawner = blocking_pool.spawner().clone(); - let handle_inner = HandleInner { - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }; - // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. let scheduler = CurrentThread::new( driver, - handle_inner, Config { before_park: self.before_park.clone(), after_unpark: self.after_unpark.clone(), @@ -867,9 +859,18 @@ impl Builder { ); let spawner = Spawner::CurrentThread(scheduler.spawner().clone()); + let inner = Arc::new(HandleInner { + spawner, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, + blocking_spawner, + }); + Ok(Runtime { kind: Kind::CurrentThread(scheduler), - handle: Handle { spawner }, + handle: Handle { inner }, blocking_pool, }) } @@ -952,6 +953,7 @@ cfg_rt_multi_thread! { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; use crate::runtime::{Config, HandleInner, Kind, MultiThread}; + use std::sync::Arc; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); @@ -962,18 +964,9 @@ cfg_rt_multi_thread! { blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); - let handle_inner = HandleInner { - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }; - let (scheduler, launch) = MultiThread::new( core_threads, driver, - handle_inner, Config { before_park: self.before_park.clone(), after_unpark: self.after_unpark.clone(), @@ -986,8 +979,17 @@ cfg_rt_multi_thread! { ); let spawner = Spawner::MultiThread(scheduler.spawner().clone()); + let inner = Arc::new(HandleInner { + spawner, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, + blocking_spawner, + }); + // Create the runtime handle - let handle = Handle { spawner }; + let handle = Handle { inner }; // Spawn the thread pool workers let _enter = crate::runtime::context::enter(handle.clone()); diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 4215124fc83..395c0b70edf 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -72,7 +72,7 @@ cfg_time! { cfg_rt! { pub(crate) fn spawn_handle() -> Option { - match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.spawner.clone())) { + match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.inner.spawner.clone())) { Ok(spawner) => spawner, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 9c0752db07d..9222f746f0b 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -4,6 +4,7 @@ use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; use std::future::Future; use std::marker::PhantomData; +use std::sync::Arc; use std::{error, fmt}; /// Handle to the runtime. @@ -14,12 +15,14 @@ use std::{error, fmt}; /// [`Runtime::handle`]: crate::runtime::Runtime::handle() #[derive(Debug, Clone)] pub struct Handle { - pub(super) spawner: Spawner, + pub(super) inner: Arc, } /// All internal handles that are *not* the scheduler's spawner. #[derive(Debug)] pub(crate) struct HandleInner { + pub(super) spawner: Spawner, + /// Handles to the I/O drivers #[cfg_attr( not(any( @@ -206,7 +209,7 @@ impl Handle { } pub(crate) fn as_inner(&self) -> &HandleInner { - self.spawner.as_handle_inner() + &self.inner } /// Runs a future to completion on this `Handle`'s associated `Runtime`. @@ -306,11 +309,11 @@ impl Handle { let id = crate::runtime::task::Id::next(); #[cfg(all(tokio_unstable, feature = "tracing"))] let future = crate::util::trace::task(future, "task", _name, id.as_u64()); - self.spawner.spawn(future, id) + self.inner.spawner.spawn(future, id) } - pub(crate) fn shutdown(mut self) { - self.spawner.shutdown(); + pub(crate) fn shutdown(&self) { + self.inner.spawner.shutdown(); } } diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 279d70a0e74..3d3c0cb0d4b 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -39,7 +39,7 @@ impl RuntimeMetrics { /// } /// ``` pub fn num_workers(&self) -> usize { - self.handle.spawner.num_workers() + self.handle.inner.spawner.num_workers() } /// Returns the number of tasks scheduled from **outside** of the runtime. @@ -68,6 +68,7 @@ impl RuntimeMetrics { /// ``` pub fn remote_schedule_count(&self) -> u64 { self.handle + .inner .spawner .scheduler_metrics() .remote_schedule_count @@ -111,6 +112,7 @@ impl RuntimeMetrics { /// ``` pub fn worker_park_count(&self, worker: usize) -> u64 { self.handle + .inner .spawner .worker_metrics(worker) .park_count @@ -154,6 +156,7 @@ impl RuntimeMetrics { /// ``` pub fn worker_noop_count(&self, worker: usize) -> u64 { self.handle + .inner .spawner .worker_metrics(worker) .noop_count @@ -199,6 +202,7 @@ impl RuntimeMetrics { /// ``` pub fn worker_steal_count(&self, worker: usize) -> u64 { self.handle + .inner .spawner .worker_metrics(worker) .steal_count @@ -240,6 +244,7 @@ impl RuntimeMetrics { /// ``` pub fn worker_poll_count(&self, worker: usize) -> u64 { self.handle + .inner .spawner .worker_metrics(worker) .poll_count @@ -285,6 +290,7 @@ impl RuntimeMetrics { pub fn worker_total_busy_duration(&self, worker: usize) -> Duration { let nanos = self .handle + .inner .spawner .worker_metrics(worker) .busy_duration_total @@ -331,6 +337,7 @@ impl RuntimeMetrics { /// ``` pub fn worker_local_schedule_count(&self, worker: usize) -> u64 { self.handle + .inner .spawner .worker_metrics(worker) .local_schedule_count @@ -377,6 +384,7 @@ impl RuntimeMetrics { /// ``` pub fn worker_overflow_count(&self, worker: usize) -> u64 { self.handle + .inner .spawner .worker_metrics(worker) .overflow_count @@ -406,7 +414,7 @@ impl RuntimeMetrics { /// } /// ``` pub fn injection_queue_depth(&self) -> usize { - self.handle.spawner.injection_queue_depth() + self.handle.inner.spawner.injection_queue_depth() } /// Returns the number of tasks currently scheduled in the given worker's @@ -444,7 +452,7 @@ impl RuntimeMetrics { /// } /// ``` pub fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.handle.spawner.worker_local_queue_depth(worker) + self.handle.inner.spawner.worker_local_queue_depth(worker) } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 79cf11c355f..deb95313324 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -570,7 +570,7 @@ cfg_rt! { /// ``` pub fn shutdown_timeout(mut self, duration: Duration) { // Wakeup and shutdown all the worker threads - self.handle.clone().shutdown(); + self.handle.shutdown(); self.blocking_pool.shutdown(Some(duration)); } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 2bc19fef048..b705c786d56 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -4,7 +4,7 @@ use crate::loom::sync::{Arc, Mutex}; use crate::runtime::context::EnterGuard; use crate::runtime::driver::{Driver, Unpark}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; -use crate::runtime::{Config, HandleInner}; +use crate::runtime::Config; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; @@ -81,9 +81,6 @@ struct Shared { /// Indicates whether the blocked on thread was woken. woken: AtomicBool, - /// Handle to I/O driver, timer, blocking pool, ... - handle_inner: HandleInner, - /// Scheduler configuration options config: Config, @@ -111,7 +108,7 @@ const INITIAL_CAPACITY: usize = 64; scoped_thread_local!(static CURRENT: Context); impl CurrentThread { - pub(crate) fn new(driver: Driver, handle_inner: HandleInner, config: Config) -> CurrentThread { + pub(crate) fn new(driver: Driver, config: Config) -> CurrentThread { let unpark = driver.unpark(); let spawner = Spawner { @@ -120,7 +117,6 @@ impl CurrentThread { owned: OwnedTasks::new(), unpark, woken: AtomicBool::new(false), - handle_inner, config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: WorkerMetrics::new(), @@ -387,10 +383,6 @@ impl Spawner { pub(crate) fn reset_woken(&self) -> bool { self.shared.woken.swap(false, AcqRel) } - - pub(crate) fn as_handle_inner(&self) -> &HandleInner { - &self.shared.handle_inner - } } cfg_metrics! { diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index e6c452bd1b0..4946698fc2c 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -15,7 +15,7 @@ pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{Config, Driver, HandleInner}; +use crate::runtime::{Config, Driver}; use std::fmt; use std::future::Future; @@ -45,14 +45,9 @@ pub(crate) struct Spawner { // ===== impl MultiThread ===== impl MultiThread { - pub(crate) fn new( - size: usize, - driver: Driver, - handle_inner: HandleInner, - config: Config, - ) -> (MultiThread, Launch) { + pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) { let parker = Parker::new(driver); - let (shared, launch) = worker::create(size, parker, handle_inner, config); + let (shared, launch) = worker::create(size, parker, config); let spawner = Spawner { shared }; let multi_thread = MultiThread { spawner }; @@ -104,13 +99,9 @@ impl Spawner { worker::Shared::bind_new_task(&self.shared, future, id) } - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&self) { self.shared.close(); } - - pub(crate) fn as_handle_inner(&self) -> &HandleInner { - self.shared.as_handle_inner() - } } cfg_metrics! { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 6952d9efe89..22ca2dbe842 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -64,7 +64,7 @@ use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::scheduler::multi_thread::{queue, Idle, Parker, Unparker}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; -use crate::runtime::{task, Config, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::runtime::{task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; use crate::util::FastRand; @@ -120,9 +120,6 @@ struct Core { /// State shared across all workers pub(super) struct Shared { - /// Handle to the I/O driver, timer, blocking spawner, ... - handle_inner: HandleInner, - /// Per-worker remote state. All other workers have access to this and is /// how they communicate between each other. remotes: Box<[Remote]>, @@ -189,12 +186,7 @@ type Notified = task::Notified>; // Tracks thread-local state scoped_thread_local!(static CURRENT: Context); -pub(super) fn create( - size: usize, - park: Parker, - handle_inner: HandleInner, - config: Config, -) -> (Arc, Launch) { +pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc, Launch) { let mut cores = Vec::with_capacity(size); let mut remotes = Vec::with_capacity(size); let mut worker_metrics = Vec::with_capacity(size); @@ -222,7 +214,6 @@ pub(super) fn create( } let shared = Arc::new(Shared { - handle_inner, remotes: remotes.into_boxed_slice(), inject: Inject::new(), idle: Idle::new(size), @@ -708,10 +699,6 @@ impl task::Schedule for Arc { } impl Shared { - pub(crate) fn as_handle_inner(&self) -> &HandleInner { - &self.handle_inner - } - pub(super) fn bind_new_task( me: &Arc, future: T, diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index 87fc3666588..e64b0cb2113 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,7 +1,6 @@ use crate::future::Future; use crate::runtime::scheduler::current_thread; use crate::runtime::task::Id; -use crate::runtime::HandleInner; use crate::task::JoinHandle; cfg_rt_multi_thread! { @@ -16,7 +15,7 @@ pub(crate) enum Spawner { } impl Spawner { - pub(crate) fn shutdown(&mut self) { + pub(crate) fn shutdown(&self) { #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] { if let Spawner::MultiThread(spawner) = self { @@ -36,14 +35,6 @@ impl Spawner { Spawner::MultiThread(spawner) => spawner.spawn(future, id), } } - - pub(crate) fn as_handle_inner(&self) -> &HandleInner { - match self { - Spawner::CurrentThread(spawner) => spawner.as_handle_inner(), - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Spawner::MultiThread(spawner) => spawner.as_handle_inner(), - } - } } cfg_metrics! {