From cdd6eeaf70351220df05126a7f0c353b0058dc95 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 16 Sep 2022 19:57:45 -0700 Subject: [PATCH] rt: update MultiThread to use its own Handle (#5025) This is the equivalent as tokio-rs/tokio#5023, but for the MultiThread scheduler. This patch updates `MultiThread` to use `Arc` for all of its cross-thread needs instead of `Arc`. The effect of this change is that the multi-thread scheduler only has a single `Arc` type, which includes the driver handles, keeping all "shared state" together. --- tokio/src/runtime/builder.rs | 24 ++-- tokio/src/runtime/scheduler/mod.rs | 4 +- .../runtime/scheduler/multi_thread/handle.rs | 59 ++++++-- .../src/runtime/scheduler/multi_thread/mod.rs | 73 +++------- .../runtime/scheduler/multi_thread/worker.rs | 134 +++++++++--------- 5 files changed, 151 insertions(+), 143 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index e4e4ec3e957..562733b1226 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -993,10 +993,9 @@ cfg_test_util! { cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { - use crate::loom::sync::Arc; use crate::loom::sys::num_cpus; use crate::runtime::{Config, Scheduler}; - use crate::runtime::scheduler::{self, multi_thread, MultiThread}; + use crate::runtime::scheduler::{self, MultiThread}; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); @@ -1007,9 +1006,16 @@ cfg_rt_multi_thread! { blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); + // Generate a rng seed for this runtime. + let seed_generator_1 = self.seed_generator.next_generator(); + let seed_generator_2 = self.seed_generator.next_generator(); + let (scheduler, launch) = MultiThread::new( core_threads, driver, + driver_handle, + blocking_spawner, + seed_generator_2, Config { before_park: self.before_park.clone(), after_unpark: self.after_unpark.clone(), @@ -1018,20 +1024,12 @@ cfg_rt_multi_thread! { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, - seed_generator: self.seed_generator.next_generator(), + seed_generator: seed_generator_1, }, ); - let inner = Arc::new(multi_thread::Handle { - spawner: scheduler.spawner().clone(), - driver: driver_handle, - blocking_spawner, - seed_generator: self.seed_generator.next_generator(), - }); - let inner = scheduler::Handle::MultiThread(inner); - - // Create the runtime handle - let handle = Handle { inner }; + let handle = scheduler::Handle::MultiThread(scheduler.handle().clone()); + let handle = Handle { inner: handle }; // Spawn the thread pool workers let _enter = crate::runtime::context::enter(handle.clone()); diff --git a/tokio/src/runtime/scheduler/mod.rs b/tokio/src/runtime/scheduler/mod.rs index 84714ef94a2..e214086d4d0 100644 --- a/tokio/src/runtime/scheduler/mod.rs +++ b/tokio/src/runtime/scheduler/mod.rs @@ -83,7 +83,7 @@ cfg_rt! { Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id), #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Handle::MultiThread(h) => h.spawner.spawn(future, id), + Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id), } } @@ -92,7 +92,7 @@ cfg_rt! { Handle::CurrentThread(_) => {}, #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Handle::MultiThread(ref h) => h.spawner.shutdown(), + Handle::MultiThread(ref h) => h.shutdown(), } } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 1b02afd4848..a34042257be 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -1,12 +1,18 @@ -use crate::runtime::scheduler::multi_thread::Spawner; -use crate::runtime::{blocking, driver}; +use crate::future::Future; +use crate::loom::sync::Arc; +use crate::runtime::scheduler::multi_thread::worker; +use crate::runtime::{ + blocking, driver, + task::{self, JoinHandle}, +}; use crate::util::RngSeedGenerator; +use std::fmt; + /// Handle to the multi thread scheduler -#[derive(Debug)] pub(crate) struct Handle { /// Task spawner - pub(crate) spawner: Spawner, + pub(super) shared: worker::Shared, /// Resource driver handles pub(crate) driver: driver::Handle, @@ -18,28 +24,63 @@ pub(crate) struct Handle { pub(crate) seed_generator: RngSeedGenerator, } +impl Handle { + /// Spawns a future onto the thread pool + pub(crate) fn spawn(me: &Arc, future: F, id: task::Id) -> JoinHandle + where + F: crate::future::Future + Send + 'static, + F::Output: Send + 'static, + { + Self::bind_new_task(me, future, id) + } + + pub(crate) fn shutdown(&self) { + self.shared.close(); + } + + pub(super) fn bind_new_task(me: &Arc, future: T, id: task::Id) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); + + if let Some(notified) = notified { + me.shared.schedule(notified, false); + } + + handle + } +} + cfg_metrics! { use crate::runtime::{SchedulerMetrics, WorkerMetrics}; impl Handle { pub(crate) fn num_workers(&self) -> usize { - self.spawner.shared.worker_metrics.len() + self.shared.worker_metrics.len() } pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { - &self.spawner.shared.scheduler_metrics + &self.shared.scheduler_metrics } pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { - &self.spawner.shared.worker_metrics[worker] + &self.shared.worker_metrics[worker] } pub(crate) fn injection_queue_depth(&self) -> usize { - self.spawner.shared.injection_queue_depth() + self.shared.injection_queue_depth() } pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { - self.spawner.shared.worker_local_queue_depth(worker) + self.shared.worker_local_queue_depth(worker) } } } + +impl fmt::Debug for Handle { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("multi_thread::Handle { ... }").finish() + } +} diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 1c71f1d2913..403eabacfbc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -17,42 +17,38 @@ pub(crate) use worker::Launch; pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; -use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{Config, Driver}; +use crate::runtime::{blocking, driver, Config, Driver}; +use crate::util::RngSeedGenerator; use std::fmt; use std::future::Future; /// Work-stealing based thread pool for executing futures. pub(crate) struct MultiThread { - spawner: Spawner, -} - -/// Submits futures to the associated thread pool for execution. -/// -/// A `Spawner` instance is a handle to a single thread pool that allows the owner -/// of the handle to spawn futures onto the thread pool. -/// -/// The `Spawner` handle is *only* used for spawning new futures. It does not -/// impact the lifecycle of the thread pool in any way. The thread pool may -/// shut down while there are outstanding `Spawner` instances. -/// -/// `Spawner` instances are obtained by calling [`MultiThread::spawner`]. -/// -/// [`MultiThread::spawner`]: method@MultiThread::spawner -#[derive(Clone)] -pub(crate) struct Spawner { - shared: Arc, + handle: Arc, } // ===== impl MultiThread ===== impl MultiThread { - pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) { + pub(crate) fn new( + size: usize, + driver: Driver, + driver_handle: driver::Handle, + blocking_spawner: blocking::Spawner, + seed_generator: RngSeedGenerator, + config: Config, + ) -> (MultiThread, Launch) { let parker = Parker::new(driver); - let (shared, launch) = worker::create(size, parker, config); - let spawner = Spawner { shared }; - let multi_thread = MultiThread { spawner }; + let (handle, launch) = worker::create( + size, + parker, + driver_handle, + blocking_spawner, + seed_generator, + config, + ); + let multi_thread = MultiThread { handle }; (multi_thread, launch) } @@ -61,8 +57,8 @@ impl MultiThread { /// /// The `Spawner` handle can be cloned and enables spawning tasks from other /// threads. - pub(crate) fn spawner(&self) -> &Spawner { - &self.spawner + pub(crate) fn handle(&self) -> &Arc { + &self.handle } /// Blocks the current thread waiting for the future to complete. @@ -86,29 +82,6 @@ impl fmt::Debug for MultiThread { impl Drop for MultiThread { fn drop(&mut self) { - self.spawner.shutdown(); - } -} - -// ==== impl Spawner ===== - -impl Spawner { - /// Spawns a future onto the thread pool - pub(crate) fn spawn(&self, future: F, id: task::Id) -> JoinHandle - where - F: crate::future::Future + Send + 'static, - F::Output: Send + 'static, - { - worker::Shared::bind_new_task(&self.shared, future, id) - } - - pub(crate) fn shutdown(&self) { - self.shared.close(); - } -} - -impl fmt::Debug for Spawner { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("Spawner").finish() + self.handle.shutdown(); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 2193486484f..61e51fa5d01 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -57,23 +57,24 @@ //! leak. use crate::coop; -use crate::future::Future; use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::enter::EnterContext; -use crate::runtime::scheduler::multi_thread::{queue, Idle, Parker, Unparker}; -use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; -use crate::runtime::{task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker}; +use crate::runtime::task::{Inject, OwnedTasks}; +use crate::runtime::{ + blocking, driver, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics, +}; use crate::util::atomic_cell::AtomicCell; -use crate::util::FastRand; +use crate::util::{FastRand, RngSeedGenerator}; use std::cell::RefCell; use std::time::Duration; /// A scheduler worker pub(super) struct Worker { - /// Reference to shared state - shared: Arc, + /// Reference to scheduler's handle + handle: Arc, /// Index holding this worker's remote state index: usize, @@ -95,7 +96,7 @@ struct Core { lifo_slot: Option, /// The worker-local run queue. - run_queue: queue::Local>, + run_queue: queue::Local>, /// True if the worker is currently searching for more work. Searching /// involves attempting to steal from other workers. @@ -126,13 +127,13 @@ pub(super) struct Shared { /// Global task queue used for: /// 1. Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated - inject: Inject>, + inject: Inject>, /// Coordinates idle workers idle: Idle, /// Collection of all active tasks spawned onto this executor. - owned: OwnedTasks>, + pub(super) owned: OwnedTasks>, /// Cores that have observed the shutdown signal /// @@ -153,7 +154,7 @@ pub(super) struct Shared { /// Used to communicate with a worker from other threads. struct Remote { /// Steals tasks from this worker. - steal: queue::Steal>, + steal: queue::Steal>, /// Unparks the associated worker thread unpark: Unparker, @@ -177,15 +178,22 @@ pub(crate) struct Launch(Vec>); type RunResult = Result, ()>; /// A task handle -type Task = task::Task>; +type Task = task::Task>; /// A notified task handle -type Notified = task::Notified>; +type Notified = task::Notified>; // Tracks thread-local state scoped_thread_local!(static CURRENT: Context); -pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc, Launch) { +pub(super) fn create( + size: usize, + park: Parker, + driver_handle: driver::Handle, + blocking_spawner: blocking::Spawner, + seed_generator: RngSeedGenerator, + config: Config, +) -> (Arc, Launch) { let mut cores = Vec::with_capacity(size); let mut remotes = Vec::with_capacity(size); let mut worker_metrics = Vec::with_capacity(size); @@ -212,28 +220,33 @@ pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc, worker_metrics.push(WorkerMetrics::new()); } - let shared = Arc::new(Shared { - remotes: remotes.into_boxed_slice(), - inject: Inject::new(), - idle: Idle::new(size), - owned: OwnedTasks::new(), - shutdown_cores: Mutex::new(vec![]), - config, - scheduler_metrics: SchedulerMetrics::new(), - worker_metrics: worker_metrics.into_boxed_slice(), + let handle = Arc::new(Handle { + shared: Shared { + remotes: remotes.into_boxed_slice(), + inject: Inject::new(), + idle: Idle::new(size), + owned: OwnedTasks::new(), + shutdown_cores: Mutex::new(vec![]), + config, + scheduler_metrics: SchedulerMetrics::new(), + worker_metrics: worker_metrics.into_boxed_slice(), + }, + driver: driver_handle, + blocking_spawner, + seed_generator, }); let mut launch = Launch(vec![]); for (index, core) in cores.drain(..).enumerate() { launch.0.push(Arc::new(Worker { - shared: shared.clone(), + handle: handle.clone(), index, core: AtomicCell::new(Some(core)), })); } - (shared, launch) + (handle, launch) } pub(crate) fn block_in_place(f: F) -> R @@ -390,12 +403,12 @@ impl Context { core.pre_shutdown(&self.worker); // Signal shutdown - self.worker.shared.shutdown(core); + self.worker.handle.shared.shutdown(core); Err(()) } fn run_task(&self, task: Notified, mut core: Box) -> RunResult { - let task = self.worker.shared.owned.assert_owner(task); + let task = self.worker.handle.shared.owned.assert_owner(task); // Make sure the worker is not in the **searching** state. This enables // another idle worker to try to steal work. @@ -429,7 +442,7 @@ impl Context { // Run the LIFO task, then loop core.metrics.incr_poll_count(); *self.core.borrow_mut() = Some(core); - let task = self.worker.shared.owned.assert_owner(task); + let task = self.worker.handle.shared.owned.assert_owner(task); task.run(); } else { // Not enough budget left to run the LIFO task, push it to @@ -443,7 +456,7 @@ impl Context { } fn maintenance(&self, mut core: Box) -> Box { - if core.tick % self.worker.shared.config.event_interval == 0 { + if core.tick % self.worker.handle.shared.config.event_interval == 0 { // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); @@ -467,7 +480,7 @@ impl Context { /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers /// after all the IOs get dispatched fn park(&self, mut core: Box) -> Box { - if let Some(f) = &self.worker.shared.config.before_park { + if let Some(f) = &self.worker.handle.shared.config.before_park { f(); } @@ -486,7 +499,7 @@ impl Context { } } - if let Some(f) = &self.worker.shared.config.after_unpark { + if let Some(f) = &self.worker.handle.shared.config.after_unpark { f(); } core @@ -515,7 +528,7 @@ impl Context { // If there are tasks available to steal, but this worker is not // looking for tasks to steal, notify another worker. if !core.is_searching && core.run_queue.is_stealable() { - self.worker.shared.notify_parked(); + self.worker.handle.shared.notify_parked(); } core @@ -530,7 +543,7 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % worker.shared.config.global_queue_interval == 0 { + if self.tick % worker.handle.shared.config.global_queue_interval == 0 { worker.inject().pop().or_else(|| self.next_local_task()) } else { self.next_local_task().or_else(|| worker.inject().pop()) @@ -551,7 +564,7 @@ impl Core { return None; } - let num = worker.shared.remotes.len(); + let num = worker.handle.shared.remotes.len(); // Start from a random worker let start = self.rand.fastrand_n(num as u32) as usize; @@ -563,7 +576,7 @@ impl Core { continue; } - let target = &worker.shared.remotes[i]; + let target = &worker.handle.shared.remotes[i]; if let Some(task) = target .steal .steal_into(&mut self.run_queue, &mut self.metrics) @@ -573,12 +586,12 @@ impl Core { } // Fallback on checking the global queue - worker.shared.inject.pop() + worker.handle.shared.inject.pop() } fn transition_to_searching(&mut self, worker: &Worker) -> bool { if !self.is_searching { - self.is_searching = worker.shared.idle.transition_worker_to_searching(); + self.is_searching = worker.handle.shared.idle.transition_worker_to_searching(); } self.is_searching @@ -590,7 +603,7 @@ impl Core { } self.is_searching = false; - worker.shared.transition_worker_from_searching(); + worker.handle.shared.transition_worker_from_searching(); } /// Prepares the worker state for parking. @@ -606,6 +619,7 @@ impl Core { // must check all the queues one last time in case work materialized // between the last work scan and transitioning out of searching. let is_last_searcher = worker + .handle .shared .idle .transition_worker_to_parked(worker.index, self.is_searching); @@ -615,7 +629,7 @@ impl Core { self.is_searching = false; if is_last_searcher { - worker.shared.notify_if_work_pending(); + worker.handle.shared.notify_if_work_pending(); } true @@ -630,11 +644,11 @@ impl Core { // state when the wake originates from another worker *or* a new task // is pushed. We do *not* want the worker to transition to "searching" // when it wakes when the I/O driver receives new events. - self.is_searching = !worker.shared.idle.unpark_worker_by_id(worker.index); + self.is_searching = !worker.handle.shared.idle.unpark_worker_by_id(worker.index); return true; } - if worker.shared.idle.is_parked(worker.index) { + if worker.handle.shared.idle.is_parked(worker.index) { return false; } @@ -646,7 +660,7 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { self.metrics - .submit(&worker.shared.worker_metrics[worker.index]); + .submit(&worker.handle.shared.worker_metrics[worker.index]); if !self.is_shutdown { // Check if the scheduler has been shutdown @@ -658,10 +672,10 @@ impl Core { /// before we enter the single-threaded phase of shutdown processing. fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. - worker.shared.owned.close_and_shutdown_all(); + worker.handle.shared.owned.close_and_shutdown_all(); self.metrics - .submit(&worker.shared.worker_metrics[worker.index]); + .submit(&worker.handle.shared.worker_metrics[worker.index]); } /// Shuts down the core. @@ -678,49 +692,31 @@ impl Core { impl Worker { /// Returns a reference to the scheduler's injection queue. - fn inject(&self) -> &Inject> { - &self.shared.inject + fn inject(&self) -> &Inject> { + &self.handle.shared.inject } } -impl task::Schedule for Arc { +impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option { - self.owned.remove(task) + self.shared.owned.remove(task) } fn schedule(&self, task: Notified) { - (**self).schedule(task, false); + self.shared.schedule(task, false); } fn yield_now(&self, task: Notified) { - (**self).schedule(task, true); + self.shared.schedule(task, true); } } impl Shared { - pub(super) fn bind_new_task( - me: &Arc, - future: T, - id: crate::runtime::task::Id, - ) -> JoinHandle - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - let (handle, notified) = me.owned.bind(future, me.clone(), id); - - if let Some(notified) = notified { - me.schedule(notified, false); - } - - handle - } - pub(super) fn schedule(&self, task: Notified, is_yield: bool) { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. - if self.ptr_eq(&cx.worker.shared) { + if self.ptr_eq(&cx.worker.handle.shared) { // And the current thread still holds a core if let Some(core) = cx.core.borrow_mut().as_mut() { self.schedule_local(core, task, is_yield);